diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e13785b94..8905ea850f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2210,6 +2210,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "not a multiple of each other, bucketed map-side join cannot be performed, and the\n" +
         "query will fail if hive.enforce.bucketmapjoin is set to true."),
 
+    HIVE_SORT_WHEN_BUCKETING("hive.optimize.clustered.sort", true,
+        "When this option is true and a Hive table was created with a CLUSTERED BY clause, the data will\n" +
+        "also be sorted by the clustering columns (if no sort columns were specified)."),
+
     HIVE_ENFORCE_NOT_NULL_CONSTRAINT("hive.constraint.notnull.enforce", true,
         "Should \"IS NOT NULL \" constraint be enforced?"),
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 318e2300ec..395a8c0727 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -27,8 +27,10 @@ import java.util.Set;
 import java.util.Stack;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -239,6 +241,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       // Sort columns specified by table
       sortPositions = getSortPositions(destTable.getSortCols(), destTable.getCols());
       sortOrder = getSortOrders(destTable.getSortCols(), destTable.getCols());
+    } else if (HiveConf.getBoolVar(this.parseCtx.getConf(), HiveConf.ConfVars.HIVE_SORT_WHEN_BUCKETING) &&
+        !bucketPositions.isEmpty()) {
+      // Use the clustering (bucketing) columns as the sort columns
+      sortPositions = new ArrayList<>(bucketPositions);
+      sortOrder = sortPositions.stream().map(e -> 1).collect(Collectors.toList());
     } else {
       // Infer sort columns from operator tree
       sortPositions = Lists.newArrayList();
@@ -276,8 +283,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
 
     // Create ReduceSink operator
     ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder, sortNullOrder,
-        allRSCols, bucketColumns, numBuckets,
-        fsParent, fsOp.getConf().getWriteType());
+        allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
 
     List descs = new ArrayList(allRSCols.size());
     List colNames = new ArrayList();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index d47457857c..91106b6b4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6845,7 +6845,7 @@ public void setTotalFiles(int totalFiles) {
 
   @SuppressWarnings("nls")
   private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
-      TableDesc table_desc, Table dest_tab, SortBucketRSCtx ctx) throws SemanticException {
+      TableDesc table_desc, Table dest_tab, SortBucketRSCtx ctx) throws SemanticException {
     // If the table is bucketed, and bucketing is enforced, do the following:
     // If the number of buckets is smaller than the number of maximum reducers,
@@ -6854,10 +6854,9 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
     // spray the data into multiple buckets. That way, we can support a very large
     // number of buckets without needing a very large number of reducers.
     boolean enforceBucketing = false;
-    ArrayList partnCols = new ArrayList();
-    ArrayList sortCols = new ArrayList();
-    ArrayList sortOrders = new ArrayList();
-    ArrayList nullSortOrders = new ArrayList();
+    ArrayList partnCols = new ArrayList<>();
+    ArrayList sortCols = new ArrayList<>();
+    ArrayList sortOrders = new ArrayList<>();
     boolean multiFileSpray = false;
     int numFiles = 1;
     int totalFiles = 1;
@@ -6869,8 +6868,7 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
       } else {
         partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
       }
-    }
-    else {
+    } else {
       if(updating(dest) || deleting(dest)) {
         partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
         enforceBucketing = true;
@@ -6884,6 +6882,16 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
       if (!enforceBucketing) {
         throw new SemanticException(ErrorMsg.TBL_SORTED_NOT_BUCKETED.getErrorCodedMsg(dest_tab.getCompleteName()));
       }
+    } else if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SORT_WHEN_BUCKETING) &&
+        enforceBucketing && !updating(dest) && !deleting(dest)) {
+      sortCols = new ArrayList<>();
+      for (ExprNodeDesc expr : partnCols) {
+        sortCols.add(expr.clone());
+      }
+      sortOrders = new ArrayList<>();
+      for (int i = 0; i < sortCols.size(); i++) {
+        sortOrders.add(DirectionUtils.ASCENDING_CODE);
+      }
     }
 
     if (enforceBucketing) {
diff --git a/ql/src/test/results/clientpositive/bucket1.q.out b/ql/src/test/results/clientpositive/bucket1.q.out
index 18781f0305..f5158ad921 100644
--- a/ql/src/test/results/clientpositive/bucket1.q.out
+++ b/ql/src/test/results/clientpositive/bucket1.q.out
@@ -39,8 +39,9 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                 Reduce Output Operator
-                  null sort order: 
-                  sort order: 
+                  key expressions: UDFToInteger(_col0) (type: int)
+                  null sort order: a
+                  sort order: +
                   Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                   Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   tag: -1
diff --git a/ql/src/test/results/clientpositive/bucket2.q.out b/ql/src/test/results/clientpositive/bucket2.q.out
index a3cc4fbdba..ef6d630c97 100644
--- a/ql/src/test/results/clientpositive/bucket2.q.out
+++ b/ql/src/test/results/clientpositive/bucket2.q.out
@@ -38,8 +38,9 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                 Reduce Output Operator
-                  null sort order: 
-                  sort order: 
+                  key expressions: UDFToInteger(_col0) (type: int)
+                  null sort order: a
+                  sort order: +
                   Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                   Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   tag: -1
diff --git a/ql/src/test/results/clientpositive/bucket3.q.out b/ql/src/test/results/clientpositive/bucket3.q.out
index 04638537aa..d418750071 100644
--- a/ql/src/test/results/clientpositive/bucket3.q.out
+++ b/ql/src/test/results/clientpositive/bucket3.q.out
@@ -39,8 +39,9 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                 Reduce Output Operator
-                  null sort order: 
-                  sort order: 
+                  key expressions: UDFToInteger(_col0) (type: int)
+                  null sort order: a
+                  sort order: +
                   Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                   Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   tag: -1
diff --git a/ql/src/test/results/clientpositive/dynpart_sort_opt_bucketing.q.out b/ql/src/test/results/clientpositive/dynpart_sort_opt_bucketing.q.out
index 4bbf2a4c35..5be969dcca 100644
--- a/ql/src/test/results/clientpositive/dynpart_sort_opt_bucketing.q.out
+++ b/ql/src/test/results/clientpositive/dynpart_sort_opt_bucketing.q.out
@@ -288,13 +288,14 @@ STAGE PLANS:
               outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
               Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
               Reduce Output Operator
-                sort order: 
+                key expressions: _col8 (type: string)
+                sort order: +
+                Map-reduce partition columns: _col8 (type: string)
                 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                value expressions: _col0 (type: int), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: string), _col12 (type: string)
+                value expressions: _col0 (type: int), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: string), _col12 (type: string)
       Reduce Operator Tree:
         Select Operator
-          expressions: VALUE._col0 (type: int), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: string), VALUE._col4 (type: string), VALUE._col5 (type: string), VALUE._col6 (type: string), VALUE._col7 (type: string), VALUE._col8 (type: string), VALUE._col9 (type: string), VALUE._col10 (type: string), CAST( VALUE._col11 AS decimal(5,2)) (type: decimal(5,2)), VALUE._col12 (type: string)
+          expressions: VALUE._col0 (type: int), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: string), VALUE._col4 (type: string), VALUE._col5 (type: string), VALUE._col6 (type: string), VALUE._col7 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col8 (type: string), VALUE._col9 (type: string), CAST( VALUE._col10 AS decimal(5,2)) (type: decimal(5,2)), VALUE._col11 (type: string)
           outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
           Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE
           File Output Operator
diff --git a/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out b/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out
index 53932ebcf5..1d2e17a5b4 100644
--- a/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out
@@ -456,17 +456,17 @@ STAGE PLANS:
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4
                     Statistics: Num rows: 11 Data size: 264 Basic stats: COMPLETE Column stats: COMPLETE
                     Reduce Output Operator
-                      key expressions: _col4 (type: tinyint), _bucket_number (type: string)
-                      sort order: ++
+                      key expressions: _col4 (type: tinyint), _bucket_number (type: string), _col0 (type: smallint)
+                      sort order: +++
                       Map-reduce partition columns: _col4 (type: tinyint)
-                      value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
+                      value expressions: _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2
             Execution mode: vectorized, llap
             Reduce Operator Tree:
              Select Operator
-                expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
+                expressions: KEY._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _bucket_number
                File Output Operator
                  compressed: false
@@ -954,17 +954,17 @@ STAGE PLANS:
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4
                     Statistics: Num rows: 11 Data size: 264 Basic stats: COMPLETE Column stats: COMPLETE
                     Reduce Output Operator
-                      key expressions: _col4 (type: tinyint), _bucket_number (type: string)
-                      sort order: ++
+                      key expressions: _col4 (type: tinyint), _bucket_number (type: string), _col0 (type: smallint)
+                      sort order: +++
                       Map-reduce partition columns: _col4 (type: tinyint)
-                      value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
+                      value expressions: _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
            Execution mode: vectorized, llap
            LLAP IO: all inputs
        Reducer 2
            Execution mode: vectorized, llap
            Reduce Operator Tree:
              Select Operator
-                expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
+                expressions: KEY._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _bucket_number
                File Output Operator
                  compressed: false
@@ -1333,7 +1333,7 @@ Partition Parameters:
 	numFiles            	8                   
 	numRows             	32                  
 	rawDataSize         	640                 
-	totalSize           	4648                
+	totalSize           	4640                
#### A masked pattern was here ####
 
# Storage Information
@@ -3214,15 +3214,15 @@
                   Number of rows: 10
                   Statistics: Num rows: 10 Data size: 816 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
-                    key expressions: _col2 (type: string), _bucket_number (type: string)
-                    sort order: ++
+                    key expressions: _col2 (type: string), _bucket_number (type: string), _col1 (type: smallint)
+                    sort order: +++
                     Map-reduce partition columns: _col2 (type: string)
-                    value expressions: _col0 (type: int), _col1 (type: smallint)
+                    value expressions: _col0 (type: int)
        Reducer 3
            Execution mode: vectorized, llap
            Reduce Operator Tree:
              Select Operator
-                expressions: VALUE._col0 (type: int), VALUE._col1 (type: smallint), KEY._col2 (type: string), KEY._bucket_number (type: string)
+                expressions: VALUE._col0 (type: int), KEY._col1 (type: smallint), KEY._col2 (type: string), KEY._bucket_number (type: string)
                outputColumnNames: _col0, _col1, _col2, _bucket_number
                File Output Operator
                  compressed: false
@@ -3299,13 +3299,13 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@addcolumns_vectorization_true_disallowincompatible_true_fileformat_orc_tinyint
 POSTHOOK: Input: default@addcolumns_vectorization_true_disallowincompatible_true_fileformat_orc_tinyint@s=cvLH6Eat2yFsyy7p
#### A masked pattern was here ####
+528534767	-15549	cvLH6Eat2yFsyy7p
 528534767	-13326	cvLH6Eat2yFsyy7p
 528534767	-9566	cvLH6Eat2yFsyy7p
 528534767	7021	cvLH6Eat2yFsyy7p
-528534767	-15549	cvLH6Eat2yFsyy7p
-528534767	-4213	cvLH6Eat2yFsyy7p
 528534767	-15813	cvLH6Eat2yFsyy7p
-528534767	15007	cvLH6Eat2yFsyy7p
-528534767	4963	cvLH6Eat2yFsyy7p
-528534767	-7824	cvLH6Eat2yFsyy7p
 528534767	-15431	cvLH6Eat2yFsyy7p
+528534767	-7824	cvLH6Eat2yFsyy7p
+528534767	-4213	cvLH6Eat2yFsyy7p
+528534767	4963	cvLH6Eat2yFsyy7p
+528534767	15007	cvLH6Eat2yFsyy7p
diff --git a/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out b/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out
index 68794d2d7e..87054fd09a 100644
--- a/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out
@@ -400,17 +400,17 @@ STAGE PLANS:
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4
                     Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
                     Reduce Output Operator
-                      key expressions: _col4 (type: tinyint), _bucket_number (type: string)
-                      sort order: ++
+                      key expressions: _col4 (type: tinyint), _bucket_number (type: string), _col0 (type: smallint)
+                      sort order: +++
                       Map-reduce partition columns: _col4 (type: tinyint)
-                      value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
+                      value expressions: _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
            Execution mode: llap
            LLAP IO: no inputs
        Reducer 2
            Execution mode: llap
            Reduce Operator Tree:
              Select Operator
-                expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
+                expressions: KEY._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _bucket_number
                File Output Operator
                  compressed: false
@@ -885,17 +885,17 @@ STAGE PLANS:
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4
                     Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
                     Reduce Output Operator
-                      key expressions: _col4 (type: tinyint), _bucket_number (type: string)
-                      sort order: ++
+                      key expressions: _col4 (type: tinyint), _bucket_number (type: string), _col0 (type: smallint)
+                      sort order: +++
                       Map-reduce partition columns: _col4 (type: tinyint)
-                      value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
+                      value expressions: _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
            Execution mode: llap
            LLAP IO: no inputs
        Reducer 2
            Execution mode: llap
            Reduce Operator Tree:
              Select Operator
-                expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
+                expressions: KEY._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint), KEY._bucket_number (type: string)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _bucket_number
                File Output Operator
                  compressed: false
diff --git a/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out b/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out
index 4542b5c1b8..3f18e520dd 100644
--- a/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out
@@ -1416,7 +1416,7 @@ STAGE PLANS:
                 TableScan
                   alias: acid_2l_part_sdpo
                   filterExpr: (value = 'bar') (type: boolean)
-                  Statistics: Num rows: 4200 Data size: 1253037 Basic stats: COMPLETE Column stats: PARTIAL
+                  Statistics: Num rows: 4200 Data size: 1247197 Basic stats: COMPLETE Column stats: PARTIAL
                   Filter Operator
                     predicate: (value = 'bar') (type: boolean)
                     Statistics: Num rows: 5 Data size: 1375 Basic stats: COMPLETE Column stats: PARTIAL
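
Usage sketch (reviewer note, not part of the patch): the example below is hypothetical — t_bucketed and src are illustrative names — and shows the intended effect of the new hive.optimize.clustered.sort option.

    SET hive.optimize.clustered.sort=true;  -- the new option added above; on by default

    -- CLUSTERED BY with no SORTED BY clause: before this patch, rows within
    -- each bucket file were written in arrival order.
    CREATE TABLE t_bucketed (key INT, value STRING)
    CLUSTERED BY (key) INTO 4 BUCKETS;

    -- With the option enabled, the planner adds the clustering columns as
    -- ascending sort keys on the ReduceSink (visible as "key expressions: ..."
    -- and "sort order: +" in the updated plans above), so each bucket file
    -- also ends up sorted by key.
    INSERT OVERWRITE TABLE t_bucketed
    SELECT key, value FROM src;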