diff --git hbase-handler/src/test/queries/positive/hbase_pushdown.q hbase-handler/src/test/queries/positive/hbase_pushdown.q
index 0d29c82..776afef 100644
--- hbase-handler/src/test/queries/positive/hbase_pushdown.q
+++ hbase-handler/src/test/queries/positive/hbase_pushdown.q
@@ -17,6 +17,21 @@ explain select * from hbase_pushdown where key=90 and value like '%90%';
 
 select * from hbase_pushdown where key=90 and value like '%90%';
 
+-- with union
+
+explain
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u;
+
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u;
+
 set hive.optimize.index.filter=true;
 -- with partial pushdown with optimization (HIVE-6650)
 explain select * from hbase_pushdown where key=90 and value like '%90%';
diff --git hbase-handler/src/test/results/positive/hbase_pushdown.q.out hbase-handler/src/test/results/positive/hbase_pushdown.q.out
index 8a979bf..2e75c15 100644
--- hbase-handler/src/test/results/positive/hbase_pushdown.q.out
+++ hbase-handler/src/test/results/positive/hbase_pushdown.q.out
@@ -120,6 +120,107 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@hbase_pushdown
 #### A masked pattern was here ####
 90	val_90
+PREHOOK: query: -- with union
+
+explain
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with union
+
+explain
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: hbase_pushdown
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Filter Operator
+              predicate: (key < 10) (type: boolean)
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Select Operator
+                expressions: key (type: int), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                Union
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                  Select Operator
+                    expressions: _col0 (type: int), _col1 (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          TableScan
+            alias: hbase_pushdown
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Filter Operator
+              predicate: (key > 999) (type: boolean)
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Select Operator
+                expressions: key (type: int), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                Union
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                  Select Operator
+                    expressions: _col0 (type: int), _col1 (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+#### A masked pattern was here ####
+POSTHOOK: query: select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+#### A masked pattern was here ####
+0	val_0
+2	val_2
+4	val_4
+5	val_5
+8	val_8
+9	val_9
 PREHOOK: query: -- with partial pushdown with optimization (HIVE-6650)
 explain select * from hbase_pushdown where key=90 and value like '%90%'
 PREHOOK: type: QUERY
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index 703c9d1..f890c9d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -275,6 +275,15 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
   }
 
   @Override
+  protected List<MapWork> extractMapWorks(List<Task<? extends Serializable>> rootTasks) {
+    List<MapWork> mapWorks = new ArrayList<MapWork>();
+    for (ExecDriver task : Utilities.getMRTasks(rootTasks)) {
+      mapWorks.add(task.getWork().getMapWork());
+    }
+    return mapWorks;
+  }
+
+  @Override
   protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 23fbbe1..4255900 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -20,10 +20,14 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Interner;
@@ -38,7 +42,12 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -52,10 +61,16 @@
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -222,6 +237,8 @@ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks, final HashSet<ReadEntity> inputs
   protected abstract void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
       ParseContext pCtx, Context ctx) throws SemanticException;
 
+  protected abstract List<MapWork> extractMapWorks(List<Task<? extends Serializable>> rootTasks);
+
+  protected void finalizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
+      ParseContext pCtx, Context ctx) throws SemanticException {
+    for (MapWork mapWork : extractMapWorks(rootTasks)) {
+
+      // for single sourced multiple aliases, multiple filter expr in TS
+      // cannot be evaluated (see HiveInputFormat#getSplits)
+      // todo : 'or'ed predicate can be useful for some storage handlers (not for hbase, currently)
+      Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+      for (ArrayList<String> aliases : mapWork.getPathToAliases().values()) {
+        if (aliases.size() <= 1) {
+          continue;
+        }
+        List<TableScanOperator> tss = new ArrayList<TableScanOperator>();
+        List<List<ExprNodeDesc>> tsExprs = new ArrayList<List<ExprNodeDesc>>();
+        for (String alias : aliases) {
+          Operator<? extends OperatorDesc> operator = aliasToWork.get(alias);
+          if (operator instanceof TableScanOperator) {
+            TableScanOperator ts = (TableScanOperator) operator;
+            if (ts.getConf().getFilterExpr() != null) {
+              tss.add(ts);
+              tsExprs.add(ExprNodeDescUtils.split(ts.getConf().getFilterExpr()));
+            }
+          }
+        }
+        if (tss.isEmpty()) {
+          continue;
+        }
+        Collection<ExprNodeDesc> commonSet = getCommonExprs(tsExprs);
+        if (!commonSet.isEmpty()) {
+          tsExprs = exceptCommonExpr(tsExprs, commonSet);
+        }
+        ExprNodeGenericFuncDesc commonExpr =
+            (ExprNodeGenericFuncDesc) ExprNodeDescUtils.mergePredicates(commonSet);
+        // recreate filter for each TS
+        for (int i = 0 ; i < tss.size(); i++) {
+          TableScanOperator ts = tss.get(i);
+          ts.getConf().setFilterExpr(commonExpr);
+          List<ExprNodeDesc> tsExpr = tsExprs.get(i);
+          if (tsExpr.isEmpty()) {
+            continue; // nothing todo
+          }
+          FilterOperator child = OperatorUtils.findSingleOperator(ts, FilterOperator.class);
+          if (child != null) {
+            FilterDesc filterDesc = child.getConf();
+            filterDesc.setPredicate(
+                ExprNodeDescUtils.appendPredicatesWithDedup(filterDesc.getPredicate(), tsExpr));
+          } else {
+            ExprNodeDesc predicate = ExprNodeDescUtils.mergePredicates(tsExpr);
+            FilterDesc filterDesc = new FilterDesc(predicate, false);
+            Operator<? extends OperatorDesc>[] children = new Operator[ts.getNumChild()];
+            for (int j = 0; j < children.length; j++) {
+              children[j] = ts.getChildOperators().get(j);
+              children[j].getParentOperators().clear();
+            }
+            Operator<FilterDesc> newFilter = OperatorFactory.get(filterDesc, children);
+            List<Operator<? extends OperatorDesc>> parents =
+                new ArrayList<Operator<? extends OperatorDesc>>(Arrays.asList(ts));
+            newFilter.setParentOperators(parents);
+            ts.getChildOperators().clear();
+            ts.getChildOperators().add(newFilter);
+          }
+        }
+      }
+    }
+  }
+
+  private Collection<ExprNodeDesc> getCommonExprs(List<List<ExprNodeDesc>> tsExprs) {
+    Collection<ExprNodeDesc> commonExprs = null;
+    for (List<ExprNodeDesc> exprs : tsExprs) {
+      if (commonExprs == null) {
+        commonExprs = new ArrayList<ExprNodeDesc>(exprs);
+      } else {
+        ExprNodeDescUtils.retainAll(commonExprs, exprs);
+      }
+      if (commonExprs.isEmpty()) {
+        return Collections.emptyList();
+      }
+    }
+    return commonExprs;
+  }
+
+  private List<List<ExprNodeDesc>> exceptCommonExpr(List<List<ExprNodeDesc>> tsExprs,
+      Collection<ExprNodeDesc> common) {
+    for (List<ExprNodeDesc> exprs : tsExprs) {
+      exprs.removeAll(common);
+    }
+    return tsExprs;
+  }
+
   /*
    * Called to set the appropriate input format for tasks
    */
   protected abstract void setInputFormat(Task<? extends Serializable> rootTask);
 
   /*
-   * Called to generate the taks tree from the parse context/operator tree
+   * Called to generate the task tree from the parse context/operator tree
    */
   protected abstract void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index ea12990..c79df22 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -477,4 +478,17 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     }
     return;
   }
+
+  @Override
+  protected List<MapWork> extractMapWorks(List<Task<? extends Serializable>> rootTasks) {
+    List<MapWork> mapWorks = new ArrayList<MapWork>();
+    for (TezTask task : Utilities.getTezTasks(rootTasks)) {
+      for (BaseWork work : task.getWork().getRoots()) {
+        if (work instanceof MapWork) {
+          mapWorks.add((MapWork)work);
+        }
+      }
+    }
+    return mapWorks;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index 26dec47..c1df5a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -99,6 +101,28 @@ public static boolean containsPredicate(ExprNodeDesc source, ExprNodeDesc predic
     return false;
   }
 
+  public static ExprNodeDesc appendPredicatesWithDedup(
+      ExprNodeDesc prev, Collection<ExprNodeDesc> adding) {
+    List<ExprNodeDesc> split = split(prev);
+    for (ExprNodeDesc expr : adding) {
+      if (indexOf(expr, split) < 0) {
+        split.add(expr);
+      }
+    }
+    return mergePredicates(split);
+  }
+
+  /**
+   * 'and' all predicates with deduplication. (Do not use on non-deterministic exprs)
+   */
+  public static ExprNodeDesc mergePredicatesWithDedup(ExprNodeDesc... predicates) {
+    List<ExprNodeDesc> split = new ArrayList<ExprNodeDesc>();
+    for (ExprNodeDesc expr : predicates) {
+      split(expr, split);
+    }
+    return mergePredicates(split);
+  }
+
   /**
    * bind two predicates by AND op
    */
@@ -113,7 +137,7 @@ public static ExprNodeGenericFuncDesc mergePredicates(ExprNodeDesc prev, ExprNod
   /**
    * bind n predicates by AND op
    */
-  public static ExprNodeDesc mergePredicates(List<ExprNodeDesc> exprs) {
+  public static ExprNodeDesc mergePredicates(Collection<ExprNodeDesc> exprs) {
     ExprNodeDesc prev = null;
     for (ExprNodeDesc expr : exprs) {
       if (prev == null) {
@@ -135,17 +159,29 @@ public static ExprNodeDesc mergePredicates(List<ExprNodeDesc> exprs) {
   /**
    * split predicates by AND op
    */
-  public static List<ExprNodeDesc> split(ExprNodeDesc current, List<ExprNodeDesc> splitted) {
+  public static List<ExprNodeDesc> split(ExprNodeDesc current, List<ExprNodeDesc> split) {
     if (FunctionRegistry.isOpAnd(current)) {
       for (ExprNodeDesc child : current.getChildren()) {
-        split(child, splitted);
+        split(child, split);
       }
-      return splitted;
+      return split;
+    }
+    if (indexOf(current, split) < 0) {
+      split.add(current);
     }
-    if (indexOf(current, splitted) < 0) {
-      splitted.add(current);
+    return split;
+  }
+
+  public static boolean retainAll(Collection<ExprNodeDesc> exprs, List<ExprNodeDesc> retainer) {
+    boolean modified = false;
+    Iterator<ExprNodeDesc> e = exprs.iterator();
+    while (e.hasNext()) {
+      if (indexOf(e.next(), retainer) < 0) {
+        modified = true;
+        e.remove();
+      }
     }
-    return splitted;
+    return modified;
   }
 
   /**
diff --git ql/src/test/results/clientpositive/index_auto_self_join.q.out ql/src/test/results/clientpositive/index_auto_self_join.q.out
index 19ff6b0..ff9b208 100644
--- ql/src/test/results/clientpositive/index_auto_self_join.q.out
+++ ql/src/test/results/clientpositive/index_auto_self_join.q.out
@@ -175,7 +175,7 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: a
-            filterExpr: (((UDFToDouble(key) > 80.0) and (UDFToDouble(key) < 100.0)) and value is not null) (type: boolean)
+            filterExpr: value is not null (type: boolean)
             Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
               predicate: (((UDFToDouble(key) > 80.0) and (UDFToDouble(key) < 100.0)) and value is not null) (type: boolean)
@@ -192,7 +192,7 @@ STAGE PLANS:
                 value expressions: _col0 (type: string)
           TableScan
             alias: a
-            filterExpr: (((UDFToDouble(key) > 70.0) and (UDFToDouble(key) < 90.0)) and value is not null) (type: boolean)
+            filterExpr: value is not null (type: boolean)
             Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
              predicate: (((UDFToDouble(key) > 70.0) and (UDFToDouble(key) < 90.0)) and value is not null) (type: boolean)
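Not part of the patch: a minimal standalone sketch of the idea behind finalizeTaskPlan, using plain strings as stand-ins for ExprNodeDesc and plain lists as stand-ins for the TableScanOperators that share one path. When several union branches scan the same source, only the predicates common to every branch can be pushed into the single TableScan filterExpr; each branch keeps its remaining predicates in its own Filter operator. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: strings stand in for ExprNodeDesc predicates.
public class CommonPredicateSketch {

  // Intersect the per-scan predicate lists (mirrors getCommonExprs/retainAll in the patch).
  static List<String> commonExprs(List<List<String>> perScan) {
    List<String> common = null;
    for (List<String> exprs : perScan) {
      if (common == null) {
        common = new ArrayList<String>(exprs);
      } else {
        common.retainAll(exprs);
      }
      if (common.isEmpty()) {
        break;
      }
    }
    return common;
  }

  public static void main(String[] args) {
    // Two branches of the union query scanning the same table:
    List<String> scanA = new ArrayList<String>(Arrays.asList("key < 10", "key is not null"));
    List<String> scanB = new ArrayList<String>(Arrays.asList("key > 999", "key is not null"));
    List<List<String>> perScan = Arrays.asList(scanA, scanB);

    // Only the shared predicate is pushed down as the TableScan filterExpr ...
    List<String> common = commonExprs(perScan);
    System.out.println("pushed filterExpr: " + common);   // [key is not null]

    // ... while each branch keeps its residual predicate in its Filter operator
    // (mirrors exceptCommonExpr in the patch).
    for (List<String> exprs : perScan) {
      exprs.removeAll(common);
      System.out.println("residual filter:   " + exprs);  // [key < 10], then [key > 999]
    }
  }
}
```

The residual predicates printed here correspond to the per-branch Filter operators ((key < 10) and (key > 999)) in the new hbase_pushdown.q.out plan, while the shared "key is not null" is what remains eligible for storage-handler pushdown.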