diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
new file mode 100644
index 0000000..f3c1d42
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
@@ -0,0 +1,152 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+
+/**
+ * SerializeFilter is a simple physical optimizer that serializes all filter
+ * expressions in TableScan operators, so the work is done once at compile time
+ * rather than repeatedly on the backend.
+ */
+public class SerializeFilter implements PhysicalPlanResolver {
+
+  protected static final Log LOG = LogFactory.getLog(SerializeFilter.class);
+
+  public class Serializer implements Dispatcher {
+
+    private final PhysicalContext pctx;
+
+    public Serializer(PhysicalContext pctx) {
+      this.pctx = pctx;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task currTask = (Task) nd;
+      if (currTask instanceof StatsTask) {
+        currTask = ((StatsTask) currTask).getWork().getSourceTask();
+      }
+      if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w : work.getAllWork()) {
+          evaluateWork(w);
+        }
+      }
+      return null;
+    }
+
+    private void evaluateWork(BaseWork w) throws SemanticException {
+      if (w instanceof MapWork) {
+        evaluateMapWork((MapWork) w);
+      } else if (w instanceof ReduceWork) {
+        evaluateReduceWork((ReduceWork) w);
+      } else if (w instanceof MergeJoinWork) {
+        evaluateMergeWork((MergeJoinWork) w);
+      } else {
+        LOG.info("Not evaluating work of type: " + w.getClass().getCanonicalName());
+      }
+    }
+
+    private void evaluateMergeWork(MergeJoinWork w) throws SemanticException {
+      for (BaseWork baseWork : w.getBaseWorkList()) {
+        evaluateOperators(baseWork, pctx);
+      }
+    }
+
+    private void evaluateReduceWork(ReduceWork w) throws SemanticException {
+      evaluateOperators(w, pctx);
+    }
+
+    private void evaluateMapWork(MapWork w) throws SemanticException {
+      evaluateOperators(w, pctx);
+    }
+
+    private void evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException {
+      // Walk the operator tree of this work and collect all table scans.
+      final Set<TableScanOperator> tableScans = new LinkedHashSet<TableScanOperator>();
+
+      Map<Rule, NodeProcessor> rules = new HashMap<Rule, NodeProcessor>();
+      rules.put(new RuleRegExp("TS finder",
+          TableScanOperator.getOperatorName() + "%"), new NodeProcessor() {
+        @Override
+        public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+            Object... nodeOutputs) {
+          tableScans.add((TableScanOperator) nd);
+          return null;
+        }
+      });
+      Dispatcher disp = new DefaultRuleDispatcher(null, rules, null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(w.getAllRootOperators());
+
+      LinkedHashMap<Node, Object> nodeOutput = new LinkedHashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      for (TableScanOperator ts : tableScans) {
+        if (ts.getConf() != null && ts.getConf().getFilterExpr() != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Serializing: " + ts.getConf().getFilterExpr().getExprString());
+          }
+          ts.getConf().setSerializedFilterExpr(
+              Utilities.serializeExpression(ts.getConf().getFilterExpr()));
+        }
+
+        if (ts.getConf() != null && ts.getConf().getFilterObject() != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Serializing: " + ts.getConf().getFilterObject());
+          }
+          ts.getConf().setSerializedFilterObject(
+              Utilities.serializeObject(ts.getConf().getFilterObject()));
+        }
+      }
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    // Create the dispatcher and graph walker.
+    Dispatcher disp = new Serializer(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // Get all the task nodes from the root tasks.
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    // Begin walking the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+}
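For context (not part of the patch): the strings produced above are what the read side already consumes. Below is a minimal sketch of that consumer, assuming the existing conf key TableScanDesc.FILTER_EXPR_CONF_STR ("hive.io.filter.expr.serialized") and Utilities.deserializeExpression(String) as used on the HiveInputFormat path; the helper name readPushedFilter is illustrative only.

    // Illustrative sketch -- not part of this patch. HiveInputFormat.pushFilters
    // copies the serialized expression into the job conf; a reader can then
    // rebuild the expression tree once per split like this.
    // (Assumes org.apache.hadoop.conf.Configuration, o.a.h.hive.ql.plan.TableScanDesc,
    //  o.a.h.hive.ql.plan.ExprNodeGenericFuncDesc, o.a.h.hive.ql.exec.Utilities.)
    private static ExprNodeGenericFuncDesc readPushedFilter(Configuration conf) {
      String serialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
      if (serialized == null) {
        return null; // no filter was pushed down for this table scan
      }
      return Utilities.deserializeExpression(serialized);
    }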
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 8ab7cd4..f20393a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -75,6 +75,7 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
@@ -481,6 +482,13 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
         && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
       physicalCtx = new MemoryDecider().resolve(physicalCtx);
     }
+
+    // This optimizer serializes all filters that made it to the table scan
+    // operator, to avoid having to do it multiple times on the backend.
+    // Any physical optimization that changes table scans or filters has to
+    // be invoked before this one.
+    physicalCtx = new SerializeFilter().resolve(physicalCtx);
+
     return;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 9e9a2a2..6b6ed53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -147,11 +147,7 @@ public ExprNodeGenericFuncDesc getFilterExpr() {
   }
 
   public void setFilterExpr(ExprNodeGenericFuncDesc filterExpr) {
-    // TODO: we could avoid serialization if it's the same expr. Check?
     this.filterExpr = filterExpr;
-    if (filterExpr != null) {
-      serializedFilterExpr = Utilities.serializeExpression(filterExpr);
-    }
   }
 
   public Serializable getFilterObject() {
@@ -160,9 +156,6 @@ public Serializable getFilterObject() {
 
   public void setFilterObject(Serializable filterObject) {
     this.filterObject = filterObject;
-    if (filterObject != null) {
-      serializedFilterObject = Utilities.serializeObject(filterObject);
-    }
   }
 
   public void setNeededColumnIDs(List<Integer> neededColumnIDs) {
@@ -296,7 +289,15 @@ public String getSerializedFilterExpr() {
     return serializedFilterExpr;
   }
 
+  public void setSerializedFilterExpr(String serializedFilterExpr) {
+    this.serializedFilterExpr = serializedFilterExpr;
+  }
+
   public String getSerializedFilterObject() {
     return serializedFilterObject;
   }
+
+  public void setSerializedFilterObject(String serializedFilterObject) {
+    this.serializedFilterObject = serializedFilterObject;
+  }
 }
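Net effect of the TableScanDesc change, as a hedged before/after sketch (commentary, not patch content; tsDesc stands for any TableScanDesc instance): setFilterExpr used to serialize eagerly on every call, even when a later optimizer replaced the expression again; with this patch the setter is a plain assignment, and SerializeFilter pays the serialization cost exactly once per surviving filter after all other physical optimizations have run.

    // Before: every rewrite of the filter re-ran serialization.
    tsDesc.setFilterExpr(expr);  // side effect: Utilities.serializeExpression(expr)

    // After: the setter is cheap; at the end of physical optimization,
    // SerializeFilter performs the equivalent of the following once per scan:
    tsDesc.setFilterExpr(expr);
    tsDesc.setSerializedFilterExpr(Utilities.serializeExpression(expr));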