diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 62374ad..1d51a13 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -62,11 +63,13 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFDayOfMonth;
 import org.apache.hadoop.hive.ql.udf.UDFHour;
@@ -196,16 +199,26 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
         throws SemanticException {
       Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
       if (currTask instanceof MapRedTask) {
-        boolean ret = validateMRTask((MapRedTask) currTask);
-        if (ret) {
-          vectorizeMRTask((MapRedTask) currTask);
+        convertMapWork(((MapRedTask) currTask).getWork().getMapWork());
+      } else if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w: work.getAllWork()) {
+          if (w instanceof MapWork) {
+            convertMapWork((MapWork)w);
+          }
         }
       }
       return null;
     }
 
-    private boolean validateMRTask(MapRedTask mrTask) throws SemanticException {
-      MapWork mapWork = mrTask.getWork().getMapWork();
+    private void convertMapWork(MapWork mapWork) throws SemanticException {
+      boolean ret = validateMapWork(mapWork);
+      if (ret) {
+        vectorizeMapWork(mapWork);
+      }
+    }
+
+    private boolean validateMapWork(MapWork mapWork) throws SemanticException {
 
       // Validate the input format
       for (String path : mapWork.getPathToPartitionInfo().keySet()) {
@@ -243,12 +256,11 @@ private boolean validateMRTask(MapRedTask mrTask) throws SemanticException {
       return true;
     }
 
-    private void vectorizeMRTask(MapRedTask mrTask) throws SemanticException {
+    private void vectorizeMapWork(MapWork mapWork) throws SemanticException {
       System.err.println("Going down the vectorized path");
-      MapWork mapWork = mrTask.getWork().getMapWork();
       mapWork.setVectorMode(true);
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      VectorizationNodeProcessor vnp = new VectorizationNodeProcessor(mrTask);
+      VectorizationNodeProcessor vnp = new VectorizationNodeProcessor(mapWork);
       opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
           + ReduceSinkOperator.getOperatorName()), vnp);
       opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
@@ -298,8 +310,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
     private final Set<Operator<? extends OperatorDesc>> opsDone =
         new HashSet<Operator<? extends OperatorDesc>>();
 
-    public VectorizationNodeProcessor(MapRedTask mrTask) {
-      this.mWork = mrTask.getWork().getMapWork();
+    public VectorizationNodeProcessor(MapWork mWork) {
+      this.mWork = mWork;
     }
 
     public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 42939b4..1632988 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -29,6 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -50,6 +51,8 @@
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -238,7 +241,11 @@ protected void decideExecMode(List<Task<? extends Serializable>> rootTasks, Cont
   @Override
   protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       Context ctx) throws SemanticException {
-    // no additional optimization needed
+    PhysicalContext physicalCtx = new PhysicalContext(conf, pCtx, pCtx.getContext(), rootTasks,
+        pCtx.getFetchTask());
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+      (new Vectorizer()).resolve(physicalCtx);
+    }
     return;
   }
 }
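
Reviewer note: with this patch the Vectorizer runs from TezCompiler.optimizeTaskPlan and is gated on hive.vectorization.enabled, so on the tez engine the vectorized path is exercised simply by issuing "set hive.vectorization.enabled=true;" in a session. Below is a minimal sketch of that conf gate, assuming hive-common is on the classpath; the class name VectorizationGateCheck is illustrative only and not part of this patch:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class VectorizationGateCheck {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Same flag that TezCompiler.optimizeTaskPlan checks before invoking
        // (new Vectorizer()).resolve(physicalCtx).
        conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED));
      }
    }

If the flag is false, resolve() is never called and the task plan is left untouched, which keeps the change safe to ship disabled by default.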