diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index da2091ac5d2..eeca1c9d4ef 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -1731,6 +1731,8 @@ spark.only.query.negative.files=spark_job_max_tasks.q,\ spark_submit_negative_executor_cores.q,\ spark_submit_negative_executor_memory.q +tez.perf.disabled.query.files=mv_query44.q + spark.perf.disabled.query.files=query14.q,\ query64.q,\ cbo_query1.q,\ @@ -1829,7 +1831,8 @@ spark.perf.disabled.query.files=query14.q,\ cbo_query96.q,\ cbo_query97.q,\ cbo_query98.q,\ - cbo_query99.q + cbo_query99.q,\ + mv_query44.q druid.query.files=druidmini_test1.q,\ druidmini_test_ts.q,\ diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index afff0df759a..df058eaf6d3 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -285,6 +285,12 @@ public TezPerfCliConfig(boolean useConstraints) { try { setQueryDir("ql/src/test/queries/clientpositive/perf"); + if (useConstraints) { + excludesFrom(testConfigProps, "tez.perf.constraints.disabled.query.files"); + } else { + excludesFrom(testConfigProps, "tez.perf.disabled.query.files"); + } + excludesFrom(testConfigProps, "minimr.query.files"); excludesFrom(testConfigProps, "minitez.query.files"); excludesFrom(testConfigProps, "encrypted.query.files"); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectJoinTransposeRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectJoinTransposeRule.java index 43c78968d0f..38759c0525d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectJoinTransposeRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectJoinTransposeRule.java @@ -17,17 +17,131 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.rules; -import org.apache.calcite.rel.rules.ProjectJoinTransposeRule; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rel.rules.PushProjector; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; -public class HiveProjectJoinTransposeRule extends ProjectJoinTransposeRule { +/** + * Planner rule that pushes a {@link org.apache.calcite.rel.core.Project} + * past a {@link org.apache.calcite.rel.core.Join} + * by splitting the projection into a projection on top of each child of + * the join. + * TODO: Use Calcite rule once we can pass the matching operand as a parameter + */ +public class HiveProjectJoinTransposeRule extends RelOptRule { public static final HiveProjectJoinTransposeRule INSTANCE = new HiveProjectJoinTransposeRule(HiveRelFactories.HIVE_BUILDER); + /** + * Condition for expressions that should be preserved in the projection. + */ + private final PushProjector.ExprCondition preserveExprCondition; + private HiveProjectJoinTransposeRule(RelBuilderFactory relBuilderFactory) { - super(expr -> true, relBuilderFactory); + super( + operand(Project.class, + operand(Join.class, + operand(RelNode.class, any()), + operand(RelNode.class, any()))), + relBuilderFactory, "HiveProjectJoinTransposeRule"); + this.preserveExprCondition = expr -> true; + } + + @Override + public boolean matches(RelOptRuleCall call) { + final RelNode leftInput = call.rel(2); + final RelNode rightInput = call.rel(3); + + if (leftInput instanceof Project && rightInput instanceof Project) { + return false; + } + + return true; + } + + //~ Methods ---------------------------------------------------------------- + + // implement RelOptRule + public void onMatch(RelOptRuleCall call) { + Project origProj = call.rel(0); + final Join join = call.rel(1); + + if (join instanceof SemiJoin) { + return; // TODO: support SemiJoin + } + // locate all fields referenced in the projection and join condition; + // determine which inputs are referenced in the projection and + // join condition; if all fields are being referenced and there are no + // special expressions, no point in proceeding any further + PushProjector pushProject = + new PushProjector( + origProj, + join.getCondition(), + join, + preserveExprCondition, + call.builder()); + if (pushProject.locateAllRefs()) { + return; + } + + // create left and right projections, projecting only those + // fields referenced on each side + RelNode leftProjRel = + pushProject.createProjectRefsAndExprs( + join.getLeft(), + true, + false); + RelNode rightProjRel = + pushProject.createProjectRefsAndExprs( + join.getRight(), + true, + true); + + // convert the join condition to reference the projected columns + RexNode newJoinFilter = null; + int[] adjustments = pushProject.getAdjustments(); + if (join.getCondition() != null) { + List projJoinFieldList = new ArrayList<>(); + projJoinFieldList.addAll( + join.getSystemFieldList()); + projJoinFieldList.addAll( + leftProjRel.getRowType().getFieldList()); + projJoinFieldList.addAll( + rightProjRel.getRowType().getFieldList()); + newJoinFilter = + pushProject.convertRefsAndExprs( + join.getCondition(), + projJoinFieldList, + adjustments); + } + + // create a new join with the projected children + Join newJoinRel = + join.copy( + join.getTraitSet(), + newJoinFilter, + leftProjRel, + rightProjRel, + join.getJoinType(), + join.isSemiJoinDone()); + + // put the original project on top of the join, converting it to + // reference the modified projection list + RelNode topProject = + pushProject.createNewProject(newJoinRel, adjustments); + + call.transformTo(topProject); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 82e975a50de..8e3ef3f7c8e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -2065,9 +2065,9 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv rules.add(new HivePointLookupOptimizerRule.FilterCondition(minNumORClauses)); rules.add(new HivePointLookupOptimizerRule.JoinCondition(minNumORClauses)); } + rules.add(HiveProjectJoinTransposeRule.INSTANCE); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_CONSTRAINTS_JOIN) && profilesCBO.contains(ExtendedCBOProfile.REFERENTIAL_CONSTRAINTS)) { - rules.add(HiveProjectJoinTransposeRule.INSTANCE); rules.add(HiveJoinConstraintsRule.INSTANCE); } rules.add(HiveJoinAddNotNullRule.INSTANCE_JOIN); @@ -2246,7 +2246,7 @@ private RelNode copyNodeScan(RelNode scan) { perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting"); - if (calcitePreMVRewritingPlan != basePlan) { + if (!RelOptUtil.toString(calcitePreMVRewritingPlan).equals(RelOptUtil.toString(basePlan))) { // A rewriting was produced, we will check whether it was part of an incremental rebuild // to try to replace INSERT OVERWRITE by INSERT if (mvRebuildMode == MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD && diff --git ql/src/test/queries/clientpositive/perf/mv_query44.q ql/src/test/queries/clientpositive/perf/mv_query44.q new file mode 100644 index 00000000000..7415f5e5e1b --- /dev/null +++ ql/src/test/queries/clientpositive/perf/mv_query44.q @@ -0,0 +1,47 @@ +set hive.mapred.mode=nonstrict; +set hive.materializedview.rewriting.time.window=-1; + +-- start query 1 in stream 0 using template query44.tpl and seed 1819994127 + +CREATE MATERIALIZED VIEW mv_store_sales_item_customer PARTITIONED ON (ss_sold_date_sk) +AS + select ss_item_sk, ss_store_sk, ss_customer_sk, ss_sold_date_sk, count(*) cnt, sum(ss_quantity) as ss_quantity, sum(ss_ext_wholesale_cost) as ss_ext_wholesale_cost,sum(ss_net_paid) as ss_net_paid,sum(ss_net_profit) as ss_net_profit, sum(ss_ext_sales_price) as ss_ext_sales_price, sum(ss_coupon_amt) amt, sum(ss_sales_price) ss_sales_price, sum(ss_quantity*ss_sales_price) ssales + from store_sales + group by ss_store_sk, + ss_item_sk, ss_customer_sk, ss_sold_date_sk; + +explain +select asceding.rnk, i1.i_product_name best_performing, i2.i_product_name worst_performing +from(select * + from (select item_sk,rank() over (order by rank_col asc) rnk + from (select ss_item_sk item_sk,avg(ss_net_profit) rank_col + from store_sales ss1 + where ss_store_sk = 410 + group by ss_item_sk + having avg(ss_net_profit) > 0.9*(select avg(ss_net_profit) rank_col + from store_sales + where ss_store_sk = 410 + and ss_hdemo_sk is null + group by ss_store_sk))V1)V11 + where rnk < 11) asceding, + (select * + from (select item_sk,rank() over (order by rank_col desc) rnk + from (select ss_item_sk item_sk,avg(ss_net_profit) rank_col + from store_sales ss1 + where ss_store_sk = 410 + group by ss_item_sk + having avg(ss_net_profit) > 0.9*(select avg(ss_net_profit) rank_col + from store_sales + where ss_store_sk = 410 + and ss_hdemo_sk is null + group by ss_store_sk))V2)V21 + where rnk < 11) descending, +item i1, +item i2 +where asceding.rnk = descending.rnk + and i1.i_item_sk=asceding.item_sk + and i2.i_item_sk=descending.item_sk +order by asceding.rnk +limit 100; + +-- end query 1 in stream 0 using template query44.tpl diff --git ql/src/test/results/clientpositive/perf/tez/constraints/mv_query44.q.out ql/src/test/results/clientpositive/perf/tez/constraints/mv_query44.q.out new file mode 100644 index 00000000000..db9acc93cb2 --- /dev/null +++ ql/src/test/results/clientpositive/perf/tez/constraints/mv_query44.q.out @@ -0,0 +1,214 @@ +PREHOOK: query: CREATE MATERIALIZED VIEW mv_store_sales_item_customer PARTITIONED ON (ss_sold_date_sk) +AS + select ss_item_sk, ss_store_sk, ss_customer_sk, ss_sold_date_sk, count(*) cnt, sum(ss_quantity) as ss_quantity, sum(ss_ext_wholesale_cost) as ss_ext_wholesale_cost,sum(ss_net_paid) as ss_net_paid,sum(ss_net_profit) as ss_net_profit, sum(ss_ext_sales_price) as ss_ext_sales_price, sum(ss_coupon_amt) amt, sum(ss_sales_price) ss_sales_price, sum(ss_quantity*ss_sales_price) ssales + from store_sales + group by ss_store_sk, + ss_item_sk, ss_customer_sk, ss_sold_date_sk +PREHOOK: type: CREATE_MATERIALIZED_VIEW +PREHOOK: Input: default@store_sales +PREHOOK: Output: database:default +PREHOOK: Output: default@mv_store_sales_item_customer +PREHOOK: Output: default@mv_store_sales_item_customer +POSTHOOK: query: CREATE MATERIALIZED VIEW mv_store_sales_item_customer PARTITIONED ON (ss_sold_date_sk) +AS + select ss_item_sk, ss_store_sk, ss_customer_sk, ss_sold_date_sk, count(*) cnt, sum(ss_quantity) as ss_quantity, sum(ss_ext_wholesale_cost) as ss_ext_wholesale_cost,sum(ss_net_paid) as ss_net_paid,sum(ss_net_profit) as ss_net_profit, sum(ss_ext_sales_price) as ss_ext_sales_price, sum(ss_coupon_amt) amt, sum(ss_sales_price) ss_sales_price, sum(ss_quantity*ss_sales_price) ssales + from store_sales + group by ss_store_sk, + ss_item_sk, ss_customer_sk, ss_sold_date_sk +POSTHOOK: type: CREATE_MATERIALIZED_VIEW +POSTHOOK: Input: default@store_sales +POSTHOOK: Output: database:default +POSTHOOK: Output: default@mv_store_sales_item_customer +Warning: Shuffle Join MERGEJOIN[101][tables = [$hdt$_1, $hdt$_2]] in Stage 'Reducer 8' is a cross product +PREHOOK: query: explain +select asceding.rnk, i1.i_product_name best_performing, i2.i_product_name worst_performing +from(select * + from (select item_sk,rank() over (order by rank_col asc) rnk + from (select ss_item_sk item_sk,avg(ss_net_profit) rank_col + from store_sales ss1 + where ss_store_sk = 410 + group by ss_item_sk + having avg(ss_net_profit) > 0.9*(select avg(ss_net_profit) rank_col + from store_sales + where ss_store_sk = 410 + and ss_hdemo_sk is null + group by ss_store_sk))V1)V11 + where rnk < 11) asceding, + (select * + from (select item_sk,rank() over (order by rank_col desc) rnk + from (select ss_item_sk item_sk,avg(ss_net_profit) rank_col + from store_sales ss1 + where ss_store_sk = 410 + group by ss_item_sk + having avg(ss_net_profit) > 0.9*(select avg(ss_net_profit) rank_col + from store_sales + where ss_store_sk = 410 + and ss_hdemo_sk is null + group by ss_store_sk))V2)V21 + where rnk < 11) descending, +item i1, +item i2 +where asceding.rnk = descending.rnk + and i1.i_item_sk=asceding.item_sk + and i2.i_item_sk=descending.item_sk +order by asceding.rnk +limit 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@item +PREHOOK: Input: default@store_sales +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain +select asceding.rnk, i1.i_product_name best_performing, i2.i_product_name worst_performing +from(select * + from (select item_sk,rank() over (order by rank_col asc) rnk + from (select ss_item_sk item_sk,avg(ss_net_profit) rank_col + from store_sales ss1 + where ss_store_sk = 410 + group by ss_item_sk + having avg(ss_net_profit) > 0.9*(select avg(ss_net_profit) rank_col + from store_sales + where ss_store_sk = 410 + and ss_hdemo_sk is null + group by ss_store_sk))V1)V11 + where rnk < 11) asceding, + (select * + from (select item_sk,rank() over (order by rank_col desc) rnk + from (select ss_item_sk item_sk,avg(ss_net_profit) rank_col + from store_sales ss1 + where ss_store_sk = 410 + group by ss_item_sk + having avg(ss_net_profit) > 0.9*(select avg(ss_net_profit) rank_col + from store_sales + where ss_store_sk = 410 + and ss_hdemo_sk is null + group by ss_store_sk))V2)V21 + where rnk < 11) descending, +item i1, +item i2 +where asceding.rnk = descending.rnk + and i1.i_item_sk=asceding.item_sk + and i2.i_item_sk=descending.item_sk +order by asceding.rnk +limit 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@item +POSTHOOK: Input: default@store_sales +POSTHOOK: Output: hdfs://### HDFS PATH ### +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 10 <- Reducer 8 (SIMPLE_EDGE) +Reducer 12 <- Map 11 (SIMPLE_EDGE) +Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 9 (SIMPLE_EDGE) +Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) +Reducer 4 <- Reducer 3 (SIMPLE_EDGE) +Reducer 5 <- Map 1 (SIMPLE_EDGE), Reducer 10 (SIMPLE_EDGE) +Reducer 7 <- Map 6 (SIMPLE_EDGE) +Reducer 8 <- Reducer 12 (CUSTOM_SIMPLE_EDGE), Reducer 7 (CUSTOM_SIMPLE_EDGE) +Reducer 9 <- Reducer 8 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:100 + Stage-1 + Reducer 4 vectorized + File Output Operator [FS_135] + Limit [LIM_134] (rows=100 width=218) + Number of rows:100 + Select Operator [SEL_133] (rows=6951 width=218) + Output:["_col0","_col1","_col2"] + <-Reducer 3 [SIMPLE_EDGE] + SHUFFLE [RS_67] + Select Operator [SEL_66] (rows=6951 width=218) + Output:["_col0","_col1","_col2"] + Merge Join Operator [MERGEJOIN_105] (rows=6951 width=218) + Conds:RS_63._col3=RS_64._col3(Inner),Output:["_col1","_col3","_col5"] + <-Reducer 2 [SIMPLE_EDGE] + SHUFFLE [RS_63] + PartitionCols:_col3 + Merge Join Operator [MERGEJOIN_102] (rows=6951 width=111) + Conds:RS_107._col0=RS_127._col0(Inner),Output:["_col1","_col3"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_107] + PartitionCols:_col0 + Select Operator [SEL_106] (rows=462000 width=111) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=462000 width=111) + default@item,i1,Tbl:COMPLETE,Col:COMPLETE,Output:["i_item_sk","i_product_name"] + <-Reducer 9 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_127] + PartitionCols:_col0 + Select Operator [SEL_126] (rows=6951 width=8) + Output:["_col0","_col1"] + Filter Operator [FIL_125] (rows=6951 width=116) + predicate:(rank_window_0 < 11) + PTF Operator [PTF_124] (rows=20854 width=116) + Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 ASC NULLS FIRST","partition by:":"0"}] + Select Operator [SEL_123] (rows=20854 width=116) + Output:["_col0","_col1"] + <-Reducer 8 [SIMPLE_EDGE] + SHUFFLE [RS_21] + PartitionCols:0 + Filter Operator [FIL_20] (rows=20854 width=228) + predicate:(_col1 > (0.9 * _col2)) + Merge Join Operator [MERGEJOIN_101] (rows=62562 width=228) + Conds:(Inner),Output:["_col0","_col1","_col2"] + <-Reducer 12 [CUSTOM_SIMPLE_EDGE] vectorized + PARTITION_ONLY_SHUFFLE [RS_122] + Select Operator [SEL_121] (rows=1 width=112) + Output:["_col0"] + Group By Operator [GBY_120] (rows=1 width=124) + Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","count(VALUE._col1)"],keys:KEY._col0 + <-Map 11 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_119] + PartitionCols:_col0 + Group By Operator [GBY_118] (rows=258 width=124) + Output:["_col0","_col1","_col2"],aggregations:["sum(_col1)","count(_col1)"],keys:true + Select Operator [SEL_117] (rows=287946 width=114) + Output:["_col1"] + Filter Operator [FIL_116] (rows=287946 width=114) + predicate:((ss_store_sk = 410) and ss_hdemo_sk is null) + TableScan [TS_9] (rows=575995635 width=114) + default@store_sales,store_sales,Tbl:COMPLETE,Col:COMPLETE,Output:["ss_hdemo_sk","ss_store_sk","ss_net_profit"] + <-Reducer 7 [CUSTOM_SIMPLE_EDGE] vectorized + PARTITION_ONLY_SHUFFLE [RS_115] + Select Operator [SEL_114] (rows=62562 width=116) + Output:["_col0","_col1"] + Group By Operator [GBY_113] (rows=62562 width=124) + Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)","count(VALUE._col1)"],keys:KEY._col0 + <-Map 6 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_112] + PartitionCols:_col0 + Group By Operator [GBY_111] (rows=3199976 width=124) + Output:["_col0","_col1","_col2"],aggregations:["sum(ss_net_profit)","count(ss_net_profit)"],keys:ss_item_sk + Select Operator [SEL_110] (rows=6399952 width=114) + Output:["ss_item_sk","ss_net_profit"] + Filter Operator [FIL_109] (rows=6399952 width=114) + predicate:(ss_store_sk = 410) + TableScan [TS_2] (rows=575995635 width=114) + default@store_sales,ss1,Tbl:COMPLETE,Col:COMPLETE,Output:["ss_item_sk","ss_store_sk","ss_net_profit"] + <-Reducer 5 [SIMPLE_EDGE] + SHUFFLE [RS_64] + PartitionCols:_col3 + Merge Join Operator [MERGEJOIN_104] (rows=6951 width=111) + Conds:RS_108._col0=RS_132._col0(Inner),Output:["_col1","_col3"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_108] + PartitionCols:_col0 + Please refer to the previous Select Operator [SEL_106] + <-Reducer 10 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_132] + PartitionCols:_col0 + Select Operator [SEL_131] (rows=6951 width=8) + Output:["_col0","_col1"] + Filter Operator [FIL_130] (rows=6951 width=116) + predicate:(rank_window_0 < 11) + PTF Operator [PTF_129] (rows=20854 width=116) + Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 DESC NULLS LAST","partition by:":"0"}] + Select Operator [SEL_128] (rows=20854 width=116) + Output:["_col0","_col1"] + <-Reducer 8 [SIMPLE_EDGE] + SHUFFLE [RS_49] + PartitionCols:0 + Please refer to the previous Filter Operator [FIL_20] +