From d66c58e0b2312594b9655140e8b723c5a3b72035 Mon Sep 17 00:00:00 2001
From: Ashutosh Chauhan
Date: Fri, 4 Dec 2015 09:28:37 -0800
Subject: [PATCH] HIVE-12595 : [REFACTOR] Make physical compiler more type safe

---
 .../hadoop/hive/ql/optimizer/GenMRFileSink1.java   |  6 ++--
 .../hadoop/hive/ql/optimizer/GenMRProcContext.java |  7 +++--
 .../hadoop/hive/ql/optimizer/GenMRTableScan1.java  |  7 ++---
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   | 34 ++++++++++------------
 .../hadoop/hive/ql/optimizer/MapJoinFactory.java   | 10 +++----
 .../optimizer/spark/SparkSortMergeJoinFactory.java |  6 ----
 .../apache/hadoop/hive/ql/parse/GenTezUtils.java   |  5 ++--
 .../hadoop/hive/ql/parse/spark/GenSparkUtils.java  |  4 +--
 .../org/apache/hadoop/hive/ql/plan/MapWork.java    |  2 +-
 .../hadoop/hive/ql/parse/TestGenTezWork.java       | 19 +++++-------
 10 files changed, 43 insertions(+), 57 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index dcdc9ba..a231543 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -61,6 +62,7 @@ public GenMRFileSink1() {
    * @param opProcCtx
    *          context
    */
+  @Override
   public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx,
       Object... nodeOutputs) throws SemanticException {
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
@@ -140,7 +142,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx,
   private void processLinkedFileDesc(GenMRProcContext ctx,
       Task childTask) throws SemanticException {
     Task currTask = ctx.getCurrTask();
-    Operator currTopOp = ctx.getCurrTopOp();
+    TableScanOperator currTopOp = ctx.getCurrTopOp();
     if (currTopOp != null && !ctx.isSeenOp(currTask, currTopOp)) {
       String currAliasId = ctx.getCurrAliasId();
       GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
@@ -186,7 +188,7 @@ private Path processFS(FileSinkOperator fsOp, Stack stack,
       dest = GenMapRedUtils.createMoveTask(ctx.getCurrTask(), chDir, fsOp, ctx.getParseCtx(),
           ctx.getMvTask(), ctx.getConf(), ctx.getDependencyTaskForMultiInsert());

-    Operator currTopOp = ctx.getCurrTopOp();
+    TableScanOperator currTopOp = ctx.getCurrTopOp();
     String currAliasId = ctx.getCurrAliasId();
     HashMap, Task> opTaskMap = ctx.getOpTaskMap();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
index 0da5790..c08469b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -152,7 +153,7 @@ public void addListTopOperators(Operator topOperator) {
   private LinkedHashMap, GenMapRedCtx> mapCurrCtx;
   private Task currTask;
-  private Operator currTopOp;
+  private TableScanOperator currTopOp;
   private UnionOperator currUnionOp;
   private String currAliasId;
   private DependencyCollectionTask dependencyTaskForMultiInsert;
@@ -355,7 +356,7 @@ public void setCurrTask(Task currTask) {
   /**
    * @return current top operator
    */
-  public Operator getCurrTopOp() {
+  public TableScanOperator getCurrTopOp() {
     return currTopOp;
   }
@@ -363,7 +364,7 @@ public void setCurrTask(Task currTask) {
    * @param currTopOp
    *          current top operator
    */
-  public void setCurrTopOp(Operator currTopOp) {
+  public void setCurrTopOp(TableScanOperator currTopOp) {
     this.currTopOp = currTopOp;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index af0ac90..2160e01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -77,9 +77,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx,
       // create a dummy MapReduce task
       MapredWork currWork = GenMapRedUtils.getMapRedWork(parseCtx);
       MapRedTask currTask = (MapRedTask) TaskFactory.get(currWork, parseCtx.getConf());
-      Operator currTopOp = op;
       ctx.setCurrTask(currTask);
-      ctx.setCurrTopOp(currTopOp);
+      ctx.setCurrTopOp(op);

       for (String alias : parseCtx.getTopOps().keySet()) {
         Operator currOp = parseCtx.getTopOps().get(alias);
@@ -160,9 +159,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx,
           Table source = op.getConf().getTableMetadata();
           List partCols = GenMapRedUtils.getPartitionColumns(op);
           PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, partCols, false);
-          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList);
+          GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx, partList);
         } else { // non-partitioned table
-          GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
+          GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx);
         }
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 0cd7b62..5cbd19e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -151,7 +151,7 @@ public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
     MapredWork plan = (MapredWork) currTask.getWork();
     HashMap, Task> opTaskMap = opProcCtx.getOpTaskMap();
-    Operator currTopOp = opProcCtx.getCurrTopOp();
+    TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
     opTaskMap.put(reducer, currTask);
     plan.setReduceWork(new ReduceWork());
@@ -216,7 +216,7 @@ public static void initUnionPlan(ReduceSinkOperator op, UnionOperator currUnionO
   private static void setUnionPlan(GenMRProcContext opProcCtx,
       boolean local, Task currTask, GenMRUnionCtx uCtx,
       boolean mergeTask) throws SemanticException {
-    Operator currTopOp = opProcCtx.getCurrTopOp();
+    TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
     if (currTopOp != null) {
       String currAliasId = opProcCtx.getCurrAliasId();
@@ -332,7 +332,7 @@ public static void joinPlan(Task currTask,
       throws SemanticException {
     assert currTask != null && oldTask != null;
-    Operator currTopOp = opProcCtx.getCurrTopOp();
+    TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
     List> parTasks = null;
     // terminate the old task and make current task dependent on it
     if (currTask.getParentTasks() != null
@@ -368,7 +368,7 @@ public static void joinPlan(Task currTask,
   /**
    * If currTopOp is not set for input of the task, add input for to the task
   */
-  static boolean mergeInput(Operator currTopOp,
+  static boolean mergeInput(TableScanOperator currTopOp,
      GenMRProcContext opProcCtx, Task task, boolean local)
      throws SemanticException {
    if (!opProcCtx.isSeenOp(task, currTopOp)) {
@@ -437,7 +437,7 @@ static void splitPlan(ReduceSinkOperator cRS, GenMRProcContext opProcCtx)
    *          processing context
    */
   public static void setTaskPlan(String alias_id,
-      Operator topOp, Task task, boolean local,
+      TableScanOperator topOp, Task task, boolean local,
       GenMRProcContext opProcCtx) throws SemanticException {
     setTaskPlan(alias_id, topOp, task, local, opProcCtx, null);
   }
@@ -459,7 +459,7 @@ public static void setTaskPlan(String alias_id,
    *          pruned partition list. If it is null it will be computed on-the-fly.
    */
   public static void setTaskPlan(String alias_id,
-      Operator topOp, Task task, boolean local,
+      TableScanOperator topOp, Task task, boolean local,
       GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
     setMapWork(((MapredWork) task.getWork()).getMapWork(), opProcCtx.getParseCtx(),
         opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), local);
@@ -485,7 +485,7 @@ public static void setTaskPlan(String alias_id,
    *          current instance of hive conf
    */
   public static void setMapWork(MapWork plan, ParseContext parseCtx, Set inputs,
-      PrunedPartitionList partsList, Operator topOp, String alias_id,
+      PrunedPartitionList partsList, TableScanOperator tsOp, String alias_id,
       HiveConf conf, boolean local) throws SemanticException {
     ArrayList partDir = new ArrayList();
     ArrayList partDesc = new ArrayList();
@@ -496,9 +496,8 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set inputs,
-    Map props = topOp.getConf().getOpProps();
+    Map props = tsOp.getConf().getOpProps();
     if (props != null) {
       Properties target = aliasPartnDesc.getProperties();
       if (target == null) {
@@ -590,10 +589,10 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set inputs,
-        Map partToPruner = parseCtx.getOpToPartToSkewedPruner().get(topOp);
+        Map partToPruner = parseCtx.getOpToPartToSkewedPruner().get(tsOp);
         ExprNodeDesc listBucketingPruner = (partToPruner != null) ? partToPruner.get(part.getName()) : null;
@@ -701,10 +700,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set inputs,
     Iterator iterPath = partDir.iterator();
     Iterator iterPartnDesc = partDesc.iterator();
@@ -728,7 +724,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set inputs,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
-    Operator currTopOp = opProcCtx.getCurrTopOp();
+    TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
     String currAliasId = opProcCtx.getCurrAliasId();
     GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, local, opProcCtx);
   }
@@ -170,11 +171,10 @@ private static void initMapJoinPlan(AbstractMapJoinOperator op,
-  private static void joinMapJoinPlan(AbstractMapJoinOperator op,
-      Task oldTask,
+  private static void joinMapJoinPlan(Task oldTask,
       GenMRProcContext opProcCtx, boolean local)
       throws SemanticException {
-    Operator currTopOp = opProcCtx.getCurrTopOp();
+    TableScanOperator currTopOp = opProcCtx.getCurrTopOp();
     GenMapRedUtils.mergeInput(currTopOp, opProcCtx, oldTask, local);
   }
@@ -220,7 +220,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       } else {
         // The current plan can be thrown away after being merged with the
         // original plan
-        joinMapJoinPlan(mapJoin, oldTask, ctx, local);
+        joinMapJoinPlan(oldTask, ctx, local);
         ctx.setCurrTask(currTask = oldTask);
       }
       MapredWork plan = (MapredWork) currTask.getWork();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
index aca0630..55fdedb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
@@ -19,21 +19,15 @@
 import java.util.List;
 import java.util.Map;
-import java.util.Stack;

-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;

 /**
  * Operator factory for Spark SMBJoin processing.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 27d7276..74f249a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -48,7 +48,6 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -178,7 +177,7 @@ public MapWork createMapWork(GenTezProcContext context, Operator root,
     String alias = ts.getConf().getAlias();

-    setupMapWork(mapWork, context, partitions, root, alias);
+    setupMapWork(mapWork, context, partitions, ts, alias);

     if (ts.getConf().getTableMetadata() != null && ts.getConf().getTableMetadata().isDummyTable()) {
       mapWork.setDummyTableScan(true);
@@ -196,7 +195,7 @@ public MapWork createMapWork(GenTezProcContext context, Operator root,
   // this method's main use is to help unit testing this class
   protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
-      PrunedPartitionList partitions, Operator root,
+      PrunedPartitionList partitions, TableScanOperator root,
       String alias) throws SemanticException {
     // All the setup is done in GenMapRedUtils
     GenMapRedUtils.setMapWork(mapWork, context.parseContext,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 40c23a5..f795366 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -160,7 +160,7 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root,
     String alias = ((TableScanOperator) root).getConf().getAlias();

     if (!deferSetup) {
-      setupMapWork(mapWork, context, partitions, root, alias);
+      setupMapWork(mapWork, context, partitions,(TableScanOperator) root, alias);
     }

     // add new item to the Spark work
@@ -171,7 +171,7 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root,
   // this method's main use is to help unit testing this class
   protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
-      PrunedPartitionList partitions, Operator root,
+      PrunedPartitionList partitions, TableScanOperator root,
       String alias) throws SemanticException {
     // All the setup is done in GenMapRedUtils
     GenMapRedUtils.setMapWork(mapWork, context.parseContext,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 73e8f6d..edafdb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -372,7 +372,7 @@ public void replaceRoots(Map, Operator> replacementMap) {
   @Override
   @Explain(displayName = "Map Operator Tree",
       explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  public Set> getAllRootOperators() {
+  public Set> getAllRootOperators() {
     Set> opSet = new LinkedHashSet>();

     for (Operator op : getAliasToWork().values()) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
index d9ab9c0..9e5db23 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
@@ -26,8 +26,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Set;

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -35,12 +33,9 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -71,17 +66,17 @@ public void setUp() throws Exception {
     ctx = new GenTezProcContext(
         new HiveConf(),
         new ParseContext(),
-        (List>)Collections.EMPTY_LIST,
-        (List>) new ArrayList>(),
-        (Set)Collections.EMPTY_SET,
-        (Set)Collections.EMPTY_SET);
+        Collections.EMPTY_LIST,
+        new ArrayList>(),
+        Collections.EMPTY_SET,
+        Collections.EMPTY_SET);

     proc = new GenTezWork(new GenTezUtils() {
       @Override
-      protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
-          PrunedPartitionList partitions, Operator root, String alias)
+      protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+          PrunedPartitionList partitions, TableScanOperator root, String alias)
           throws SemanticException {
-
+
         LinkedHashMap> map = new LinkedHashMap>();
         map.put("foo", root);
--
1.7.12.4 (Apple Git-37)
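Background note (after the patch trailer, not part of the patch itself): the refactor narrows several plan-generation APIs from the generic operator type to TableScanOperator, so the compiler now enforces what callers previously asserted with downcasts. A minimal sketch of that pattern follows, using simplified stand-in classes rather than Hive's real Operator/TableScanOperator definitions.

// Hypothetical stand-ins for illustration only; not Hive's actual classes.
abstract class Operator<T> {
}

class TableScanDesc {
  String getAlias() {
    return "t1";
  }
}

class TableScanOperator extends Operator<TableScanDesc> {
  TableScanDesc getConf() {
    return new TableScanDesc();
  }
}

class TypeSafetyExample {
  // Before: the caller receives a generic operator and must downcast,
  // which the compiler cannot check and which can fail at runtime.
  static String aliasBefore(Operator<?> root) {
    return ((TableScanOperator) root).getConf().getAlias();
  }

  // After: the signature states the real requirement, the cast disappears,
  // and passing a non-table-scan operator becomes a compile-time error.
  static String aliasAfter(TableScanOperator root) {
    return root.getConf().getAlias();
  }

  public static void main(String[] args) {
    System.out.println(aliasAfter(new TableScanOperator()));
  }
}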