diff --git hbase-handler/src/test/queries/positive/hbase_pushdown.q hbase-handler/src/test/queries/positive/hbase_pushdown.q
index 0d29c82..776afef 100644
--- hbase-handler/src/test/queries/positive/hbase_pushdown.q
+++ hbase-handler/src/test/queries/positive/hbase_pushdown.q
@@ -17,6 +17,21 @@
 explain select * from hbase_pushdown where key=90 and value like '%90%';
 select * from hbase_pushdown where key=90 and value like '%90%';
 
+-- with union
+
+explain
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u;
+
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u;
+
 set hive.optimize.index.filter=true;
 -- with partial pushdown with optimization (HIVE-6650)
 explain select * from hbase_pushdown where key=90 and value like '%90%';
diff --git hbase-handler/src/test/results/positive/hbase_pushdown.q.out hbase-handler/src/test/results/positive/hbase_pushdown.q.out
index 7bb1f5e..9d3b1df 100644
--- hbase-handler/src/test/results/positive/hbase_pushdown.q.out
+++ hbase-handler/src/test/results/positive/hbase_pushdown.q.out
@@ -119,6 +119,107 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@hbase_pushdown
 #### A masked pattern was here ####
 90	val_90
+PREHOOK: query: -- with union
+
+explain
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with union
+
+explain
+select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: hbase_pushdown
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Filter Operator
+              predicate: ((key < 10) and key is not null) (type: boolean)
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Select Operator
+                expressions: key (type: int), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                Union
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                  Select Operator
+                    expressions: _col0 (type: int), _col1 (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          TableScan
+            alias: hbase_pushdown
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Filter Operator
+              predicate: ((key > 999) and key is not null) (type: boolean)
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Select Operator
+                expressions: key (type: int), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                Union
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                  Select Operator
+                    expressions: _col0 (type: int), _col1 (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+#### A masked pattern was here ####
+POSTHOOK: query: select * from (
+select * from hbase_pushdown where key<10 and key is not null
+union all
+select * from hbase_pushdown where key>999 and key is not null
+) u
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+#### A masked pattern was here ####
+0	val_0
+2	val_2
+4	val_4
+5	val_5
+8	val_8
+9	val_9
 PREHOOK: query: -- with partial pushdown with optimization (HIVE-6650)
 explain select * from hbase_pushdown where key=90 and value like '%90%'
 PREHOOK: type: QUERY
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index 703c9d1..f890c9d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -275,6 +275,15 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
   }
 
   @Override
+  protected List<MapWork> extractMapWorks(List<Task<? extends Serializable>> rootTasks) {
+    List<MapWork> mapWorks = new ArrayList<MapWork>();
+    for (ExecDriver task : Utilities.getMRTasks(rootTasks)) {
+      mapWorks.add(task.getWork().getMapWork());
+    }
+    return mapWorks;
+  }
+
+  @Override
   protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index a8d9a15..7da4732 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -20,9 +20,11 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -35,7 +37,12 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -49,10 +56,15 @@
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -219,6 +231,8 @@ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks, final HashSet<ReadEntity> inputs
   protected abstract void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
       ParseContext pCtx, Context ctx) throws SemanticException;
 
+  protected abstract List<MapWork> extractMapWorks(List<Task<? extends Serializable>> rootTasks);
+
+  protected void finalizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
+      ParseContext pCtx, Context ctx) throws SemanticException {
+    for (MapWork mapWork : extractMapWorks(rootTasks)) {
+
+      // for single sourced multiple aliases, multiple filter expr in TS
+      // cannot be evaluated (see HiveInputFormat#getSplits)
+      // todo : 'or'ed predicate can be useful for some storage handlers (not for hbase, currently)
+      Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+      for (ArrayList<String> aliases : mapWork.getPathToAliases().values()) {
+        if (aliases.size() <= 1) {
+          continue;
+        }
+        List<TableScanOperator> tss = new ArrayList<TableScanOperator>();
+        for (String alias : aliases) {
+          Operator<? extends OperatorDesc> operator = aliasToWork.get(alias);
+          if (operator instanceof TableScanOperator) {
+            TableScanOperator ts = (TableScanOperator) operator;
+            if (ts.getConf().getFilterExpr() != null) {
+              tss.add(ts);
+            }
+          }
+        }
+        // recreate filter for each TS
+        for (TableScanOperator ts : tss) {
+          ExprNodeGenericFuncDesc predicate = ts.getConf().getFilterExpr();
+          FilterOperator child = OperatorUtils.findSingleOperator(ts, FilterOperator.class);
+          if (child != null) {
+            FilterDesc filterDesc = child.getConf();
+            filterDesc.setPredicate(
+                ExprNodeDescUtils.mergePredicatesWithDedup(filterDesc.getPredicate(), predicate));
+          } else {
+            FilterDesc filterDesc = new FilterDesc(predicate, false);
+            Operator<? extends OperatorDesc>[] children = new Operator[ts.getNumChild()];
+            for (int i = 0; i < children.length; i++) {
+              children[i] = ts.getChildOperators().get(i);
+              children[i].getParentOperators().clear();
+            }
+            Operator<FilterDesc> newFilter = OperatorFactory.get(filterDesc, children);
+            List<Operator<? extends OperatorDesc>> parents =
+                new ArrayList<Operator<? extends OperatorDesc>>(Arrays.asList(ts));
+            newFilter.setParentOperators(parents);
+            ts.getChildOperators().clear();
+            ts.getChildOperators().add(newFilter);
+          }
+          ts.getConf().setFilterExpr(null);
+        }
+      }
+    }
+  }
+
   /*
    * Called to set the appropriate input format for tasks
    */
   protected abstract void setInputFormat(Task<? extends Serializable> rootTask);
 
   /*
-   * Called to generate the taks tree from the parse context/operator tree
+   * Called to generate the task tree from the parse context/operator tree
    */
   protected abstract void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 10c8d51..02248b0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -273,4 +274,17 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     }
     return;
   }
+
+  @Override
+  protected List<MapWork> extractMapWorks(List<Task<? extends Serializable>> rootTasks) {
+    List<MapWork> mapWorks = new ArrayList<MapWork>();
+    for (TezTask task : Utilities.getTezTasks(rootTasks)) {
+      for (BaseWork work : task.getWork().getRoots()) {
+        if (work instanceof MapWork) {
+          mapWorks.add((MapWork) work);
+        }
+      }
+    }
+    return mapWorks;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index f293c43..db88448 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -100,6 +100,17 @@ public static boolean containsPredicate(ExprNodeDesc source, ExprNodeDesc predic
   }
 
   /**
+   * 'and' all predicates with deduplication. (Do not use on non-deterministic exprs)
+   */
+  public static ExprNodeDesc mergePredicatesWithDedup(ExprNodeDesc... predicates) {
+    List<ExprNodeDesc> split = new ArrayList<ExprNodeDesc>();
+    for (ExprNodeDesc expr : predicates) {
+      split(expr, split);
+    }
+    return mergePredicates(split);
+  }
+
+  /**
    * bind two predicates by AND op
    */
   public static ExprNodeGenericFuncDesc mergePredicates(ExprNodeDesc prev, ExprNodeDesc next) {
@@ -135,17 +146,17 @@ public static ExprNodeDesc mergePredicates(List<ExprNodeDesc> exprs) {
   /**
    * split predicates by AND op
    */
-  public static List<ExprNodeDesc> split(ExprNodeDesc current, List<ExprNodeDesc> splitted) {
+  public static List<ExprNodeDesc> split(ExprNodeDesc current, List<ExprNodeDesc> split) {
     if (FunctionRegistry.isOpAnd(current)) {
       for (ExprNodeDesc child : current.getChildren()) {
-        split(child, splitted);
+        split(child, split);
       }
-      return splitted;
+      return split;
     }
-    if (indexOf(current, splitted) < 0) {
-      splitted.add(current);
+    if (indexOf(current, split) < 0) {
+      split.add(current);
     }
-    return splitted;
+    return split;
   }
 
   /**
diff --git ql/src/test/results/clientpositive/index_auto_self_join.q.out ql/src/test/results/clientpositive/index_auto_self_join.q.out
index 7bd782c..a47f67c 100644
--- ql/src/test/results/clientpositive/index_auto_self_join.q.out
+++ ql/src/test/results/clientpositive/index_auto_self_join.q.out
@@ -165,7 +165,6 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: b
-            filterExpr: ((value is not null and (key > 70)) and (key < 90)) (type: boolean)
             Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
               predicate: ((value is not null and (key > 70)) and (key < 90)) (type: boolean)
@@ -178,7 +177,6 @@
                 value expressions: key (type: string)
           TableScan
             alias: a
-            filterExpr: ((value is not null and (key > 80)) and (key < 100)) (type: boolean)
             Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
               predicate: ((value is not null and (key > 80)) and (key < 100)) (type: boolean)
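A note on the merge step above: ExprNodeDescUtils.mergePredicatesWithDedup splits each incoming predicate into its AND-conjuncts (reusing split), skips conjuncts already collected, and re-joins the remainder with AND. This is what keeps finalizeTaskPlan from doubling up conditions when a TableScan's pushed-down filterExpr is folded back into a FilterOperator that already holds the same predicate. The sketch below is a minimal, self-contained illustration of that split/merge-with-dedup logic on plain strings instead of ExprNodeDesc trees; the class name, the "AND(p,q)" string encoding, and the naive comma split (no nested ANDs) are assumptions made for the demo, not Hive API.

import java.util.ArrayList;
import java.util.List;

public class MergeWithDedupSketch {

  // Flatten AND nodes into a list of conjuncts, skipping duplicates
  // (mirrors ExprNodeDescUtils.split).
  static List<String> split(String expr, List<String> out) {
    if (expr.startsWith("AND(") && expr.endsWith(")")) {
      // naive comma split: assumes conjuncts themselves contain no commas
      for (String child : expr.substring(4, expr.length() - 1).split(",")) {
        split(child.trim(), out);
      }
      return out;
    }
    if (!out.contains(expr)) {
      out.add(expr);
    }
    return out;
  }

  // 'and' all predicates with deduplication
  // (mirrors ExprNodeDescUtils.mergePredicatesWithDedup).
  static String mergeWithDedup(String... predicates) {
    List<String> conjuncts = new ArrayList<String>();
    for (String predicate : predicates) {
      split(predicate, conjuncts);
    }
    return String.join(" and ", conjuncts);
  }

  public static void main(String[] args) {
    // A TableScan filterExpr merged into a FilterOperator that already
    // holds the same conjuncts adds nothing:
    System.out.println(mergeWithDedup(
        "AND((key < 10),key is not null)",
        "AND((key < 10),key is not null)"));
    // prints: (key < 10) and key is not null
  }
}

Merging two distinct predicates, e.g. mergeWithDedup("AND(a,b)", "AND(b,c)"), yields "a and b and c": the shared conjunct b appears only once, which is the behavior the "recreate filter for each TS" branch in TaskCompiler.finalizeTaskPlan relies on.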