diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index d08528f319..990a45070c 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -584,6 +584,7 @@ minillaplocal.query.files=\ mrr.q,\ multiMapJoin1.q,\ multiMapJoin2.q,\ + murmur_hash_migration.q,\ non_native_window_udf.q,\ optimize_join_ptp.q,\ orc_analyze.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 1b433c7498..f28467974a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -158,6 +158,9 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, runStatsAnnotation(procCtx); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Setup stats in the operator plan"); + // Update bucketing version of ReduceSinkOp if needed + updateBucketingVersionForUpgrade(procCtx); + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); // run the optimizations that use stats for optimization runStatsDependentOptimizations(procCtx, inputs, outputs); @@ -1663,4 +1666,43 @@ private void markSemiJoinForDPP(OptimizeTezProcContext procCtx) } } } + + private void updateBucketingVersionForUpgrade(OptimizeTezProcContext procCtx) { + // Fetch all the FileSinkOperators. + //List topOps = new ArrayList<>(); + //topOps.addAll(procCtx.parseContext.getTopOps().values()); + Set fsOpsAll = new HashSet<>(); + for (TableScanOperator ts : procCtx.parseContext.getTopOps().values()) { + Set fsOps = OperatorUtils.findOperators( + ts, FileSinkOperator.class); + fsOpsAll.addAll(fsOps); + } + + + for (FileSinkOperator fsOp : fsOpsAll) { + Operator parentOfFS = fsOp.getParentOperators().get(0); + if (parentOfFS instanceof GroupByOperator) { + GroupByOperator gbyOp = (GroupByOperator) parentOfFS; + List aggs = gbyOp.getConf().getAggregatorStrings(); + boolean compute_stats = false; + for (String agg : aggs) { + if (agg.equalsIgnoreCase("compute_stats")) { + compute_stats = true; + break; + } + } + if (compute_stats) + continue; + } + + // Not compute_stats + Set rsOps = OperatorUtils.findOperatorsUpstream(parentOfFS, ReduceSinkOperator.class); + if (rsOps.isEmpty()) + continue; + // Skip setting if the bucketing version is not set in FileSinkOp. + if (fsOp.getConf().getTableInfo().isSetBucketingVersion()) { + rsOps.iterator().next().setBucketingVersion(fsOp.getConf().getTableInfo().getBucketingVersion()); + } + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java index bbce940c2e..b73faa577c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java @@ -183,6 +183,9 @@ public boolean isNonNative() { return (properties.getProperty(hive_metastoreConstants.META_TABLE_STORAGE) != null); } + public boolean isSetBucketingVersion() { + return properties.getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION) != null; + } public int getBucketingVersion() { return Utilities.getBucketingVersion( properties.getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION)); diff --git a/ql/src/test/queries/clientpositive/murmur_hash_migration.q b/ql/src/test/queries/clientpositive/murmur_hash_migration.q new file mode 100644 index 0000000000..57ac10b94e --- /dev/null +++ b/ql/src/test/queries/clientpositive/murmur_hash_migration.q @@ -0,0 +1,36 @@ +--! qt:dataset:src +set hive.stats.column.autogather=false; +set hive.strict.checks.bucketing=false; + +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=30000; + +CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1'); +CREATE TABLE srcbucket_mapjoin_part_n20 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1'); + +load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj1/000001_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08'); + +load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj/000001_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj/000002_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); +load data local inpath '../../data/files/bmj/000003_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08'); + +set hive.optimize.bucketingsorting=false; +CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +explain +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20; +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20; + +CREATE TABLE tab_n10(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +explain +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18; +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18; + diff --git a/ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out b/ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out new file mode 100644 index 0000000000..c6255d1995 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out @@ -0,0 +1,256 @@ +PREHOOK: query: CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@srcbucket_mapjoin_n18 +POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcbucket_mapjoin_n18 +PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part_n20 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@srcbucket_mapjoin_part_n20 +POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part_n20 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE TBLPROPERTIES("bucketing_version" = '1') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20 +PREHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_n18 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_n18 +POSTHOOK: Output: default@srcbucket_mapjoin_n18@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj1/000001_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_n18@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj1/000001_0' INTO TABLE srcbucket_mapjoin_n18 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_n18@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000000_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20 +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000001_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000001_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000002_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000002_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/bmj/000003_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/bmj/000003_0' INTO TABLE srcbucket_mapjoin_part_n20 partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: query: CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tab_part_n11 +POSTHOOK: query: CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab_part_n11 +PREHOOK: query: explain +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +PREHOOK: type: QUERY +POSTHOOK: query: explain +insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcbucket_mapjoin_part_n20 + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 149 Data size: 85004 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_part_n11 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + ds 2008-04-08 + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_part_n11 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + +PREHOOK: query: insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_mapjoin_part_n20 +PREHOOK: Input: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +PREHOOK: Output: default@tab_part_n11@ds=2008-04-08 +POSTHOOK: query: insert overwrite table tab_part_n11 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_part_n20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_mapjoin_part_n20 +POSTHOOK: Input: default@srcbucket_mapjoin_part_n20@ds=2008-04-08 +POSTHOOK: Output: default@tab_part_n11@ds=2008-04-08 +POSTHOOK: Lineage: tab_part_n11 PARTITION(ds=2008-04-08).key SIMPLE [(srcbucket_mapjoin_part_n20)srcbucket_mapjoin_part_n20.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tab_part_n11 PARTITION(ds=2008-04-08).value SIMPLE [(srcbucket_mapjoin_part_n20)srcbucket_mapjoin_part_n20.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: CREATE TABLE tab_n10(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tab_n10 +POSTHOOK: query: CREATE TABLE tab_n10(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab_n10 +PREHOOK: query: explain +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +PREHOOK: type: QUERY +POSTHOOK: query: explain +insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcbucket_mapjoin_n18 + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 72 Data size: 41240 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_n10 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + ds 2008-04-08 + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tab_n10 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + +PREHOOK: query: insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_mapjoin_n18 +PREHOOK: Input: default@srcbucket_mapjoin_n18@ds=2008-04-08 +PREHOOK: Output: default@tab_n10@ds=2008-04-08 +POSTHOOK: query: insert overwrite table tab_n10 partition (ds='2008-04-08') + select key,value from srcbucket_mapjoin_n18 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_mapjoin_n18 +POSTHOOK: Input: default@srcbucket_mapjoin_n18@ds=2008-04-08 +POSTHOOK: Output: default@tab_n10@ds=2008-04-08 +POSTHOOK: Lineage: tab_n10 PARTITION(ds=2008-04-08).key SIMPLE [(srcbucket_mapjoin_n18)srcbucket_mapjoin_n18.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tab_n10 PARTITION(ds=2008-04-08).value SIMPLE [(srcbucket_mapjoin_n18)srcbucket_mapjoin_n18.FieldSchema(name:value, type:string, comment:null), ]