diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 078704eb89..f469cd29fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -671,7 +671,6 @@ private static void removeSemijoinOptimizationFromSMBJoins(
         continue;
       }
-      assert parent instanceof SelectOperator;
       while (parent != null) {
         if (parent instanceof TableScanOperator) {
           tsOps.add((TableScanOperator) parent);
@@ -685,10 +684,11 @@ private static void removeSemijoinOptimizationFromSMBJoins(
     // a semijoin filter on any of them, if so, remove it.
     ParseContext pctx = procCtx.parseContext;
+    Set<ReduceSinkOperator> rsSet = new HashSet<>(pctx.getRsToSemiJoinBranchInfo().keySet());
     for (TableScanOperator ts : tsOps) {
-      for (ReduceSinkOperator rs : pctx.getRsToSemiJoinBranchInfo().keySet()) {
+      for (ReduceSinkOperator rs : rsSet) {
         SemiJoinBranchInfo sjInfo = pctx.getRsToSemiJoinBranchInfo().get(rs);
-        if (ts == sjInfo.getTsOp()) {
+        if (sjInfo != null && ts == sjInfo.getTsOp()) {
           // match!
           if (LOG.isDebugEnabled()) {
             LOG.debug("Semijoin optimization found going to SMB join. Removing semijoin "
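
The TezCompiler change above addresses two related hazards in removeSemijoinOptimizationFromSMBJoins(): iterating the live keySet() view of rsToSemiJoinBranchInfo while semijoin branches are being removed from the map fails with ConcurrentModificationException, and once a branch has been removed the lookup can return null, hence the added sjInfo null check. A minimal, self-contained sketch of the snapshot-copy pattern (toy names and types, not Hive code):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class SnapshotIterationSketch {
        public static void main(String[] args) {
            Map<String, String> rsToInfo = new HashMap<>();
            rsToInfo.put("RS_1", "branchInfo1");
            rsToInfo.put("RS_2", "branchInfo2");

            // Broken variant: keySet() is a live view of the map, so removing
            // entries from the map while iterating the view fails fast:
            //
            //   for (String rs : rsToInfo.keySet()) {
            //       rsToInfo.remove(rs);   // ConcurrentModificationException
            //   }

            // Fixed variant: iterate a snapshot copy; removals touch only the map.
            Set<String> rsSet = new HashSet<>(rsToInfo.keySet());
            for (String rs : rsSet) {
                String info = rsToInfo.get(rs);   // may be null once removed elsewhere
                if (info != null) {
                    rsToInfo.remove(rs);
                }
            }
            System.out.println(rsToInfo);         // {}
        }
    }

The copy costs one extra pass over the key set, but it makes the loop safe no matter what the body does to the underlying map.
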
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index bfc1eca61a..01fab9c5d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -843,7 +844,12 @@ public static ColumnOrigin findColumnOrigin(ExprNodeDesc expr, Operator op) {
     ExprNodeColumnDesc parentCol = ExprNodeDescUtils.getColumnExpr(parentExpr);
     if (parentCol != null) {
       for (Operator currParent : op.getParentOperators()) {
-        if (currParent.getSchema().getTableNames().contains(parentCol.getTabAlias())) {
+        RowSchema schema = currParent.getSchema();
+        if (schema == null) {
+          // Happens in case of TezDummyStoreOperator
+          return null;
+        }
+        if (schema.getTableNames().contains(parentCol.getTabAlias())) {
           parentOp = currParent;
           break;
         }
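
The ExprNodeDescUtils change applies the same defensive style to findColumnOrigin(): a parent operator may carry no RowSchema at all (the in-code comment cites TezDummyStoreOperator), so the schema is fetched once, checked for null, and the walk gives up cleanly instead of throwing NullPointerException. A rough sketch of the pattern, with a hypothetical OpNode type standing in for Hive's Operator:

    import java.util.List;

    // Toy stand-ins for Hive's Operator/RowSchema; names are illustrative only.
    class OpNode {
        final List<String> schemaTables;   // null models an operator with no RowSchema
        final List<OpNode> parents;
        OpNode(List<String> schemaTables, List<OpNode> parents) {
            this.schemaTables = schemaTables;
            this.parents = parents;
        }
    }

    public class SchemaGuardSketch {
        // Mirrors the guarded lookup: pick the parent whose schema mentions the
        // alias, but bail out with null when a parent has no schema at all,
        // instead of dereferencing it and raising NullPointerException.
        static OpNode parentForAlias(OpNode op, String tabAlias) {
            for (OpNode parent : op.parents) {
                List<String> schema = parent.schemaTables;
                if (schema == null) {
                    return null;   // e.g. a dummy operator without a schema
                }
                if (schema.contains(tabAlias)) {
                    return parent;
                }
            }
            return null;
        }

        public static void main(String[] args) {
            OpNode dummy = new OpNode(null, List.of());
            OpNode child = new OpNode(List.of("c"), List.of(dummy));
            System.out.println(parentForAlias(child, "t1"));   // null, no NPE
        }
    }
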
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q
index 55f6e8a217..97b3d84339 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q
@@ -31,6 +31,7 @@ EXPLAIN
 SELECT COUNT(*) FROM table_1 t1
+
 INNER JOIN table_18 t2 ON (((t2.tinyint_col_15) = (t1.bigint_col_7)) AND
 ((t2.decimal2709_col_9) = (t1.decimal2016_col_26))) AND
 ((t2.tinyint_col_20) = (t1.tinyint_col_3))
@@ -42,3 +43,50 @@ WHERE (t1.timestamp_col_9) = (tt2.timestamp_col_18));
 drop table table_1;
 drop table table_18;
+
+-- Hive 15699
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+CREATE TABLE src2 as select * from src1;
+insert into src2 select * from src2;
+insert into src2 select * from src2;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+set hive.strict.checks.bucketing=false;
+set hive.join.emit.interval=2;
+set hive.stats.fetch.column.stats=true;
+set hive.optimize.bucketingsorting=false;
+set hive.stats.autogather=true;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+set hive.auto.convert.sortmerge.join = true;
+
+set hive.auto.convert.join.noconditionaltask.size=0;
+set hive.mapjoin.hybridgrace.minwbsize=125;
+set hive.mapjoin.hybridgrace.minnumpartitions=4;
+
+set hive.llap.memory.oversubscription.max.executors.per.query=3;
+
+CREATE TABLE tab2 (key int, value string, ds string);
+
+set hive.exec.dynamic.partition.mode=nonstrict
+insert into tab2select key, value, ds from tab;
+analyze table tab2 compute statistics;
+analyze table tab2 compute statistics for columns;
+
+
+explain
+select
+  count(*)
+  from
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) a
+  join
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) b
+  on
+  a.key = b.key join src1 c on a.value = c.value where c.key < 0;
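
For context on what the new test exercises: with dynamic semijoin reduction enabled, the EXPLAIN output below shows Map 8 probing a runtime filter that Reducer 5 builds from the join keys, i.e. value BETWEEN DynamicValue(..._min) AND DynamicValue(..._max) combined with in_bloom_filter(value, DynamicValue(..._bloom_filter)). A toy sketch of those filter semantics (the Bloom filter here is deliberately naive, unlike Hive's implementation sized by expectedEntries):

    import java.util.BitSet;
    import java.util.List;

    public class SemijoinFilterSketch {
        // Toy Bloom filter: two cheap hash probes into a fixed-size bit set.
        static class Bloom {
            final BitSet bits = new BitSet(1024);
            void add(String v) { bits.set(h1(v)); bits.set(h2(v)); }
            boolean mightContain(String v) { return bits.get(h1(v)) && bits.get(h2(v)); }
            static int h1(String v) { return Math.floorMod(v.hashCode(), 1024); }
            static int h2(String v) { return Math.floorMod(v.hashCode() * 31 + 17, 1024); }
        }

        public static void main(String[] args) {
            // "Reducer 5" side: aggregate min/max/bloom over the build-side keys.
            List<String> buildValues = List.of("val_146", "val_150");
            String min = buildValues.stream().min(String::compareTo).orElseThrow();
            String max = buildValues.stream().max(String::compareTo).orElseThrow();
            Bloom bloom = new Bloom();
            buildValues.forEach(bloom::add);

            // "Map 8" side: a row survives the scan only if it passes the filter,
            // i.e. value BETWEEN min AND max AND in_bloom_filter(value, bloom).
            for (String value : List.of("val_0", "val_146", "val_999")) {
                boolean pass = value.compareTo(min) >= 0 && value.compareTo(max) <= 0
                    && bloom.mightContain(value);
                System.out.println(value + " -> " + pass);
            }
        }
    }

Only rows that pass both the range test and the membership test reach the join, which is the point of the reduction.
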
diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
index cb69251274..650dc9ffd7 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
@@ -40,6 +40,7 @@ PREHOOK: query: EXPLAIN
 SELECT COUNT(*) FROM table_1 t1
+
 INNER JOIN table_18 t2 ON (((t2.tinyint_col_15) = (t1.bigint_col_7)) AND
 ((t2.decimal2709_col_9) = (t1.decimal2016_col_26))) AND
 ((t2.tinyint_col_20) = (t1.tinyint_col_3))
@@ -53,6 +54,7 @@ POSTHOOK: query: EXPLAIN
 SELECT COUNT(*) FROM table_1 t1
+
 INNER JOIN table_18 t2 ON (((t2.tinyint_col_15) = (t1.bigint_col_7)) AND
 ((t2.decimal2709_col_9) = (t1.decimal2016_col_26))) AND
 ((t2.tinyint_col_20) = (t1.tinyint_col_3))
@@ -299,3 +301,306 @@ POSTHOOK: query: drop table table_18
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@table_18
 POSTHOOK: Output: default@table_18
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcbucket_mapjoin
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcbucket_mapjoin
+PREHOOK: query: CREATE TABLE src2 as select * from src1
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src1
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src2
+POSTHOOK: query: CREATE TABLE src2 as select * from src1
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src1
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src2
+POSTHOOK: Lineage: src2.key SIMPLE [(src1)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src2.value SIMPLE [(src1)src1.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert into src2 select * from src2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src2
+PREHOOK: Output: default@src2
+POSTHOOK: query: insert into src2 select * from src2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src2
+POSTHOOK: Output: default@src2
+POSTHOOK: Lineage: src2.key SIMPLE [(src2)src2.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: src2.value SIMPLE [(src2)src2.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: insert into src2 select * from src2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src2
+PREHOOK: Output: default@src2
+POSTHOOK: query: insert into src2 select * from src2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src2
+POSTHOOK: Output: default@src2
+POSTHOOK: Lineage: src2.key SIMPLE [(src2)src2.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: src2.value SIMPLE [(src2)src2.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin
+POSTHOOK: Output: default@srcbucket_mapjoin@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin@ds=2008-04-08
+PREHOOK: query: CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab
+POSTHOOK: query: CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab
+PREHOOK: query: insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_mapjoin
+PREHOOK: Input: default@srcbucket_mapjoin@ds=2008-04-08
+PREHOOK: Output: default@tab@ds=2008-04-08
+POSTHOOK: query: insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_mapjoin
+POSTHOOK: Input: default@srcbucket_mapjoin@ds=2008-04-08
+POSTHOOK: Output: default@tab@ds=2008-04-08
+POSTHOOK: Lineage: tab PARTITION(ds=2008-04-08).key SIMPLE [(srcbucket_mapjoin)srcbucket_mapjoin.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: tab PARTITION(ds=2008-04-08).value SIMPLE [(srcbucket_mapjoin)srcbucket_mapjoin.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: CREATE TABLE tab2 (key int, value string, ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab2
+POSTHOOK: query: CREATE TABLE tab2 (key int, value string, ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab2
+Warning: Value had a \n character in it.
+PREHOOK: query: analyze table tab2 compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+PREHOOK: Output: default@tab2
+POSTHOOK: query: analyze table tab2 compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+POSTHOOK: Output: default@tab2
+PREHOOK: query: analyze table tab2 compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+PREHOOK: Output: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table tab2 compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+POSTHOOK: Output: default@tab2
+#### A masked pattern was here ####
+PREHOOK: query: explain
+select
+  count(*)
+  from
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) a
+  join
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) b
+  on
+  a.key = b.key join src1 c on a.value = c.value where c.key < 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select
+  count(*)
+  from
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) a
+  join
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) b
+  on
+  a.key = b.key join src1 c on a.value = c.value where c.key < 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 8 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 3 <- Map 8 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  filterExpr: key is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: min(value)
+                      keys: key (type: int)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 6
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  filterExpr: key is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      keys: key (type: int)
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 8
+            Map Operator Tree:
+                TableScan
+                  alias: c
+                  filterExpr: ((UDFToDouble(key) < 0.0) and value is not null and (value BETWEEN DynamicValue(RS_21_x__col1_min) AND DynamicValue(RS_21_x__col1_max) and in_bloom_filter(value, DynamicValue(RS_21_x__col1_bloom_filter)))) (type: boolean)
+                  Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) < 0.0) and value is not null and (value BETWEEN DynamicValue(RS_21_x__col1_min) AND DynamicValue(RS_21_x__col1_max) and in_bloom_filter(value, DynamicValue(RS_21_x__col1_bloom_filter)))) (type: boolean)
+                    Statistics: Num rows: 8 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col1
+                      Statistics: Num rows: 8 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 8 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0)
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                Filter Operator
+                  predicate: _col1 is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                  Merge Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 _col0 (type: int)
+                      1 _col0 (type: int)
+                    outputColumnNames: _col1
+                    Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col1 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col1 (type: string)
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col1 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE
+                      Group By Operator
+                        aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=2)
+                        mode: hash
+                        outputColumnNames: _col0, _col1, _col2
+                        Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          sort order:
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                          value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+        Reducer 3
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col1 (type: string)
+                  1 _col1 (type: string)
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    sort order:
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    value expressions: _col0 (type: bigint)
+        Reducer 4
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=2)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  sort order:
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink