diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 4001b9f452..fc66f11410 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -584,6 +584,7 @@ minillaplocal.query.files=\ mrr.q,\ multiMapJoin1.q,\ multiMapJoin2.q,\ + murmur_hash_migration.q,\ non_native_window_udf.q,\ optimize_join_ptp.q,\ orc_analyze.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 119aa925c1..af3e35acf5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -158,6 +158,9 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, runStatsAnnotation(procCtx); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Setup stats in the operator plan"); + // Update bucketing version of ReduceSinkOp if needed + updateBucketingVersionForReduceSink(procCtx); + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); // run the optimizations that use stats for optimization runStatsDependentOptimizations(procCtx, inputs, outputs); @@ -1521,4 +1524,33 @@ private void markSemiJoinForDPP(OptimizeTezProcContext procCtx) } } } + + private void updateBucketingVersionForReduceSink(OptimizeTezProcContext procCtx) { + // Fetch all the FileSinkOperators. + //List topOps = new ArrayList<>(); + //topOps.addAll(procCtx.parseContext.getTopOps().values()); + Set fsOps = OperatorUtils.findOperators( + procCtx.parseContext.getTopOps().values().iterator().next(), FileSinkOperator.class); + + for (FileSinkOperator fsOp : fsOps) { + Operator parentOfFS = fsOp.getParentOperators().get(0); + if (parentOfFS instanceof GroupByOperator) { + GroupByOperator gbyOp = (GroupByOperator) parentOfFS; + List aggs = gbyOp.getConf().getAggregatorStrings(); + boolean compute_stats = false; + for (String agg : aggs) { + if (agg.equalsIgnoreCase("compute_stats")) { + compute_stats = true; + break; + } + } + if (compute_stats) continue; + } + + // Not compute_stats + Set rsOps = OperatorUtils.findOperatorsUpstream(parentOfFS, ReduceSinkOperator.class); + if (rsOps.isEmpty()) continue; + rsOps.iterator().next().setBucketingVersion(fsOp.getConf().getTableInfo().getBucketingVersion()); + } + } } diff --git a/ql/src/test/queries/clientpositive/ctas.q b/ql/src/test/queries/clientpositive/ctas.q index c4fdda1683..0bd8fc066b 100644 --- a/ql/src/test/queries/clientpositive/ctas.q +++ b/ql/src/test/queries/clientpositive/ctas.q @@ -57,9 +57,9 @@ set hive.exec.mode.local.auto=true; create table nzhang_ctas5 row format delimited fields terminated by ',' lines terminated by '\012' stored as textfile as select key, value from src sort by key, value limit 10; -create table nzhang_ctas6 (key string, `to` string); -insert overwrite table nzhang_ctas6 select key, value from src tablesample (10 rows); -create table nzhang_ctas7 as select key, `to` from nzhang_ctas6; +create table nzhang_ctas6 (key string, `to` string) CLUSTERED BY (key) into 3 BUCKETS ; +explain insert overwrite table nzhang_ctas6 select key, value from src tablesample (10 rows); +explain create table nzhang_ctas7 as select key, `to` from nzhang_ctas6; diff --git a/ql/src/test/queries/clientpositive/murmur_hash_migration.q b/ql/src/test/queries/clientpositive/murmur_hash_migration.q new file mode 100644 index 0000000000..52703d9165 --- /dev/null +++ b/ql/src/test/queries/clientpositive/murmur_hash_migration.q @@ -0,0 +1,35 @@ +--! qt:dataset:src +set hive.stats.column.autogather=false; +set hive.strict.checks.bucketing=false; + +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=30000; + +CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1'); +CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1');; +CREATE TABLE srcbucket_mapjoin_part_n20 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1');; + +load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj1/000001_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08'); + +load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj/000001_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj/000002_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj/000003_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); + +set hive.optimize.bucketingsorting=false; +explain +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20; +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20; + +CREATE TABLE tab_n10(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +explain +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18; +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18; diff --git a/ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out b/ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out new file mode 100644 index 0000000000..82c67df6e6 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out @@ -0,0 +1,256 @@ +PREHOOK: query: CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@srcbucket_mapjoin_n18 +POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcbucket_mapjoin_n18 +PREHOOK: query: CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tab_part_n11 +POSTHOOK: query: CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab_part_n11 +PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part_n20 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@srcbucket_mapjoin_part_n20 +POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part_n20 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20 +PREHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_n18 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_n18 +POSTHOOK: Output: default@srcbucket_mapjoin_n18@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj1/000001_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_n18@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj1/000001_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_n18@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20 +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000001_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000001_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000002_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000002_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000003_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000003_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: explain +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +PREHOOK: type: QUERY +POSTHOOK: query: explain +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcbucket_mapjoin_part_n20 + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_part_n11 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + ds 2008-04-08 + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_part_n11 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + +PREHOOK: query: insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_mapjoin_part_n20 +PREHOOK: Input: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: Output: default@tab_part_n11@ds=2008-04-08 +POSTHOOK: query: insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_mapjoin_part_n20 +POSTHOOK: Input: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: Output: default@tab_part_n11@ds=2008-04-08 +POSTHOOK: Lineage: tab_part_n11 PARTITION(ds=2008-04-08).key SIMPLE [(srcbucket_mapjoin_part_n20)srcbucket_mapjoin_part_n20.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tab_part_n11 PARTITION(ds=2008-04-08).value SIMPLE [(srcbucket_mapjoin_part_n20)srcbucket_mapjoin_part_n20.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: CREATE TABLE tab_n10(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tab_n10 +POSTHOOK: query: CREATE TABLE tab_n10(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab_n10 +PREHOOK: query: explain +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +PREHOOK: type: QUERY +POSTHOOK: query: explain +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcbucket_mapjoin_n18 + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_n10 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + ds 2008-04-08 + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_n10 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + +PREHOOK: query: insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_mapjoin_n18 +PREHOOK: Input: default@srcbucket_mapjoin_n18@ds=2008-04-08 +PREHOOK: Output: default@tab_n10@ds=2008-04-08 +POSTHOOK: query: insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_mapjoin_n18 +POSTHOOK: Input: default@srcbucket_mapjoin_n18@ds=2008-04-08 +POSTHOOK: Output: default@tab_n10@ds=2008-04-08 +POSTHOOK: Lineage: tab_n10 PARTITION(ds=2008-04-08).key SIMPLE [(srcbucket_mapjoin_n18)srcbucket_mapjoin_n18.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tab_n10 PARTITION(ds=2008-04-08).value SIMPLE [(srcbucket_mapjoin_n18)srcbucket_mapjoin_n18.FieldSchema(name:value, type:string, comment:null), ]