diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 1838300247..2f9903f7ad 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -513,6 +513,7 @@ minillaplocal.query.files=\ compare_double_bigint_2.q,\ constprog_dpp.q,\ constant_prop_when.q,\ + constant_prop_join_rs.q,\ constraints_alter.q,\ constraints_optimization.q,\ current_date_timestamp.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java index 55eb9d8928..c44dc73801 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java @@ -1448,11 +1448,16 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. // Note: the following code (removing folded constants in exprs) is deeply coupled with // ColumnPruner optimizer. // Assuming ColumnPrunner will remove constant columns so we don't deal with output columns. - // Except one case that the join operator is followed by a redistribution (RS operator). - if (op.getChildOperators().size() == 1 - && op.getChildOperators().get(0) instanceof ReduceSinkOperator) { - LOG.debug("Skip JOIN-RS structure."); - return null; + // Except one case that the join operator is followed by a redistribution (RS operator) -- skipping filter ops + if (op.getChildOperators().size() == 1) { + Node ndRecursive = op; + while (ndRecursive.getChildren().size() == 1 && ndRecursive.getChildren().get(0) instanceof FilterOperator) { + ndRecursive = ndRecursive.getChildren().get(0); + } + if (ndRecursive.getChildren().get(0) instanceof ReduceSinkOperator) { + LOG.debug("Skip JOIN-FIL(*)-RS structure."); + return null; + } } if (LOG.isInfoEnabled()) { LOG.info("Old exprs " + conf.getExprs()); diff --git a/ql/src/test/queries/clientpositive/constant_prop_join_rs.q b/ql/src/test/queries/clientpositive/constant_prop_join_rs.q new file mode 100644 index 0000000000..564a429139 --- /dev/null +++ b/ql/src/test/queries/clientpositive/constant_prop_join_rs.q @@ -0,0 +1,46 @@ +set hive.llap.io.enabled=false; +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=10; + +drop table if exists t0; +drop table if exists t1; +drop table if exists t2; +drop table if exists t3; + +create table if not exists t0 (c00 int, c01 int, c03 TIMESTAMP) PARTITIONED BY (c02 string) stored as orc; +create table if not exists t1 (c10 int, c11 int, c12 int) PARTITIONED BY (c13 string) stored as orc; + +create table if not exists t2 (c20 int) PARTITIONED BY (c21 string) stored as orc; +create table if not exists t3 (c30 TIMESTAMP) PARTITIONED BY (c31 string) stored as orc; + + +alter table t0 add partition(c02='test0'); +alter table t1 add partition(c13='test1'); +alter table t2 add partition(c21='test1'); +alter table t3 add partition(c31='test2'); + + +alter table t0 partition(c02='test0') update statistics set('numRows'='153373500','rawDataSize'='2053794707568'); +alter table t1 partition(c13='test1') update statistics set('numRows'='1250000','rawDataSize'='2700000000'); +alter table t2 partition(c21='test1') update statistics set('numRows'='475011','rawDataSize'='641987831'); +alter table t3 partition(c31='test2') update statistics set('numRows'='136672296','rawDataSize'='141045810480'); + + +set hive.explain.user=false; +set hive.cbo.enable=false; +set hive.join.inner.residual=false; + +explain SELECT t0.c00 FROM t0 +JOIN t1 ON (t0.c00 = t1.c10 AND t0.c01 BETWEEN t1.c11 AND t1.c12) +LEFT OUTER JOIN t2 ON ( t1.c13 = t2.c21) +LEFT OUTER JOIN + (SELECT c30 FROM t3) s0 ON datediff (CURRENT_TIMESTAMP, t0.c03) = datediff (CURRENT_TIMESTAMP, s0.c30) +WHERE t1.c13 = 'test1'; + +SELECT t0.c00 FROM t0 +JOIN t1 ON (t0.c00 = t1.c10 AND t0.c01 BETWEEN t1.c11 AND t1.c12) +LEFT OUTER JOIN t2 ON ( t1.c13 = t2.c21) +LEFT OUTER JOIN + (SELECT c30 FROM t3) s0 ON datediff (CURRENT_TIMESTAMP, t0.c03) = datediff (CURRENT_TIMESTAMP, s0.c30) +WHERE t1.c13 = 'test1'; diff --git a/ql/src/test/results/clientpositive/llap/constant_prop_join_rs.q b/ql/src/test/results/clientpositive/llap/constant_prop_join_rs.q new file mode 100644 index 0000000000..970dc0883e --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/constant_prop_join_rs.q @@ -0,0 +1,347 @@ +PREHOOK: query: drop table if exists t0 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists t0 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists t1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists t1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists t2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists t2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists t3 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists t3 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table if not exists t0 (c00 int, c01 int, c03 TIMESTAMP) PARTITIONED BY (c02 string) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t0 +POSTHOOK: query: create table if not exists t0 (c00 int, c01 int, c03 TIMESTAMP) PARTITIONED BY (c02 string) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t0 +PREHOOK: query: create table if not exists t1 (c10 int, c11 int, c12 int) PARTITIONED BY (c13 string) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t1 +POSTHOOK: query: create table if not exists t1 (c10 int, c11 int, c12 int) PARTITIONED BY (c13 string) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +PREHOOK: query: create table if not exists t2 (c20 int) PARTITIONED BY (c21 string) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t2 +POSTHOOK: query: create table if not exists t2 (c20 int) PARTITIONED BY (c21 string) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t2 +PREHOOK: query: create table if not exists t3 (c30 TIMESTAMP) PARTITIONED BY (c31 string) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t3 +POSTHOOK: query: create table if not exists t3 (c30 TIMESTAMP) PARTITIONED BY (c31 string) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t3 +PREHOOK: query: alter table t0 add partition(c02='test0') +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@t0 +POSTHOOK: query: alter table t0 add partition(c02='test0') +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@t0 +POSTHOOK: Output: default@t0@c02=test0 +PREHOOK: query: alter table t1 add partition(c13='test1') +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@t1 +POSTHOOK: query: alter table t1 add partition(c13='test1') +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@t1 +POSTHOOK: Output: default@t1@c13=test1 +PREHOOK: query: alter table t2 add partition(c21='test1') +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@t2 +POSTHOOK: query: alter table t2 add partition(c21='test1') +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@t2 +POSTHOOK: Output: default@t2@c21=test1 +PREHOOK: query: alter table t3 add partition(c31='test2') +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@t3 +POSTHOOK: query: alter table t3 add partition(c31='test2') +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@t3 +POSTHOOK: Output: default@t3@c31=test2 +PREHOOK: query: alter table t0 partition(c02='test0') update statistics set('numRows'='153373500','rawDataSize'='2053794707568') +PREHOOK: type: ALTERTABLE_UPDATEPARTSTATS +PREHOOK: Input: default@t0 +PREHOOK: Output: default@t0@c02=test0 +POSTHOOK: query: alter table t0 partition(c02='test0') update statistics set('numRows'='153373500','rawDataSize'='2053794707568') +POSTHOOK: type: ALTERTABLE_UPDATEPARTSTATS +POSTHOOK: Input: default@t0 +POSTHOOK: Input: default@t0@c02=test0 +POSTHOOK: Output: default@t0@c02=test0 +PREHOOK: query: alter table t1 partition(c13='test1') update statistics set('numRows'='1250000','rawDataSize'='2700000000') +PREHOOK: type: ALTERTABLE_UPDATEPARTSTATS +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1@c13=test1 +POSTHOOK: query: alter table t1 partition(c13='test1') update statistics set('numRows'='1250000','rawDataSize'='2700000000') +POSTHOOK: type: ALTERTABLE_UPDATEPARTSTATS +POSTHOOK: Input: default@t1 +POSTHOOK: Input: default@t1@c13=test1 +POSTHOOK: Output: default@t1@c13=test1 +PREHOOK: query: alter table t2 partition(c21='test1') update statistics set('numRows'='475011','rawDataSize'='641987831') +PREHOOK: type: ALTERTABLE_UPDATEPARTSTATS +PREHOOK: Input: default@t2 +PREHOOK: Output: default@t2@c21=test1 +POSTHOOK: query: alter table t2 partition(c21='test1') update statistics set('numRows'='475011','rawDataSize'='641987831') +POSTHOOK: type: ALTERTABLE_UPDATEPARTSTATS +POSTHOOK: Input: default@t2 +POSTHOOK: Input: default@t2@c21=test1 +POSTHOOK: Output: default@t2@c21=test1 +PREHOOK: query: alter table t3 partition(c31='test2') update statistics set('numRows'='136672296','rawDataSize'='141045810480') +PREHOOK: type: ALTERTABLE_UPDATEPARTSTATS +PREHOOK: Input: default@t3 +PREHOOK: Output: default@t3@c31=test2 +POSTHOOK: query: alter table t3 partition(c31='test2') update statistics set('numRows'='136672296','rawDataSize'='141045810480') +POSTHOOK: type: ALTERTABLE_UPDATEPARTSTATS +POSTHOOK: Input: default@t3 +POSTHOOK: Input: default@t3@c31=test2 +POSTHOOK: Output: default@t3@c31=test2 +PREHOOK: query: explain SELECT t0.c00 FROM t0 +JOIN t1 ON (t0.c00 = t1.c10 AND t0.c01 BETWEEN t1.c11 AND t1.c12) +LEFT OUTER JOIN t2 ON ( t1.c13 = t2.c21) +LEFT OUTER JOIN + (SELECT c30 FROM t3) s0 ON datediff (CURRENT_TIMESTAMP, t0.c03) = datediff (CURRENT_TIMESTAMP, s0.c30) +WHERE t1.c13 = 'test1' +PREHOOK: type: QUERY +PREHOOK: Input: default@t0 +PREHOOK: Input: default@t0@c02=test0 +PREHOOK: Input: default@t1 +PREHOOK: Input: default@t1@c13=test1 +PREHOOK: Input: default@t2 +PREHOOK: Input: default@t2@c21=test1 +PREHOOK: Input: default@t3 +PREHOOK: Input: default@t3@c31=test2 +#### A masked pattern was here #### +POSTHOOK: query: explain SELECT t0.c00 FROM t0 +JOIN t1 ON (t0.c00 = t1.c10 AND t0.c01 BETWEEN t1.c11 AND t1.c12) +LEFT OUTER JOIN t2 ON ( t1.c13 = t2.c21) +LEFT OUTER JOIN + (SELECT c30 FROM t3) s0 ON datediff (CURRENT_TIMESTAMP, t0.c03) = datediff (CURRENT_TIMESTAMP, s0.c30) +WHERE t1.c13 = 'test1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t0 +POSTHOOK: Input: default@t0@c02=test0 +POSTHOOK: Input: default@t1 +POSTHOOK: Input: default@t1@c13=test1 +POSTHOOK: Input: default@t2 +POSTHOOK: Input: default@t2@c21=test1 +POSTHOOK: Input: default@t3 +POSTHOOK: Input: default@t3@c31=test2 +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 3 <- Reducer 7 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) + Reducer 4 <- Map 3 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 5 <- Map 8 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) + Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t3 + Statistics: Num rows: 136672296 Data size: 146239357800 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: c30 (type: timestamp) + outputColumnNames: _col0 + Statistics: Num rows: 136672296 Data size: 146239357800 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: datediff(CURRENT_TIMESTAMP(), _col0) (type: int) + null sort order: a + sort order: + + Map-reduce partition columns: datediff(CURRENT_TIMESTAMP(), _col0) (type: int) + Statistics: Num rows: 136672296 Data size: 146239357800 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + Map 3 + Map Operator Tree: + TableScan + alias: t0 + filterExpr: (c00 is not null and c00 BETWEEN DynamicValue(RS_8_t1_c10_min) AND DynamicValue(RS_8_t1_c10_max) and in_bloom_filter(c00, DynamicValue(RS_8_t1_c10_bloom_filter))) (type: boolean) + Statistics: Num rows: 153373500 Data size: 2060788539216 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (c00 is not null and c00 BETWEEN DynamicValue(RS_8_t1_c10_min) AND DynamicValue(RS_8_t1_c10_max) and in_bloom_filter(c00, DynamicValue(RS_8_t1_c10_bloom_filter))) (type: boolean) + Statistics: Num rows: 145704825 Data size: 1957749112255 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: c00 (type: int) + null sort order: a + sort order: + + Map-reduce partition columns: c00 (type: int) + Statistics: Num rows: 145704825 Data size: 1957749112255 Basic stats: COMPLETE Column stats: NONE + value expressions: c01 (type: int), c03 (type: timestamp) + Execution mode: vectorized, llap + Map 6 + Map Operator Tree: + TableScan + alias: t1 + filterExpr: c10 is not null (type: boolean) + Statistics: Num rows: 1250000 Data size: 2714250012 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: c10 is not null (type: boolean) + Statistics: Num rows: 1187500 Data size: 2578537511 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: c10 (type: int) + null sort order: a + sort order: + + Map-reduce partition columns: c10 (type: int) + Statistics: Num rows: 1187500 Data size: 2578537511 Basic stats: COMPLETE Column stats: NONE + value expressions: c11 (type: int), c12 (type: int), 'test1' (type: string) + Select Operator + expressions: c10 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1187500 Data size: 2578537511 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=1187500) + minReductionHashAggr: 0.99 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + Map 8 + Map Operator Tree: + TableScan + alias: t2 + filterExpr: (c21 = 'test1') (type: boolean) + Statistics: Num rows: 475011 Data size: 684263810 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: c21 (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: c21 (type: string) + Statistics: Num rows: 475011 Data size: 684263810 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: vectorized, llap + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Left Outer Join 0 to 1 + keys: + 0 datediff(CURRENT_TIMESTAMP(), _col2) (type: int) + 1 datediff(CURRENT_TIMESTAMP(), _col0) (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 150339528 Data size: 160863297066 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 150339528 Data size: 160863297066 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 c00 (type: int) + 1 c10 (type: int) + outputColumnNames: _col0, _col1, _col2, _col8, _col9, _col10 + Statistics: Num rows: 160275310 Data size: 2153524070156 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: _col1 BETWEEN _col8 AND _col9 (type: boolean) + Statistics: Num rows: 17808367 Data size: 239280441789 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col10 (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: _col10 (type: string) + Statistics: Num rows: 17808367 Data size: 239280441789 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col2 (type: timestamp) + Reducer 5 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Left Outer Join 0 to 1 + keys: + 0 _col10 (type: string) + 1 c21 (type: string) + outputColumnNames: _col0, _col2 + Statistics: Num rows: 19589204 Data size: 263208491672 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: datediff(CURRENT_TIMESTAMP(), _col2) (type: int) + null sort order: a + sort order: + + Map-reduce partition columns: datediff(CURRENT_TIMESTAMP(), _col2) (type: int) + Statistics: Num rows: 19589204 Data size: 263208491672 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Reducer 7 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=1187500) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT t0.c00 FROM t0 +JOIN t1 ON (t0.c00 = t1.c10 AND t0.c01 BETWEEN t1.c11 AND t1.c12) +LEFT OUTER JOIN t2 ON ( t1.c13 = t2.c21) +LEFT OUTER JOIN + (SELECT c30 FROM t3) s0 ON datediff (CURRENT_TIMESTAMP, t0.c03) = datediff (CURRENT_TIMESTAMP, s0.c30) +WHERE t1.c13 = 'test1' +PREHOOK: type: QUERY +PREHOOK: Input: default@t0 +PREHOOK: Input: default@t0@c02=test0 +PREHOOK: Input: default@t1 +PREHOOK: Input: default@t1@c13=test1 +PREHOOK: Input: default@t2 +PREHOOK: Input: default@t2@c21=test1 +PREHOOK: Input: default@t3 +PREHOOK: Input: default@t3@c31=test2 +#### A masked pattern was here #### +POSTHOOK: query: SELECT t0.c00 FROM t0 +JOIN t1 ON (t0.c00 = t1.c10 AND t0.c01 BETWEEN t1.c11 AND t1.c12) +LEFT OUTER JOIN t2 ON ( t1.c13 = t2.c21) +LEFT OUTER JOIN + (SELECT c30 FROM t3) s0 ON datediff (CURRENT_TIMESTAMP, t0.c03) = datediff (CURRENT_TIMESTAMP, s0.c30) +WHERE t1.c13 = 'test1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t0 +POSTHOOK: Input: default@t0@c02=test0 +POSTHOOK: Input: default@t1 +POSTHOOK: Input: default@t1@c13=test1 +POSTHOOK: Input: default@t2 +POSTHOOK: Input: default@t2@c21=test1 +POSTHOOK: Input: default@t3 +POSTHOOK: Input: default@t3@c31=test2 +#### A masked pattern was here ####