diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
index f68d0ad..7592b99 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
@@ -179,15 +179,35 @@ public Object process(Node nd, Stack<Node> stack,
         // Not safe to continue for RS-GBY-GBY-LIM kind of pipelines. See HIVE-10607 for more.
         return false;
       }
-      if (!checkKeys(cRS.getConf().getKeyCols(), pRS.getConf().getKeyCols(), cRS, pRS)) {
-        // Keys are not the same; bail out
-        return false;
+      if (pRS.getChildren().get(0) instanceof GroupByOperator &&
+          pRS.getChildren().get(0).getChildren().get(0) == cRS) {
+        // RS-GB-RS
+        GroupByOperator gBy = (GroupByOperator) pRS.getChildren().get(0);
+        if (!checkKeys(cRS.getConf().getKeyCols(), gBy.getConf().getKeys(), pRS.getConf().getKeyCols(),
+            cRS, gBy, pRS)) {
+          // We cannot push limit; bail out
+          return false;
+        }
+      } else {
+        if (!checkKeys(cRS.getConf().getKeyCols(), pRS.getConf().getKeyCols(), cRS, pRS)) {
+          // We cannot push limit; bail out
+          return false;
+        }
       }
       // Copy order
-      StringBuilder order = new StringBuilder(cRS.getConf().getOrder());
-      StringBuilder orderNull = new StringBuilder(cRS.getConf().getNullOrder());
-      order.append(pRS.getConf().getOrder().substring(order.length()));
-      orderNull.append(pRS.getConf().getNullOrder().substring(orderNull.length()));
+      StringBuilder order;
+      StringBuilder orderNull;
+      if (pRS.getConf().getOrder().length() > cRS.getConf().getOrder().length()) {
+        order = new StringBuilder(cRS.getConf().getOrder());
+        orderNull = new StringBuilder(cRS.getConf().getNullOrder());
+        order.append(pRS.getConf().getOrder().substring(order.length()));
+        orderNull.append(pRS.getConf().getNullOrder().substring(orderNull.length()));
+      } else {
+        order = new StringBuilder(cRS.getConf().getOrder().substring(
+            0, pRS.getConf().getOrder().length()));
+        orderNull = new StringBuilder(cRS.getConf().getNullOrder().substring(
+            0, pRS.getConf().getNullOrder().length()));
+      }
       pRS.getConf().setOrder(order.toString());
       pRS.getConf().setNullOrder(orderNull.toString());
       // Copy limit
@@ -201,6 +221,37 @@ public Object process(Node nd, Stack<Node> stack,
     }
   }
 
+  private static boolean checkKeys(List<ExprNodeDesc> cKeys, List<ExprNodeDesc> gKeys,
+      List<ExprNodeDesc> pKeys, ReduceSinkOperator cRS, GroupByOperator gBy, ReduceSinkOperator pRS)
+      throws SemanticException {
+    if (checkKeys(cKeys, pKeys, cRS, pRS)) {
+      // If check keys returns true, we can safely return true
+      return true;
+    }
+    if (gKeys.size() != pKeys.size() || pKeys.size() >= cKeys.size()) {
+      return false;
+    }
+    for (int i = 0; i < pKeys.size(); i++) {
+      ExprNodeDesc gByExpr = ExprNodeDescUtils.backtrack(cKeys.get(i), cRS, gBy);
+      if (gByExpr == null) {
+        // cKey is not present in parent
+        return false;
+      }
+      if (!gByExpr.isSame(gKeys.get(i))) {
+        return false;
+      }
+      ExprNodeDesc pExpr = ExprNodeDescUtils.backtrack(gByExpr, gBy, pRS);
+      if (pExpr == null) {
+        // cKey is not present in parent
+        return false;
+      }
+      if (!pExpr.isSame(pKeys.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private static boolean checkKeys(List<ExprNodeDesc> cKeys, List<ExprNodeDesc> pKeys,
       ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
     if (cKeys == null || cKeys.isEmpty()) {
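The order-copy hunk above no longer assumes the parent ReduceSink has at least as many keys as the child: when the child orders by more columns than the parent groups by, only the matching prefix of the child's sort-order (and null-order) string is propagated to the parent. Below is a minimal standalone sketch of that prefix rule, using plain strings instead of the ReduceSink descriptors; copyOrderPrefix is a hypothetical helper name for illustration only and is not part of this patch.

// Illustrative sketch only, not part of the patch. It mirrors the order-copying
// branch above with plain strings; the same rule applies to the null-order string.
public class OrderPrefixSketch {
  // cOrder: child ReduceSink sort order, e.g. "-++" for (key desc, value, agg1)
  // pOrder: parent ReduceSink sort order, e.g. "++" for the group-by keys (key, value)
  static String copyOrderPrefix(String cOrder, String pOrder) {
    if (pOrder.length() > cOrder.length()) {
      // Parent has more keys: keep the child's order and append the parent's tail.
      return cOrder + pOrder.substring(cOrder.length());
    }
    // Parent has the same number of keys or fewer: keep only the child's prefix.
    return cOrder.substring(0, pOrder.length());
  }

  public static void main(String[] args) {
    System.out.println(copyOrderPrefix("-++", "++")); // "-+": parent RS sorts key desc, value asc
    System.out.println(copyOrderPrefix("-", "++"));   // "-+": pre-existing behavior, child order extended
  }
}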
diff --git ql/src/test/queries/clientpositive/limit_pushdown2.q ql/src/test/queries/clientpositive/limit_pushdown2.q
index e222763..1f00182 100644
--- ql/src/test/queries/clientpositive/limit_pushdown2.q
+++ ql/src/test/queries/clientpositive/limit_pushdown2.q
@@ -57,6 +57,24 @@ select key, value, avg(key + 1) from src
 group by value, key
 order by key desc limit 20;
 
+explain
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key, value, agg1 limit 20;
+
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key, value, agg1 limit 20;
+
+explain
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key desc, value, agg1 limit 20;
+
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key desc, value, agg1 limit 20;
+
 -- NOT APPLICABLE
 explain
 select value, avg(key + 1) myavg from src
diff --git ql/src/test/results/clientpositive/limit_pushdown2.q.out ql/src/test/results/clientpositive/limit_pushdown2.q.out
index b44b529..fac6164 100644
--- ql/src/test/results/clientpositive/limit_pushdown2.q.out
+++ ql/src/test/results/clientpositive/limit_pushdown2.q.out
@@ -562,6 +562,238 @@ POSTHOOK: Input: default@src
 76	val_76	77.0
 74	val_74	75.0
 72	val_72	73.0
+PREHOOK: query: explain
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key, value, agg1 limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key, value, agg1 limit 20
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string), value (type: string), (UDFToDouble(key) + 1.0) (type: double)
+              outputColumnNames: _col0, _col1, _col2
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count(_col2)
+                keys: _col0 (type: string), _col1 (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: string)
+                  sort order: ++
+                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  TopN Hash Memory Usage: 0.3
+                  value expressions: _col2 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          keys: KEY._col0 (type: string), KEY._col1 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1, _col2
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
+              sort order: +++
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              TopN Hash Memory Usage: 0.3
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: bigint)
+          outputColumnNames: _col0, _col1, _col2
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 20
+            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key, value, agg1 limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key, value, agg1 limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	val_0	3
+10	val_10	1
+100	val_100	2
+103	val_103	2
+104	val_104	2
+105	val_105	1
+11	val_11	1
+111	val_111	1
+113	val_113	2
+114	val_114	1
+116	val_116	1
+118	val_118	2
+119	val_119	3
+12	val_12	2
+120	val_120	2
+125	val_125	2
+126	val_126	1
+128	val_128	3
+129	val_129	2
+131	val_131	1
+PREHOOK: query: explain
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key desc, value, agg1 limit 20
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key desc, value, agg1 limit 20
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string), value (type: string), (UDFToDouble(key) + 1.0) (type: double)
+              outputColumnNames: _col0, _col1, _col2
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count(_col2)
+                keys: _col0 (type: string), _col1 (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: string)
+                  sort order: -+
+                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  TopN Hash Memory Usage: 0.3
+                  value expressions: _col2 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          keys: KEY._col0 (type: string), KEY._col1 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1, _col2
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
+              sort order: -++
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              TopN Hash Memory Usage: 0.3
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: bigint)
+          outputColumnNames: _col0, _col1, _col2
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 20
+            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 20
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key desc, value, agg1 limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select key, value, count(key + 1) as agg1 from src
+group by key, value
+order by key desc, value, agg1 limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+98	val_98	2
+97	val_97	2
+96	val_96	1
+95	val_95	2
+92	val_92	1
+90	val_90	3
+9	val_9	1
+87	val_87	1
+86	val_86	1
+85	val_85	1
+84	val_84	2
+83	val_83	2
+82	val_82	1
+80	val_80	1
+8	val_8	1
+78	val_78	1
+77	val_77	1
+76	val_76	2
+74	val_74	1
+72	val_72	2
 PREHOOK: query: -- NOT APPLICABLE
 explain
 select value, avg(key + 1) myavg from src