From 882f5d66e771614214ab34c9ea11012dcbe34cae Mon Sep 17 00:00:00 2001 From: Ashutosh Chauhan Date: Fri, 4 Dec 2015 09:28:37 -0800 Subject: [PATCH] HIVE-12595 : [REFACTOR] Make physical compiler more type safe --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 155 +++++---------------- .../org/apache/hadoop/hive/ql/exec/Operator.java | 2 +- .../hive/ql/optimizer/AbstractBucketJoinProc.java | 4 +- .../hive/ql/optimizer/AbstractSMBJoinProc.java | 6 +- .../hadoop/hive/ql/optimizer/GenMRFileSink1.java | 6 +- .../hadoop/hive/ql/optimizer/GenMRProcContext.java | 20 ++- .../hadoop/hive/ql/optimizer/GenMRTableScan1.java | 7 +- .../hadoop/hive/ql/optimizer/GenMRUnion1.java | 5 +- .../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 40 +++--- .../hive/ql/optimizer/GlobalLimitOptimizer.java | 8 +- .../hadoop/hive/ql/optimizer/MapJoinFactory.java | 10 +- .../hive/ql/optimizer/SimpleFetchOptimizer.java | 30 ++-- .../hive/ql/optimizer/SkewJoinOptimizer.java | 6 +- .../calcite/translator/HiveOpConverter.java | 14 +- .../translator/HiveOpConverterPostProc.java | 6 +- .../ql/optimizer/index/RewriteGBUsingIndex.java | 22 ++- .../index/RewriteQueryUsingAggregateIndexCtx.java | 13 +- .../hive/ql/optimizer/lineage/ExprProcFactory.java | 35 +++-- .../optimizer/spark/SparkSortMergeJoinFactory.java | 6 - .../hadoop/hive/ql/parse/ColumnAccessAnalyzer.java | 36 ++--- .../apache/hadoop/hive/ql/parse/GenTezUtils.java | 5 +- .../apache/hadoop/hive/ql/parse/ParseContext.java | 9 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 ++- .../hive/ql/parse/spark/GenSparkProcContext.java | 5 +- .../hadoop/hive/ql/parse/spark/GenSparkUtils.java | 4 +- .../hadoop/hive/ql/parse/spark/SparkCompiler.java | 3 - .../org/apache/hadoop/hive/ql/plan/MapWork.java | 2 +- .../org/apache/hadoop/hive/ql/plan/PlanUtils.java | 9 +- .../hadoop/hive/ql/parse/TestGenTezWork.java | 19 +-- 29 files changed, 180 insertions(+), 329 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d81e17a..f6af6ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.conf.HiveVariableSource; import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -70,15 +69,10 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.AuthorizationException; -import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -100,10 +94,8 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import 
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -754,48 +746,44 @@ private static void getTablePartitionUsedColumns(HiveOperation op, BaseSemanticA SemanticAnalyzer querySem = (SemanticAnalyzer) sem; ParseContext parseCtx = querySem.getParseContext(); - for (Map.Entry> topOpMap : querySem - .getParseContext().getTopOps().entrySet()) { - Operator topOp = topOpMap.getValue(); - if (topOp instanceof TableScanOperator) { - TableScanOperator tableScanOp = (TableScanOperator) topOp; - Table tbl = tableScanOp.getConf().getTableMetadata(); - List neededColumnIds = tableScanOp.getNeededColumnIDs(); - List columns = tbl.getCols(); - List cols = new ArrayList(); - for (int i = 0; i < neededColumnIds.size(); i++) { - cols.add(columns.get(neededColumnIds.get(i)).getName()); - } - //map may not contain all sources, since input list may have been optimized out - //or non-existent tho such sources may still be referenced by the TableScanOperator - //if it's null then the partition probably doesn't exist so let's use table permission - if (tbl.isPartitioned() && - Boolean.TRUE.equals(tableUsePartLevelAuth.get(tbl.getTableName()))) { - String alias_id = topOpMap.getKey(); - - PrunedPartitionList partsList = PartitionPruner.prune(tableScanOp, - parseCtx, alias_id); - Set parts = partsList.getPartitions(); - for (Partition part : parts) { - List existingCols = part2Cols.get(part); - if (existingCols == null) { - existingCols = new ArrayList(); - } - existingCols.addAll(cols); - part2Cols.put(part, existingCols); - } - } else { - List existingCols = tab2Cols.get(tbl); + for (Map.Entry topOpMap : querySem.getParseContext().getTopOps().entrySet()) { + TableScanOperator topOp = topOpMap.getValue(); + TableScanOperator tableScanOp = topOp; + Table tbl = tableScanOp.getConf().getTableMetadata(); + List neededColumnIds = tableScanOp.getNeededColumnIDs(); + List columns = tbl.getCols(); + List cols = new ArrayList(); + for (int i = 0; i < neededColumnIds.size(); i++) { + cols.add(columns.get(neededColumnIds.get(i)).getName()); + } + //map may not contain all sources, since input list may have been optimized out + //or non-existent tho such sources may still be referenced by the TableScanOperator + //if it's null then the partition probably doesn't exist so let's use table permission + if (tbl.isPartitioned() && + Boolean.TRUE.equals(tableUsePartLevelAuth.get(tbl.getTableName()))) { + String alias_id = topOpMap.getKey(); + + PrunedPartitionList partsList = PartitionPruner.prune(tableScanOp, + parseCtx, alias_id); + Set parts = partsList.getPartitions(); + for (Partition part : parts) { + List existingCols = part2Cols.get(part); if (existingCols == null) { existingCols = new ArrayList(); } existingCols.addAll(cols); - tab2Cols.put(tbl, existingCols); + part2Cols.put(part, existingCols); } + } else { + List existingCols = tab2Cols.get(tbl); + if (existingCols == null) { + existingCols = new ArrayList(); + } + existingCols.addAll(cols); + tab2Cols.put(tbl, existingCols); } } } - } private static void doAuthorizationV2(SessionState ss, HiveOperation op, HashSet 
inputs, @@ -893,85 +881,6 @@ public QueryPlan getPlan() { return plan; } - /** - * @param d - * The database to be locked - * @param t - * The table to be locked - * @param p - * The partition to be locked - * @param mode - * The mode of the lock (SHARED/EXCLUSIVE) Get the list of objects to be locked. If a - * partition needs to be locked (in any mode), all its parents should also be locked in - * SHARED mode. - */ - private List getLockObjects(Database d, Table t, Partition p, HiveLockMode mode) - throws SemanticException { - List locks = new LinkedList(); - - HiveLockObjectData lockData = - new HiveLockObjectData(plan.getQueryId(), - String.valueOf(System.currentTimeMillis()), - "IMPLICIT", - plan.getQueryStr()); - if (d != null) { - locks.add(new HiveLockObj(new HiveLockObject(d.getName(), lockData), mode)); - return locks; - } - - if (t != null) { - locks.add(new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode)); - locks.add(new HiveLockObj(new HiveLockObject(t, lockData), mode)); - mode = HiveLockMode.SHARED; - locks.add(new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode)); - return locks; - } - - if (p != null) { - locks.add(new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode)); - if (!(p instanceof DummyPartition)) { - locks.add(new HiveLockObj(new HiveLockObject(p, lockData), mode)); - } - - // All the parents are locked in shared mode - mode = HiveLockMode.SHARED; - - // For dummy partitions, only partition name is needed - String name = p.getName(); - - if (p instanceof DummyPartition) { - name = p.getName().split("@")[2]; - } - - String partialName = ""; - String[] partns = name.split("/"); - int len = p instanceof DummyPartition ? partns.length : partns.length - 1; - Map partialSpec = new LinkedHashMap(); - for (int idx = 0; idx < len; idx++) { - String partn = partns[idx]; - partialName += partn; - String[] nameValue = partn.split("="); - assert(nameValue.length == 2); - partialSpec.put(nameValue[0], nameValue[1]); - try { - locks.add(new HiveLockObj( - new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName() - + "/" + MetaStoreUtils.encodeTableName(p.getTable().getTableName()) - + "/" + partialName, - partialSpec), lockData), mode)); - partialName += "/"; - } catch (HiveException e) { - throw new SemanticException(e.getMessage()); - } - } - - locks.add(new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode)); - locks.add(new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode)); - } - - return locks; - } - // Write the current set of valid transactions into the conf file so that it can be read by // the input format. 
private void recordValidTxns() throws LockException { @@ -1304,7 +1213,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp } if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) { //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics - //also, indirectly allows DDL to be executed outside a txn context + //also, indirectly allows DDL to be executed outside a txn context startTxnImplicitly = true; } if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) { @@ -1381,7 +1290,7 @@ private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { releaseLocksAndCommitOrRollback(false, null); } catch (LockException e) { - LOG.error("rollback() FAILED: " + cpr);//make sure not to loose + LOG.error("rollback() FAILED: " + cpr);//make sure not to loose handleHiveException(e, 12, "Additional info in hive.log at \"rollback() FAILED\""); } return cpr; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 9a86a35..85ab6b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -1271,7 +1271,7 @@ public String toString() { return getName() + "[" + getIdentifier() + "]"; } - public static String toString(Collection> top) { + public static String toString(Collection top) { StringBuilder builder = new StringBuilder(); Set visited = new HashSet(); for (Operator op : top) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java index 7cf0357..a0bc19f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java @@ -194,7 +194,7 @@ protected boolean checkConvertBucketMapJoin( LinkedHashMap>> tblAliasToBucketedFilePathsInEachPartition = new LinkedHashMap>>(); - HashMap> topOps = pGraphContext.getTopOps(); + HashMap topOps = pGraphContext.getTopOps(); HashMap aliasToNewAliasMap = new HashMap(); @@ -228,7 +228,7 @@ protected boolean checkConvertBucketMapJoin( // For nested sub-queries, the alias mapping is not maintained in QB currently. if (topOps.containsValue(tso)) { - for (Map.Entry> topOpEntry : topOps.entrySet()) { + for (Map.Entry topOpEntry : topOps.entrySet()) { if (topOpEntry.getValue() == tso) { String newAlias = topOpEntry.getKey(); if (!newAlias.equals(alias)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java index 9509f8e..1da0dda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java @@ -291,7 +291,7 @@ private boolean isEligibleForBucketSortMergeJoin( * The table alias should be subq2:subq1:a which needs to be fetched from topOps. 
*/ if (pGraphContext.getTopOps().containsValue(tso)) { - for (Map.Entry> topOpEntry : + for (Map.Entry topOpEntry : this.pGraphContext.getTopOps().entrySet()) { if (topOpEntry.getValue() == tso) { alias = topOpEntry.getKey(); @@ -444,13 +444,13 @@ protected boolean canConvertJoinToBucketMapJoin( String selector = HiveConf.getVar(pGraphContext.getConf(), HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR); bigTableMatcherClass = - (Class) JavaUtils.loadClass(selector); + JavaUtils.loadClass(selector); } catch (ClassNotFoundException e) { throw new SemanticException(e.getMessage()); } BigTableSelectorForAutoSMJ bigTableMatcher = - (BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null); + ReflectionUtils.newInstance(bigTableMatcherClass, null); JoinDesc joinDesc = joinOp.getConf(); JoinCondDesc[] joinCondns = joinDesc.getConds(); Set joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java index dcdc9ba..a231543 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.lib.Node; @@ -61,6 +62,7 @@ public GenMRFileSink1() { * @param opProcCtx * context */ + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, Object... 
nodeOutputs) throws SemanticException { GenMRProcContext ctx = (GenMRProcContext) opProcCtx; @@ -140,7 +142,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, private void processLinkedFileDesc(GenMRProcContext ctx, Task childTask) throws SemanticException { Task currTask = ctx.getCurrTask(); - Operator currTopOp = ctx.getCurrTopOp(); + TableScanOperator currTopOp = ctx.getCurrTopOp(); if (currTopOp != null && !ctx.isSeenOp(currTask, currTopOp)) { String currAliasId = ctx.getCurrAliasId(); GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx); @@ -186,7 +188,7 @@ private Path processFS(FileSinkOperator fsOp, Stack stack, dest = GenMapRedUtils.createMoveTask(ctx.getCurrTask(), chDir, fsOp, ctx.getParseCtx(), ctx.getMvTask(), ctx.getConf(), ctx.getDependencyTaskForMultiInsert()); - Operator currTopOp = ctx.getCurrTopOp(); + TableScanOperator currTopOp = ctx.getCurrTopOp(); String currAliasId = ctx.getCurrAliasId(); HashMap, Task> opTaskMap = ctx.getOpTaskMap(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java index 0da5790..4387c42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; @@ -94,13 +95,13 @@ public String getCurrAliasId() { final Task uTask; List taskTmpDir; List tt_desc; - List> listTopOperators; + List listTopOperators; public GenMRUnionCtx(Task uTask) { this.uTask = uTask; taskTmpDir = new ArrayList(); tt_desc = new ArrayList(); - listTopOperators = new ArrayList>(); + listTopOperators = new ArrayList<>(); } public Task getUTask() { @@ -123,16 +124,11 @@ public void addTTDesc(TableDesc tt_desc) { return tt_desc; } - public List> getListTopOperators() { + public List getListTopOperators() { return listTopOperators; } - public void setListTopOperators( - List> listTopOperators) { - this.listTopOperators = listTopOperators; - } - - public void addListTopOperators(Operator topOperator) { + public void addListTopOperators(TableScanOperator topOperator) { listTopOperators.add(topOperator); } } @@ -152,7 +148,7 @@ public void addListTopOperators(Operator topOperator) { private LinkedHashMap, GenMapRedCtx> mapCurrCtx; private Task currTask; - private Operator currTopOp; + private TableScanOperator currTopOp; private UnionOperator currUnionOp; private String currAliasId; private DependencyCollectionTask dependencyTaskForMultiInsert; @@ -355,7 +351,7 @@ public void setCurrTask(Task currTask) { /** * @return current top operator */ - public Operator getCurrTopOp() { + public TableScanOperator getCurrTopOp() { return currTopOp; } @@ -363,7 +359,7 @@ public void setCurrTask(Task currTask) { * @param currTopOp * current top operator */ - public void setCurrTopOp(Operator currTopOp) { + public void setCurrTopOp(TableScanOperator currTopOp) { this.currTopOp = currTopOp; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java index af0ac90..2160e01 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java @@ -77,9 +77,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, // create a dummy MapReduce task MapredWork currWork = GenMapRedUtils.getMapRedWork(parseCtx); MapRedTask currTask = (MapRedTask) TaskFactory.get(currWork, parseCtx.getConf()); - Operator currTopOp = op; ctx.setCurrTask(currTask); - ctx.setCurrTopOp(currTopOp); + ctx.setCurrTopOp(op); for (String alias : parseCtx.getTopOps().keySet()) { Operator currOp = parseCtx.getTopOps().get(alias); @@ -160,9 +159,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, Table source = op.getConf().getTableMetadata(); List partCols = GenMapRedUtils.getPartitionColumns(op); PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, partCols, false); - GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList); + GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx, partList); } else { // non-partitioned table - GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx); + GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java index d3afdc8..5102d19 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java @@ -77,7 +77,7 @@ private Object processMapOnlyUnion(UnionOperator union, Stack stack, UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union); ctx.getMapCurrCtx().put( - (Operator) union, + union, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrAliasId())); @@ -170,7 +170,7 @@ private void processSubQueryUnionMerge(GenMRProcContext ctx, // plan Task uTask = uCtxTask.getUTask(); ctx.setCurrTask(uTask); - Operator topOp = ctx.getCurrTopOp(); + TableScanOperator topOp = ctx.getCurrTopOp(); if (topOp != null && !ctx.isSeenOp(uTask, topOp)) { GenMapRedUtils.setTaskPlan(ctx.getCurrAliasId(), ctx .getCurrTopOp(), uTask, false, ctx); @@ -189,6 +189,7 @@ private void processSubQueryUnionMerge(GenMRProcContext ctx, * @param opProcCtx * context */ + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, Object... 
nodeOutputs) throws SemanticException { UnionOperator union = (UnionOperator) nd; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 0cd7b62..a1c9651 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -151,7 +151,7 @@ public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx) MapredWork plan = (MapredWork) currTask.getWork(); HashMap, Task> opTaskMap = opProcCtx.getOpTaskMap(); - Operator currTopOp = opProcCtx.getCurrTopOp(); + TableScanOperator currTopOp = opProcCtx.getCurrTopOp(); opTaskMap.put(reducer, currTask); plan.setReduceWork(new ReduceWork()); @@ -216,7 +216,7 @@ public static void initUnionPlan(ReduceSinkOperator op, UnionOperator currUnionO private static void setUnionPlan(GenMRProcContext opProcCtx, boolean local, Task currTask, GenMRUnionCtx uCtx, boolean mergeTask) throws SemanticException { - Operator currTopOp = opProcCtx.getCurrTopOp(); + TableScanOperator currTopOp = opProcCtx.getCurrTopOp(); if (currTopOp != null) { String currAliasId = opProcCtx.getCurrAliasId(); @@ -234,7 +234,7 @@ private static void setUnionPlan(GenMRProcContext opProcCtx, int size = taskTmpDirLst.size(); assert local == false; - List> topOperators = + List topOperators = uCtx.getListTopOperators(); MapredWork plan = (MapredWork) currTask.getWork(); @@ -332,7 +332,7 @@ public static void joinPlan(Task currTask, throws SemanticException { assert currTask != null && oldTask != null; - Operator currTopOp = opProcCtx.getCurrTopOp(); + TableScanOperator currTopOp = opProcCtx.getCurrTopOp(); List> parTasks = null; // terminate the old task and make current task dependent on it if (currTask.getParentTasks() != null @@ -368,7 +368,7 @@ public static void joinPlan(Task currTask, /** * If currTopOp is not set for input of the task, add input for to the task */ - static boolean mergeInput(Operator currTopOp, + static boolean mergeInput(TableScanOperator currTopOp, GenMRProcContext opProcCtx, Task task, boolean local) throws SemanticException { if (!opProcCtx.isSeenOp(task, currTopOp)) { @@ -437,7 +437,7 @@ static void splitPlan(ReduceSinkOperator cRS, GenMRProcContext opProcCtx) * processing context */ public static void setTaskPlan(String alias_id, - Operator topOp, Task task, boolean local, + TableScanOperator topOp, Task task, boolean local, GenMRProcContext opProcCtx) throws SemanticException { setTaskPlan(alias_id, topOp, task, local, opProcCtx, null); } @@ -459,7 +459,7 @@ public static void setTaskPlan(String alias_id, * pruned partition list. If it is null it will be computed on-the-fly. 
*/ public static void setTaskPlan(String alias_id, - Operator topOp, Task task, boolean local, + TableScanOperator topOp, Task task, boolean local, GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException { setMapWork(((MapredWork) task.getWork()).getMapWork(), opProcCtx.getParseCtx(), opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), local); @@ -485,7 +485,7 @@ public static void setTaskPlan(String alias_id, * current instance of hive conf */ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set inputs, - PrunedPartitionList partsList, Operator topOp, String alias_id, + PrunedPartitionList partsList, TableScanOperator tsOp, String alias_id, HiveConf conf, boolean local) throws SemanticException { ArrayList partDir = new ArrayList(); ArrayList partDesc = new ArrayList(); @@ -496,9 +496,8 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set props = topOp.getConf().getOpProps(); + Map props = tsOp.getConf().getOpProps(); if (props != null) { Properties target = aliasPartnDesc.getProperties(); if (target == null) { @@ -590,10 +589,10 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set partToPruner = parseCtx.getOpToPartToSkewedPruner().get(topOp); + Map partToPruner = parseCtx.getOpToPartToSkewedPruner().get(tsOp); ExprNodeDesc listBucketingPruner = (partToPruner != null) ? partToPruner.get(part.getName()) : null; @@ -701,10 +700,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set iterPath = partDir.iterator(); Iterator iterPartnDesc = partDesc.iterator(); @@ -728,7 +724,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set tsMerge = + TableScanOperator tsMerge = GenMapRedUtils.createTemporaryTableScanOperator(inputRS); // Create a FileSink operator @@ -1539,7 +1535,7 @@ public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) * @return the MapredWork */ private static MapWork createMRWorkForMergingFiles (HiveConf conf, - Operator topOp, FileSinkDesc fsDesc) { + TableScanOperator topOp, FileSinkDesc fsDesc) { ArrayList aliases = new ArrayList(); String inputDir = fsDesc.getFinalDirName().toString(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java index 6b04d92..30976af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java @@ -39,10 +39,8 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.SplitSample; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import com.google.common.collect.ImmutableSet; @@ -65,11 +63,11 @@ private final Logger LOG = LoggerFactory.getLogger(GlobalLimitOptimizer.class.getName()); + @Override public ParseContext transform(ParseContext pctx) throws SemanticException { Context ctx = pctx.getContext(); - Map> topOps = pctx.getTopOps(); + Map topOps = pctx.getTopOps(); GlobalLimitCtx globalLimitCtx = pctx.getGlobalLimitCtx(); - Map opToPartPruner = pctx.getOpToPartPruner(); Map nameToSplitSample = pctx.getNameToSplitSample(); // determine the query qualifies reduce 
input size for LIMIT @@ -92,7 +90,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { // FROM ... LIMIT... // SELECT * FROM (SELECT col1 as col2 (SELECT * FROM ...) t1 LIMIT ...) t2); // - TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0]; + TableScanOperator ts = topOps.values().iterator().next(); Integer tempGlobalLimit = checkQbpForGlobalLimit(ts); // query qualify for the optimization diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java index 647f863..867a1f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; @@ -152,7 +153,7 @@ private static void initMapJoinPlan(AbstractMapJoinOperator currTopOp = opProcCtx.getCurrTopOp(); + TableScanOperator currTopOp = opProcCtx.getCurrTopOp(); String currAliasId = opProcCtx.getCurrAliasId(); GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, local, opProcCtx); } @@ -170,11 +171,10 @@ private static void initMapJoinPlan(AbstractMapJoinOperator op, - Task oldTask, + private static void joinMapJoinPlan(Task oldTask, GenMRProcContext opProcCtx, boolean local) throws SemanticException { - Operator currTopOp = opProcCtx.getCurrTopOp(); + TableScanOperator currTopOp = opProcCtx.getCurrTopOp(); GenMapRedUtils.mergeInput(currTopOp, opProcCtx, oldTask, local); } @@ -220,7 +220,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } else { // The current plan can be thrown away after being merged with the // original plan - joinMapJoinPlan(mapJoin, oldTask, ctx, local); + joinMapJoinPlan(oldTask, ctx, local); ctx.setCurrTask(currTask = oldTask); } MapredWork plan = (MapredWork) currTask.getWork(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java index 9b9a5ca..632a622 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java @@ -93,28 +93,26 @@ @Override public ParseContext transform(ParseContext pctx) throws SemanticException { - Map> topOps = pctx.getTopOps(); + Map topOps = pctx.getTopOps(); if (pctx.getQueryProperties().isQuery() && !pctx.getQueryProperties().isAnalyzeCommand() && topOps.size() == 1) { // no join, no groupby, no distinct, no lateral view, no subq, // no CTAS or insert, not analyze command, and single sourced. 
String alias = (String) pctx.getTopOps().keySet().toArray()[0]; - Operator topOp = (Operator) pctx.getTopOps().values().toArray()[0]; - if (topOp instanceof TableScanOperator) { - try { - FetchTask fetchTask = optimize(pctx, alias, (TableScanOperator) topOp); - if (fetchTask != null) { - pctx.setFetchTask(fetchTask); - } - } catch (Exception e) { - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); - if (e instanceof SemanticException) { - throw (SemanticException) e; - } - throw new SemanticException(e.getMessage(), e); + TableScanOperator topOp = pctx.getTopOps().values().iterator().next(); + try { + FetchTask fetchTask = optimize(pctx, alias, topOp); + if (fetchTask != null) { + pctx.setFetchTask(fetchTask); + } + } catch (Exception e) { + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + if (e instanceof SemanticException) { + throw (SemanticException) e; } + throw new SemanticException(e.getMessage(), e); } } return pctx; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java index 64dc48c..81c1939 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java @@ -190,7 +190,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // Update the topOps appropriately Map> topOps = getTopOps(joinOpClone); - Map> origTopOps = parseContext.getTopOps(); + Map origTopOps = parseContext.getTopOps(); for (Entry> topOp : topOps.entrySet()) { TableScanOperator tso = (TableScanOperator) topOp.getValue(); @@ -283,7 +283,7 @@ private boolean getTableScanOps( * @param op The join operator being optimized * @param tableScanOpsForJoin table scan operators which are parents of the join operator * @return map. 
- * @throws SemanticException + * @throws SemanticException */ private Map, List>> getSkewedValues( @@ -406,7 +406,7 @@ private TableScanOperator getTableScanOperator( return tsOp; } } - if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) || + if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) || (op.getParentOperators().size() > 1)) { return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index 130ee89..00f1acb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -119,12 +119,12 @@ private final SemanticAnalyzer semanticAnalyzer; private final HiveConf hiveConf; private final UnparseTranslator unparseTranslator; - private final Map> topOps; + private final Map topOps; private final boolean strictMode; private int uniqueCounter; public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf, - UnparseTranslator unparseTranslator, Map> topOps, + UnparseTranslator unparseTranslator, Map topOps, boolean strictMode) { this.semanticAnalyzer = semanticAnalyzer; this.hiveConf = hiveConf; @@ -600,7 +600,7 @@ OpAttr visit(HiveSortExchange exchangeRel) throws SemanticException { } ExprNodeDesc[] expressions = new ExprNodeDesc[exchangeRel.getJoinKeys().size()]; for (int index = 0; index < exchangeRel.getJoinKeys().size(); index++) { - expressions[index] = convertToExprNode((RexNode) exchangeRel.getJoinKeys().get(index), + expressions[index] = convertToExprNode(exchangeRel.getJoinKeys().get(index), exchangeRel.getInput(), inputOpAf.tabAlias, inputOpAf); } exchangeRel.setJoinExpressions(expressions); @@ -943,7 +943,7 @@ private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressio int rightPos = joinCondns[i].getRight(); for (ExprNodeDesc expr : filterExpressions.get(i)) { - // We need to update the exprNode, as currently + // We need to update the exprNode, as currently // they refer to columns in the output of the join; // they should refer to the columns output by the RS int inputPos = updateExprNode(expr, reversedExprs, colExprMap); @@ -956,9 +956,9 @@ private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressio joinCondns[i].getType() == JoinDesc.LEFT_OUTER_JOIN || joinCondns[i].getType() == JoinDesc.RIGHT_OUTER_JOIN) { if (inputPos == leftPos) { - updateFilterMap(filterMap, leftPos, rightPos); + updateFilterMap(filterMap, leftPos, rightPos); } else { - updateFilterMap(filterMap, rightPos, leftPos); + updateFilterMap(filterMap, rightPos, leftPos); } } } @@ -992,7 +992,7 @@ private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressio * This method updates the input expr, changing all the * ExprNodeColumnDesc in it to refer to columns given by the * colExprMap. - * + * * For instance, "col_0 = 1" would become "VALUE.col_0 = 1"; * the execution engine expects filters in the Join operators * to be expressed that way. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java index 1d0a254..a63f167 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverterPostProc.java @@ -25,8 +25,6 @@ import java.util.Set; import java.util.Stack; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -48,8 +46,6 @@ public class HiveOpConverterPostProc implements Transform { - private static final Logger LOG = LoggerFactory.getLogger(HiveOpConverterPostProc.class); - private ParseContext pctx; private Map> aliasToOpInfo; @@ -139,7 +135,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // 1. Get alias from topOps String opAlias = null; - for (Map.Entry> topOpEntry : pctx.getTopOps().entrySet()) { + for (Map.Entry topOpEntry : pctx.getTopOps().entrySet()) { if (topOpEntry.getValue() == tableScanOp) { opAlias = topOpEntry.getKey(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java index ea1ece6..4277be5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.AggregateIndexHandler; @@ -44,7 +43,6 @@ import org.apache.hadoop.hive.ql.optimizer.Transform; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** @@ -160,9 +158,9 @@ boolean shouldApplyOptimization() throws SemanticException { * if the optimization can be applied. If yes, we add the name of the top table to * the tsOpToProcess to apply rewrite later on. 
* */ - for (Map.Entry> entry : parseContext.getTopOps().entrySet()) { + for (Map.Entry entry : parseContext.getTopOps().entrySet()) { String alias = entry.getKey(); - TableScanOperator topOp = (TableScanOperator) entry.getValue(); + TableScanOperator topOp = entry.getValue(); Table table = topOp.getConf().getTableMetadata(); List indexes = tableToIndex.get(table); if (indexes.isEmpty()) { @@ -230,16 +228,14 @@ private boolean checkIfRewriteCanBeApplied(String alias, TableScanOperator topOp supportedIndexes.add(AggregateIndexHandler.class.getName()); // query the metastore to know what columns we have indexed - Collection> topTables = parseContext.getTopOps().values(); + Collection topTables = parseContext.getTopOps().values(); Map> indexes = new HashMap>(); - for (Operator op : topTables) { - if (op instanceof TableScanOperator) { - TableScanOperator tsOP = (TableScanOperator) op; - List tblIndexes = IndexUtils.getIndexes(tsOP.getConf().getTableMetadata(), - supportedIndexes); - if (tblIndexes.size() > 0) { - indexes.put(tsOP.getConf().getTableMetadata(), tblIndexes); - } + for (TableScanOperator op : topTables) { + TableScanOperator tsOP = op; + List tblIndexes = IndexUtils.getIndexes(tsOP.getConf().getTableMetadata(), + supportedIndexes); + if (tblIndexes.size() > 0) { + indexes.put(tsOP.getConf().getTableMetadata(), tblIndexes); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java index d0f28d8..a8ba4d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.serde2.SerDeException; @@ -85,7 +83,7 @@ public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseC private final Hive hiveDb; private final ParseContext parseContext; - private RewriteCanApplyCtx canApplyCtx; + private final RewriteCanApplyCtx canApplyCtx; //We need the GenericUDAFEvaluator for GenericUDAF function "sum" private GenericUDAFEvaluator eval = null; private final String indexTableName; @@ -148,7 +146,7 @@ public void invokeRewriteQueryProc() throws SemanticException { this.replaceSelectOperatorProcess(selectperator); } } - + /** * This method replaces the original TableScanOperator with the new * TableScanOperator and metadata that scans over the index table rather than @@ -161,7 +159,7 @@ private void replaceTableScanProcess(TableScanOperator scanOperator) throws Sema // Need to remove the original TableScanOperators from these data structures // and add new ones - Map> topOps = rewriteQueryCtx.getParseContext() + HashMap topOps = rewriteQueryCtx.getParseContext() .getTopOps(); // remove original TableScanOperator @@ -211,8 +209,7 @@ private void replaceTableScanProcess(TableScanOperator scanOperator) throws Sema 
scanOperator.getConf().setAlias(newAlias); scanOperator.setAlias(indexTableName); topOps.put(newAlias, scanOperator); - rewriteQueryCtx.getParseContext().setTopOps( - (HashMap>) topOps); + rewriteQueryCtx.getParseContext().setTopOps(topOps); ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rs, Arrays.asList(rewriteQueryCtx.getIndexKey())); @@ -307,7 +304,7 @@ private void replaceGroupByOperatorProcess(GroupByOperator operator, int index) } else { // we just need to reset the GenericUDAFEvaluator and its name for this // GroupByOperator whose parent is the ReduceSinkOperator - GroupByDesc childConf = (GroupByDesc) operator.getConf(); + GroupByDesc childConf = operator.getConf(); List childAggrList = childConf.getAggregators(); if (childAggrList != null && childAggrList.size() > 0) { for (AggregationDesc aggregationDesc : childAggrList) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java index 38040e3..09ef490 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java @@ -171,27 +171,24 @@ public static NodeProcessor getColumnProcessor() { private static boolean findSourceColumn( LineageCtx lctx, Predicate cond, String tabAlias, String alias) { - for (Map.Entry> topOpMap: lctx - .getParseCtx().getTopOps().entrySet()) { - Operator topOp = topOpMap.getValue(); - if (topOp instanceof TableScanOperator) { - TableScanOperator tableScanOp = (TableScanOperator) topOp; - Table tbl = tableScanOp.getConf().getTableMetadata(); - if (tbl.getTableName().equals(tabAlias) - || tabAlias.equals(tableScanOp.getConf().getAlias())) { - for (FieldSchema column: tbl.getCols()) { - if (column.getName().equals(alias)) { - TableAliasInfo table = new TableAliasInfo(); - table.setTable(tbl.getTTable()); - table.setAlias(tabAlias); - BaseColumnInfo colInfo = new BaseColumnInfo(); - colInfo.setColumn(column); - colInfo.setTabAlias(table); - cond.getBaseCols().add(colInfo); - return true; - } + for (Map.Entry topOpMap: lctx.getParseCtx().getTopOps().entrySet()) { + TableScanOperator tableScanOp = topOpMap.getValue(); + Table tbl = tableScanOp.getConf().getTableMetadata(); + if (tbl.getTableName().equals(tabAlias) + || tabAlias.equals(tableScanOp.getConf().getAlias())) { + for (FieldSchema column: tbl.getCols()) { + if (column.getName().equals(alias)) { + TableAliasInfo table = new TableAliasInfo(); + table.setTable(tbl.getTTable()); + table.setAlias(tabAlias); + BaseColumnInfo colInfo = new BaseColumnInfo(); + colInfo.setColumn(column); + colInfo.setTabAlias(table); + cond.getBaseCols().add(colInfo); + return true; } } + } } return false; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java index aca0630..55fdedb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java @@ -19,21 +19,15 @@ import java.util.List; import java.util.Map; -import java.util.Stack; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.lib.Node; -import org.apache.hadoop.hive.ql.lib.NodeProcessor; -import 
org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext; import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** * Operator factory for Spark SMBJoin processing. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java index ea58917..dcc8daf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java @@ -18,19 +18,12 @@ package org.apache.hadoop.hive.ql.parse; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; public class ColumnAccessAnalyzer { - private static final Logger LOG = LoggerFactory.getLogger(ColumnAccessAnalyzer.class.getName()); private final ParseContext pGraphContext; public ColumnAccessAnalyzer() { @@ -43,22 +36,19 @@ public ColumnAccessAnalyzer(ParseContext pactx) { public ColumnAccessInfo analyzeColumnAccess() throws SemanticException { ColumnAccessInfo columnAccessInfo = new ColumnAccessInfo(); - Collection> topOps = pGraphContext.getTopOps().values(); - for (Operator op : topOps) { - if (op instanceof TableScanOperator) { - TableScanOperator top = (TableScanOperator) op; - Table table = top.getConf().getTableMetadata(); - String tableName = table.getCompleteName(); - List referenced = top.getReferencedColumns(); - for (String column : referenced) { - columnAccessInfo.add(tableName, column); - } - if (table.isPartitioned()) { - PrunedPartitionList parts = pGraphContext.getPrunedPartitions(table.getTableName(), top); - if (parts.getReferredPartCols() != null) { - for (String partKey : parts.getReferredPartCols()) { - columnAccessInfo.add(tableName, partKey); - } + Collection topOps = pGraphContext.getTopOps().values(); + for (TableScanOperator top : topOps) { + Table table = top.getConf().getTableMetadata(); + String tableName = table.getCompleteName(); + List referenced = top.getReferencedColumns(); + for (String column : referenced) { + columnAccessInfo.add(tableName, column); + } + if (table.isPartitioned()) { + PrunedPartitionList parts = pGraphContext.getPrunedPartitions(table.getTableName(), top); + if (parts.getReferredPartCols() != null) { + for (String partKey : parts.getReferredPartCols()) { + columnAccessInfo.add(tableName, partKey); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index fe0e234..f656998 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import 
org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; @@ -179,7 +178,7 @@ public MapWork createMapWork(GenTezProcContext context, Operator root, String alias = ts.getConf().getAlias(); - setupMapWork(mapWork, context, partitions, root, alias); + setupMapWork(mapWork, context, partitions, ts, alias); if (ts.getConf().getTableMetadata() != null && ts.getConf().getTableMetadata().isDummyTable()) { mapWork.setDummyTableScan(true); @@ -197,7 +196,7 @@ public MapWork createMapWork(GenTezProcContext context, Operator root, // this method's main use is to help unit testing this class protected void setupMapWork(MapWork mapWork, GenTezProcContext context, - PrunedPartitionList partitions, Operator root, + PrunedPartitionList partitions, TableScanOperator root, String alias) throws SemanticException { // All the setup is done in GenMapRedUtils GenMapRedUtils.setMapWork(mapWork, context.parseContext, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 5872e8e..bee0175 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; /** @@ -68,7 +67,7 @@ private HashMap opToPartList; private HashMap opToSamplePruner; private Map> opToPartToSkewedPruner; - private HashMap> topOps; + private HashMap topOps; private Set joinOps; private Set mapJoinOps; private Set smbMapJoinOps; @@ -150,7 +149,7 @@ public ParseContext( HiveConf conf, HashMap opToPartPruner, HashMap opToPartList, - HashMap> topOps, + HashMap topOps, Set joinOps, Set smbMapJoinOps, List loadTableWork, List loadFileWork, @@ -257,7 +256,7 @@ public void setReduceSinkOperatorsAddedByEnforceBucketingSorting( /** * @return the topOps */ - public HashMap> getTopOps() { + public HashMap getTopOps() { return topOps; } @@ -265,7 +264,7 @@ public void setReduceSinkOperatorsAddedByEnforceBucketingSorting( * @param topOps * the topOps to set */ - public void setTopOps(HashMap> topOps) { + public void setTopOps(HashMap topOps) { this.topOps = topOps; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 5803a9c..6ec985d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -240,8 +240,7 @@ private HashMap opToPartPruner; private HashMap opToPartList; - protected HashMap> topOps; - private final HashMap> topSelOps; + protected HashMap topOps; protected LinkedHashMap, OpParseContext> opParseCtx; private List loadTableWork; private List loadFileWork; @@ -318,8 +317,7 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException { opToSamplePruner = new HashMap(); nameToSplitSample = new HashMap(); // Must be deterministic order maps - see HIVE-8707 - topOps = new LinkedHashMap>(); - topSelOps = new LinkedHashMap>(); + topOps = new LinkedHashMap(); loadTableWork = new ArrayList(); loadFileWork = new ArrayList(); opParseCtx = new LinkedHashMap, OpParseContext>(); @@ -357,7 +355,6 @@ protected void reset(boolean clearPartsCache) { loadTableWork.clear(); loadFileWork.clear(); 
topOps.clear(); - topSelOps.clear(); destTableId = 1; idToTableNameMap.clear(); qb = null; @@ -9254,11 +9251,7 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException { RowResolver rwsch; // is the table already present - Operator top = topOps.get(alias_id); - Operator dummySel = topSelOps.get(alias_id); - if (dummySel != null) { - top = dummySel; - } + TableScanOperator top = topOps.get(alias_id); if (top == null) { // Determine row schema for TSOP. @@ -9312,7 +9305,7 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException { nameToSplitSample.remove(alias_id); } - top = putOpInsertMap(OperatorFactory.get(tsDesc, + top = (TableScanOperator) putOpInsertMap(OperatorFactory.get(tsDesc, new RowSchema(rwsch.getColumnInfos())), rwsch); // Add this to the list of top operators - we always start from a table @@ -9320,11 +9313,11 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException { topOps.put(alias_id, top); // Add a mapping from the table scan operator to Table - topToTable.put((TableScanOperator) top, tab); + topToTable.put(top, tab); Map props = qb.getTabPropsForAlias(alias); if (props != null) { - topToTableProps.put((TableScanOperator) top, props); + topToTableProps.put(top, props); tsDesc.setOpProps(props); } } else { @@ -9336,7 +9329,7 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException { Operator op = top; TableSample ts = qb.getParseInfo().getTabSample(alias); if (ts != null) { - TableScanOperator tableScanOp = (TableScanOperator) top; + TableScanOperator tableScanOp = top; tableScanOp.getConf().setTableSample(ts); int num = ts.getNumerator(); int den = ts.getDenominator(); @@ -10536,6 +10529,7 @@ private ExprNodeDesc getExprNodeDescCached(ASTNode expr, RowResolver input) // For example, in Column[a].b.c and Column[a].b, Column[a].b should be // unparsed before Column[a].b.c Collections.sort(fieldDescList, new Comparator>() { + @Override public int compare(Entry o1, Entry o2) { ExprNodeFieldDesc fieldDescO1 = (ExprNodeFieldDesc) o1.getValue(); ExprNodeFieldDesc fieldDescO2 = (ExprNodeFieldDesc) o2.getValue(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java index 62237e1..b0ab495 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; @@ -135,7 +136,7 @@ // Alias to operator map, from the semantic analyzer. // This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias. 
- public final Map> topOps; + public final Map topOps; // The set of pruning sinks public final Set> pruningSinkSet; @@ -151,7 +152,7 @@ public GenSparkProcContext(HiveConf conf, List> rootTasks, Set inputs, Set outputs, - Map> topOps) { + Map topOps) { this.conf = conf; this.parseContext = parseContext; this.moveTask = moveTask; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 8dc48cd..924848f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -160,7 +160,7 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root, String alias = ((TableScanOperator) root).getConf().getAlias(); if (!deferSetup) { - setupMapWork(mapWork, context, partitions, root, alias); + setupMapWork(mapWork, context, partitions,(TableScanOperator) root, alias); } // add new item to the Spark work @@ -171,7 +171,7 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root, // this method's main use is to help unit testing this class protected void setupMapWork(MapWork mapWork, GenSparkProcContext context, - PrunedPartitionList partitions, Operator root, + PrunedPartitionList partitions, TableScanOperator root, String alias) throws SemanticException { // All the setup is done in GenMapRedUtils GenMapRedUtils.setMapWork(mapWork, context.parseContext, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 7e0e137..3673da4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -26,8 +26,6 @@ import java.util.Set; import java.util.Stack; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -97,7 +95,6 @@ public class SparkCompiler extends TaskCompiler { private static final String CLASS_NAME = SparkCompiler.class.getName(); private static final PerfLogger PERF_LOGGER = SessionState.getPerfLogger(); - private static final Logger LOGGER = LoggerFactory.getLogger(SparkCompiler.class); public SparkCompiler() { } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 73e8f6d..edafdb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -372,7 +372,7 @@ public void replaceRoots(Map, Operator> replacementMap) { @Override @Explain(displayName = "Map Operator Tree", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public Set> getAllRootOperators() { + public Set> getAllRootOperators() { Set> opSet = new LinkedHashSet>(); for (Operator op : getAliasToWork().values()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 6ba122a..5bea6fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -951,7 +951,7 @@ public static void addExprToStringBuffer(ExprNodeDesc expr, Appendable sb) { throw new RuntimeException(e); } } - + public static void addPartitionInputs(Collection parts, Collection inputs, 
ReadEntity parentViewInfo, boolean isDirectRead) { // Store the inputs in a HashMap since we can't get a ReadEntity from inputs since it is @@ -990,12 +990,9 @@ public static void addPartitionInputs(Collection parts, Collection inputs = parseCtx.getSemanticInputs(); - for (Map.Entry> entry : parseCtx.getTopOps().entrySet()) { - if (!(entry.getValue() instanceof TableScanOperator)) { - continue; - } + for (Map.Entry entry : parseCtx.getTopOps().entrySet()) { String alias = entry.getKey(); - TableScanOperator topOp = (TableScanOperator) entry.getValue(); + TableScanOperator topOp = entry.getValue(); ReadEntity parentViewInfo = getParentViewInfo(alias, parseCtx.getViewAliasToInput()); // Adds tables only for create view (PPD filter can be appended by outer query) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java index d9ab9c0..9e5db23 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java @@ -26,8 +26,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; -import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -35,12 +33,9 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -71,17 +66,17 @@ public void setUp() throws Exception { ctx = new GenTezProcContext( new HiveConf(), new ParseContext(), - (List>)Collections.EMPTY_LIST, - (List>) new ArrayList>(), - (Set)Collections.EMPTY_SET, - (Set)Collections.EMPTY_SET); + Collections.EMPTY_LIST, + new ArrayList>(), + Collections.EMPTY_SET, + Collections.EMPTY_SET); proc = new GenTezWork(new GenTezUtils() { @Override - protected void setupMapWork(MapWork mapWork, GenTezProcContext context, - PrunedPartitionList partitions, Operator root, String alias) + protected void setupMapWork(MapWork mapWork, GenTezProcContext context, + PrunedPartitionList partitions, TableScanOperator root, String alias) throws SemanticException { - + LinkedHashMap> map = new LinkedHashMap>(); map.put("foo", root); -- 1.7.12.4 (Apple Git-37)