diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/EdgeOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/EdgeOptimizer.java new file mode 100644 index 0000000..fa07b41 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/EdgeOptimizer.java @@ -0,0 +1,44 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.Stack; + +/** + * Created by ekoifman on 12/21/16. + */ +public class EdgeOptimizer implements PhysicalPlanResolver { + /** + * All physical plan resolvers have to implement this entry method. + * + * @param pctx + * @return the physical plan + */ + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + return null; + } + + private static final class MyDispatcher implements Dispatcher { + /** + * Dispatcher function. + * + * @param nd operator to process. + * @param stack operator stack to process. + * @param nodeOutputs The argument list of outputs from processing other nodes that are + * passed to this dispatcher from the walker. + * @return Object The return object from the processing call. + * @throws SemanticException + */ + @Override + public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException { + if(!(nd instanceof TezTask)) { + return null; + } + return null; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index cdb9e1b..810b58e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -514,6 +514,13 @@ protected void optimizeTaskPlan(List> rootTasks, Pa } else { LOG.debug("Skipping llap decider"); } + for(Task t : rootTasks) { + if(!(t instanceof TezTask)) continue; + TezWork tezWork = ((TezTask)t).getWork(); + if(tezWork == null) continue; + tezWork.optimzeEdges(); + } + // This optimizer will serialize all filters that made it to the // table scan operator to avoid having to do it multiple times on diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 7a70e6b..39fd7e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -81,6 +81,23 @@ public static boolean isCustomInputType(VertexType vertex) { new HashMap, TezEdgeProperty>(); private final Map workVertexTypeMap = new HashMap(); + public void optimzeEdges() { + for(Map.Entry, TezEdgeProperty> ent : edgeProperties.entrySet()) { + if(ent.getKey().getRight() instanceof ReduceWork) { + ReduceWork reduceWork = (ReduceWork)ent.getKey().getRight(); + if(!reduceWork.getSortCols().isEmpty()) { + continue; + } + TezEdgeProperty e = ent.getValue(); + if(e.getEdgeType() == EdgeType.SIMPLE_EDGE) { + TezEdgeProperty newEdge = new TezEdgeProperty(e.getHiveConf(), EdgeType.CUSTOM_SIMPLE_EDGE, + e.isAutoReduce(), e.getMinReducer(), e.getMaxReducer(), e.getInputSizePerReducer()); + ent.setValue(newEdge); + } + } + } + } + public TezWork(String queryId) { this(queryId, null); }