diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index b6db6aa..c513ee5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -402,33 +402,44 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex exprNodeDesc = exprNodeDesc.getChildren().get(0); } - if (exprNodeDesc instanceof ExprNodeColumnDesc) { - internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn(); - if (parentOfRS instanceof SelectOperator) { - // Make sure the semijoin branch is not on parition column. - ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc) (parentOfRS. - getColumnExprMap().get(internalColName))); - String colName = ExprNodeDescUtils.extractColName(colExpr); - - // Fetch the TableScan Operator. - Operator op = parentOfRS.getParentOperators().get(0); - while (op != null && !(op instanceof TableScanOperator)) { - op = op.getParentOperators().get(0); - } - assert op != null; - - Table table = ((TableScanOperator) op).getConf().getTableMetadata(); - if (table.isPartitionKey(colName)) { - // The column is partition column, skip the optimization. - return false; - } - } - } else { + if (!(exprNodeDesc instanceof ExprNodeColumnDesc)) { // No column found! // Bail out return false; } + internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn(); + if (parentOfRS instanceof SelectOperator) { + // Make sure the semijoin branch is not on partition column. + ExprNodeDesc expr = parentOfRS.getColumnExprMap().get(internalColName); + while (!(expr instanceof ExprNodeColumnDesc) && + (expr.getChildren() != null)) { + expr = expr.getChildren().get(0); + } + + if (!(expr instanceof ExprNodeColumnDesc)) { + // No column found! + // Bail out + return false; + } + + ExprNodeColumnDesc colExpr = (ExprNodeColumnDesc) expr; + String colName = ExprNodeDescUtils.extractColName(colExpr); + + // Fetch the TableScan Operator. + Operator op = parentOfRS.getParentOperators().get(0); + while (op != null && !(op instanceof TableScanOperator)) { + op = op.getParentOperators().get(0); + } + assert op != null; + + Table table = ((TableScanOperator) op).getConf().getTableMetadata(); + if (table.isPartitionKey(colName)) { + // The column is partition column, skip the optimization. + return false; + } + } + List keyExprs = new ArrayList(); keyExprs.add(key); diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q index adfc7a6..5734f74 100644 --- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q +++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q @@ -35,3 +35,78 @@ explain merge into acidTbl as t using ( WHEN MATCHED AND s.a > 8 THEN DELETE WHEN MATCHED THEN UPDATE SET b = 7 WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b); + +--HIVE-16211 +drop database if exists type2_scd_helper cascade; +create database type2_scd_helper; +use type2_scd_helper; + +drop table if exists customer; +drop table if exists customer_updates; +drop table if exists new_customer_stage; + +create table customer ( +source_pk int, +sk string, +name string, +state string, +is_current boolean, +end_date date +) +CLUSTERED BY (sk) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ("transactional"="true"); + +insert into customer values + ( 1, "ABC", "Abc Co.", "OH", true, null ), + ( 2, "DEF", "Def Co.", "PA", true, null ), + ( 3, "XYZ", "Xyz Co.", "CA", true, null ); +select * from customer order by source_pk; + +create table new_customer_stage ( +source_pk int, +name string, +state string +); +insert into new_customer_stage values + ( 1, "Abc Co.", "OH" ), + ( 2, "Def Co.", "PA" ), + ( 3, "Xyz Co.", "TX" ), + ( 4, "Pdq Co.", "WI" ); + +drop table if exists scd_types; +create table scd_types ( +type int, +invalid_key int +); +insert into scd_types values (1, null), (2, -1), (2, null); + +merge into customer +using ( + select + *, + coalesce(invalid_key, source_pk) as join_key + from ( + select + stage.source_pk, stage.name, stage.state, + case when customer.source_pk is null then 1 + when stage.name <> customer.name or stage.state <> customer.state then 2 + else 0 end as scd_row_type + from + new_customer_stage stage + left join + customer + on (stage.source_pk = customer.source_pk and customer.is_current = true) + ) updates + join scd_types on scd_types.type = scd_row_type +) sub +on sub.join_key = customer.source_pk +when matched then update set + is_current = false, + end_date = current_date() +when not matched then insert values + (sub.source_pk, upper(substr(sub.name, 0, 3)), sub.name, sub.state, true, null); +select * from customer order by source_pk; + +drop table customer; +drop table customer_updates; +drop table new_customer_stage; +drop table scd_types; diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out index 829f28f..b9952ec 100644 --- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out +++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out @@ -770,3 +770,255 @@ STAGE PLANS: Stage: Stage-9 Stats-Aggr Operator +PREHOOK: query: drop database if exists type2_scd_helper cascade +PREHOOK: type: DROPDATABASE +POSTHOOK: query: drop database if exists type2_scd_helper cascade +POSTHOOK: type: DROPDATABASE +PREHOOK: query: create database type2_scd_helper +PREHOOK: type: CREATEDATABASE +PREHOOK: Output: database:type2_scd_helper +POSTHOOK: query: create database type2_scd_helper +POSTHOOK: type: CREATEDATABASE +POSTHOOK: Output: database:type2_scd_helper +PREHOOK: query: use type2_scd_helper +PREHOOK: type: SWITCHDATABASE +PREHOOK: Input: database:type2_scd_helper +POSTHOOK: query: use type2_scd_helper +POSTHOOK: type: SWITCHDATABASE +POSTHOOK: Input: database:type2_scd_helper +PREHOOK: query: drop table if exists customer +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists customer +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists customer_updates +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists customer_updates +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists new_customer_stage +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists new_customer_stage +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table customer ( +source_pk int, +sk string, +name string, +state string, +is_current boolean, +end_date date +) +CLUSTERED BY (sk) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ("transactional"="true") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:type2_scd_helper +PREHOOK: Output: type2_scd_helper@customer +POSTHOOK: query: create table customer ( +source_pk int, +sk string, +name string, +state string, +is_current boolean, +end_date date +) +CLUSTERED BY (sk) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ("transactional"="true") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:type2_scd_helper +POSTHOOK: Output: type2_scd_helper@customer +PREHOOK: query: insert into customer values + ( 1, "ABC", "Abc Co.", "OH", true, null ), + ( 2, "DEF", "Def Co.", "PA", true, null ), + ( 3, "XYZ", "Xyz Co.", "CA", true, null ) +PREHOOK: type: QUERY +PREHOOK: Output: type2_scd_helper@customer +POSTHOOK: query: insert into customer values + ( 1, "ABC", "Abc Co.", "OH", true, null ), + ( 2, "DEF", "Def Co.", "PA", true, null ), + ( 3, "XYZ", "Xyz Co.", "CA", true, null ) +POSTHOOK: type: QUERY +POSTHOOK: Output: type2_scd_helper@customer +POSTHOOK: Lineage: customer.end_date EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col6, type:string, comment:), ] +POSTHOOK: Lineage: customer.is_current EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col5, type:string, comment:), ] +POSTHOOK: Lineage: customer.name SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: customer.sk SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: customer.source_pk EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: customer.state SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +PREHOOK: query: select * from customer order by source_pk +PREHOOK: type: QUERY +PREHOOK: Input: type2_scd_helper@customer +#### A masked pattern was here #### +POSTHOOK: query: select * from customer order by source_pk +POSTHOOK: type: QUERY +POSTHOOK: Input: type2_scd_helper@customer +#### A masked pattern was here #### +1 ABC Abc Co. OH true NULL +2 DEF Def Co. PA true NULL +3 XYZ Xyz Co. CA true NULL +PREHOOK: query: create table new_customer_stage ( +source_pk int, +name string, +state string +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:type2_scd_helper +PREHOOK: Output: type2_scd_helper@new_customer_stage +POSTHOOK: query: create table new_customer_stage ( +source_pk int, +name string, +state string +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:type2_scd_helper +POSTHOOK: Output: type2_scd_helper@new_customer_stage +PREHOOK: query: insert into new_customer_stage values + ( 1, "Abc Co.", "OH" ), + ( 2, "Def Co.", "PA" ), + ( 3, "Xyz Co.", "TX" ), + ( 4, "Pdq Co.", "WI" ) +PREHOOK: type: QUERY +PREHOOK: Output: type2_scd_helper@new_customer_stage +POSTHOOK: query: insert into new_customer_stage values + ( 1, "Abc Co.", "OH" ), + ( 2, "Def Co.", "PA" ), + ( 3, "Xyz Co.", "TX" ), + ( 4, "Pdq Co.", "WI" ) +POSTHOOK: type: QUERY +POSTHOOK: Output: type2_scd_helper@new_customer_stage +POSTHOOK: Lineage: new_customer_stage.name SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: new_customer_stage.source_pk EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: new_customer_stage.state SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +PREHOOK: query: drop table if exists scd_types +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists scd_types +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table scd_types ( +type int, +invalid_key int +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:type2_scd_helper +PREHOOK: Output: type2_scd_helper@scd_types +POSTHOOK: query: create table scd_types ( +type int, +invalid_key int +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:type2_scd_helper +POSTHOOK: Output: type2_scd_helper@scd_types +PREHOOK: query: insert into scd_types values (1, null), (2, -1), (2, null) +PREHOOK: type: QUERY +PREHOOK: Output: type2_scd_helper@scd_types +POSTHOOK: query: insert into scd_types values (1, null), (2, -1), (2, null) +POSTHOOK: type: QUERY +POSTHOOK: Output: type2_scd_helper@scd_types +POSTHOOK: Lineage: scd_types.invalid_key EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: scd_types.type EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: merge into customer +using ( + select + *, + coalesce(invalid_key, source_pk) as join_key + from ( + select + stage.source_pk, stage.name, stage.state, + case when customer.source_pk is null then 1 + when stage.name <> customer.name or stage.state <> customer.state then 2 + else 0 end as scd_row_type + from + new_customer_stage stage + left join + customer + on (stage.source_pk = customer.source_pk and customer.is_current = true) + ) updates + join scd_types on scd_types.type = scd_row_type +) sub +on sub.join_key = customer.source_pk +when matched then update set + is_current = false, + end_date = current_date() +when not matched then insert values + (sub.source_pk, upper(substr(sub.name, 0, 3)), sub.name, sub.state, true, null) +PREHOOK: type: QUERY +PREHOOK: Input: type2_scd_helper@customer +PREHOOK: Input: type2_scd_helper@new_customer_stage +PREHOOK: Input: type2_scd_helper@scd_types +PREHOOK: Output: type2_scd_helper@customer +PREHOOK: Output: type2_scd_helper@customer +PREHOOK: Output: type2_scd_helper@merge_tmp_table +POSTHOOK: query: merge into customer +using ( + select + *, + coalesce(invalid_key, source_pk) as join_key + from ( + select + stage.source_pk, stage.name, stage.state, + case when customer.source_pk is null then 1 + when stage.name <> customer.name or stage.state <> customer.state then 2 + else 0 end as scd_row_type + from + new_customer_stage stage + left join + customer + on (stage.source_pk = customer.source_pk and customer.is_current = true) + ) updates + join scd_types on scd_types.type = scd_row_type +) sub +on sub.join_key = customer.source_pk +when matched then update set + is_current = false, + end_date = current_date() +when not matched then insert values + (sub.source_pk, upper(substr(sub.name, 0, 3)), sub.name, sub.state, true, null) +POSTHOOK: type: QUERY +POSTHOOK: Input: type2_scd_helper@customer +POSTHOOK: Input: type2_scd_helper@new_customer_stage +POSTHOOK: Input: type2_scd_helper@scd_types +POSTHOOK: Output: type2_scd_helper@customer +POSTHOOK: Output: type2_scd_helper@customer +POSTHOOK: Output: type2_scd_helper@merge_tmp_table +POSTHOOK: Lineage: customer.end_date EXPRESSION [] +POSTHOOK: Lineage: customer.is_current SIMPLE [] +POSTHOOK: Lineage: customer.name SIMPLE [(new_customer_stage)stage.FieldSchema(name:name, type:string, comment:null), ] +POSTHOOK: Lineage: customer.sk EXPRESSION [(new_customer_stage)stage.FieldSchema(name:name, type:string, comment:null), ] +POSTHOOK: Lineage: customer.source_pk SIMPLE [(new_customer_stage)stage.FieldSchema(name:source_pk, type:int, comment:null), ] +POSTHOOK: Lineage: customer.state SIMPLE [(new_customer_stage)stage.FieldSchema(name:state, type:string, comment:null), ] +POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(customer)customer.FieldSchema(name:ROW__ID, type:struct, comment:), ] +PREHOOK: query: select * from customer order by source_pk +PREHOOK: type: QUERY +PREHOOK: Input: type2_scd_helper@customer +#### A masked pattern was here #### +POSTHOOK: query: select * from customer order by source_pk +POSTHOOK: type: QUERY +POSTHOOK: Input: type2_scd_helper@customer +#### A masked pattern was here #### +1 ABC Abc Co. OH true NULL +2 DEF Def Co. PA true NULL +3 XYZ Xyz Co. CA false 2017-03-14 +3 XYZ Xyz Co. TX true NULL +4 PDQ Pdq Co. WI true NULL +PREHOOK: query: drop table customer +PREHOOK: type: DROPTABLE +PREHOOK: Input: type2_scd_helper@customer +PREHOOK: Output: type2_scd_helper@customer +POSTHOOK: query: drop table customer +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: type2_scd_helper@customer +POSTHOOK: Output: type2_scd_helper@customer +PREHOOK: query: drop table customer_updates +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table customer_updates +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table new_customer_stage +PREHOOK: type: DROPTABLE +PREHOOK: Input: type2_scd_helper@new_customer_stage +PREHOOK: Output: type2_scd_helper@new_customer_stage +POSTHOOK: query: drop table new_customer_stage +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: type2_scd_helper@new_customer_stage +POSTHOOK: Output: type2_scd_helper@new_customer_stage +PREHOOK: query: drop table scd_types +PREHOOK: type: DROPTABLE +PREHOOK: Input: type2_scd_helper@scd_types +PREHOOK: Output: type2_scd_helper@scd_types +POSTHOOK: query: drop table scd_types +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: type2_scd_helper@scd_types +POSTHOOK: Output: type2_scd_helper@scd_types