diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
index 70c23a6..93dd654 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
@@ -131,10 +131,12 @@ private boolean checkNumberOfBucketsAgainstBigTable(
 
   protected boolean canConvertMapJoinToBucketMapJoin(
       MapJoinOperator mapJoinOp,
-      ParseContext pGraphContext,
       BucketJoinProcCtx context) throws SemanticException {
 
-    QBJoinTree joinCtx = pGraphContext.getMapJoinContext().get(mapJoinOp);
+    QBJoinTree joinCtx = null;
+    if (this.pGraphContext.getMapJoinContext().contains(mapJoinOp)) {
+      joinCtx = mapJoinOp.getConf().getJoinTree();
+    }
     if (joinCtx == null) {
       return false;
     }
@@ -172,7 +174,6 @@ protected boolean canConvertMapJoinToBucketMapJoin(
     Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
 
     return checkConvertBucketMapJoin(
-        pGraphContext,
         context,
         joinCtx,
         keysMap,
@@ -189,7 +190,6 @@ protected boolean canConvertMapJoinToBucketMapJoin(
    * d. The number of buckets in the big table can be divided by no of buckets in small tables.
    */
   protected boolean checkConvertBucketMapJoin(
-      ParseContext pGraphContext,
       BucketJoinProcCtx context,
       QBJoinTree joinCtx,
       Map<Byte, List<ExprNodeDesc>> keysMap,
@@ -438,7 +438,7 @@ protected void convertMapJoinToBucketMapJoin(
   }
 
   // convert partition to partition spec string
-  private static Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
+  private Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
     Map<String, List<String>> converted = new HashMap<String, List<String>>();
     for (Map.Entry<Partition, List<String>> entry : mapping.entrySet()) {
       converted.put(entry.getKey().getName(), entry.getValue());
@@ -467,7 +467,7 @@ protected void convertMapJoinToBucketMapJoin(
   }
 
   // called for each partition of big table and populates mapping for each file in the partition
-  private static void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
+  private void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
      List<Integer> smallTblBucketNums,
      List<List<String>> smallTblFilesList,
      Map<String, List<String>> bigTableBucketFileNameToSmallTableBucketFileNames,
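The hunks above all apply one pattern: the QBJoinTree that used to be fetched from ParseContext's Map<MapJoinOperator, QBJoinTree> is now read off the operator's own descriptor, with the remaining Set answering only the membership question. A minimal sketch of the before/after lookup contract, using hypothetical stand-in classes (JoinTreeInfo, Desc, Op) rather than the real Hive types:

```java
import java.util.HashSet;
import java.util.Set;

class JoinTreeInfo { }                      // stand-in for QBJoinTree

class Desc {                                // stand-in for MapJoinDesc
  private transient JoinTreeInfo joinTree;  // the descriptor now carries the join tree
  JoinTreeInfo getJoinTree() { return joinTree; }
  void setJoinTree(JoinTreeInfo t) { joinTree = t; }
}

class Op {                                  // stand-in for MapJoinOperator
  private final Desc conf = new Desc();
  Desc getConf() { return conf; }
}

public class LookupSketch {
  // Old shape: joinCtx = ctx.getMapJoinContext().get(op)  (a Map lookup).
  // New shape: Set membership gates a read from the operator's descriptor,
  // preserving Map.get()'s "null when absent" contract.
  static JoinTreeInfo lookupJoinTree(Set<Op> mapJoinContext, Op op) {
    return mapJoinContext.contains(op) ? op.getConf().getJoinTree() : null;
  }

  public static void main(String[] args) {
    Set<Op> mapJoinContext = new HashSet<>();
    Op op = new Op();
    op.getConf().setJoinTree(new JoinTreeInfo());
    mapJoinContext.add(op);
    System.out.println(lookupJoinTree(mapJoinContext, op) != null);  // true
    mapJoinContext.remove(op);
    System.out.println(lookupJoinTree(mapJoinContext, op) != null);  // false
  }
}
```

Once the tree lives on the descriptor, methods like checkConvertBucketMapJoin no longer need a ParseContext parameter at all, which is why several signatures above shrink.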
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
index c9e8086..a8d5bc6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
@@ -97,17 +97,21 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
       return false;
     }
 
-    boolean tableEligibleForBucketedSortMergeJoin = true;
-    QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
-        .get(mapJoinOp);
+    QBJoinTree joinCxt = null;
+    if (this.pGraphContext.getMapJoinContext().contains(mapJoinOp)) {
+      joinCxt = mapJoinOp.getConf().getJoinTree();
+    }
     if (joinCxt == null) {
       return false;
     }
+
     String[] srcs = joinCxt.getBaseSrc();
     for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
       srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
     }
 
+    boolean tableEligibleForBucketedSortMergeJoin = true;
+
     // All the tables/partitions columns should be sorted in the same order
     // For example, if tables A and B are being joined on columns c1, c2 and c3
     // which are the sorted and bucketed columns. The join would work, as long
@@ -117,7 +121,6 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
     for (int pos = 0; pos < srcs.length; pos++) {
       tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
           isEligibleForBucketSortMergeJoin(smbJoinContext,
-              pGraphContext,
               mapJoinOp.getConf().getKeys().get((byte) pos),
               joinCxt,
               srcs,
@@ -141,8 +144,7 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
 
   // Convert the bucket map-join operator to a sort-merge map join operator
   protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext parseContext) {
+      SortBucketJoinProcCtx smbJoinContext) {
 
     String[] srcs = smbJoinContext.getSrcs();
     SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
@@ -219,10 +221,14 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJo
       child.getParentOperators().remove(index);
       child.getParentOperators().add(index, smbJop);
     }
-    parseContext.getSmbMapJoinContext().put(smbJop,
-        parseContext.getMapJoinContext().get(mapJoinOp));
-    parseContext.getMapJoinContext().remove(mapJoinOp);
-    parseContext.getOpParseCtx().put(smbJop, parseContext.getOpParseCtx().get(mapJoinOp));
+    QBJoinTree joinCxt = null;
+    if (this.pGraphContext.getMapJoinContext().contains(mapJoinOp)) {
+      joinCxt = mapJoinOp.getConf().getJoinTree();
+    }
+    smbJop.getConf().setJoinTree(joinCxt);
+    pGraphContext.getSmbMapJoinContext().add(smbJop);
+    pGraphContext.getMapJoinContext().remove(mapJoinOp);
+    pGraphContext.getOpParseCtx().put(smbJop, pGraphContext.getOpParseCtx().get(mapJoinOp));
     return smbJop;
   }
 
@@ -242,7 +248,6 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJo
    */
   private boolean isEligibleForBucketSortMergeJoin(
       SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pctx,
       List<ExprNodeDesc> keys,
       QBJoinTree joinTree,
       String[] aliases,
@@ -386,8 +391,7 @@ private boolean checkSortColsAndJoinCols(List<Order> sortCols,
 
   // It is already verified that the join can be converted to a bucket map join
   protected boolean checkConvertJoinToSMBJoin(
       JoinOperator joinOperator,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pGraphContext) throws SemanticException {
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
 
     QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
 
@@ -404,7 +408,6 @@ protected boolean checkConvertJoinToSMBJoin(
     for (int pos = 0; pos < srcs.length; pos++) {
       if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
-          pGraphContext,
           smbJoinContext.getKeyExprMap().get((byte) pos),
           joinCtx,
           srcs,
@@ -421,12 +424,10 @@ protected boolean checkConvertJoinToSMBJoin(
 
   // Can the join operator be converted to a sort-merge join operator ?
   protected boolean canConvertJoinToSMBJoin(
       JoinOperator joinOperator,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pGraphContext) throws SemanticException {
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
     boolean canConvert = canConvertJoinToBucketMapJoin(
         joinOperator,
-        pGraphContext,
         smbJoinContext
     );
 
@@ -434,13 +435,12 @@ protected boolean canConvertJoinToSMBJoin(
       return false;
     }
 
-    return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+    return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext);
   }
 
   // Can the join operator be converted to a bucket map-merge join operator ?
   protected boolean canConvertJoinToBucketMapJoin(
       JoinOperator joinOp,
-      ParseContext pGraphContext,
       SortBucketJoinProcCtx context) throws SemanticException {
 
     // This has already been inspected and rejected
@@ -508,7 +508,6 @@ protected boolean canConvertJoinToBucketMapJoin(
     // The candidate map-join was derived from the pluggable sort merge join big
     // table matcher.
     return checkConvertBucketMapJoin(
-        pGraphContext,
         context,
         joinCtx,
         keyExprMap,
@@ -519,19 +518,19 @@ protected boolean canConvertJoinToBucketMapJoin(
 
   // Convert the join operator to a bucket map-join join operator
   protected MapJoinOperator convertJoinToBucketMapJoin(
       JoinOperator joinOp,
-      SortBucketJoinProcCtx joinContext,
-      ParseContext parseContext) throws SemanticException {
+      SortBucketJoinProcCtx joinContext) throws SemanticException {
     MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin(
-        parseContext.getConf(),
-        parseContext.getOpParseCtx(),
+        pGraphContext.getConf(),
+        pGraphContext.getOpParseCtx(),
         joinOp,
         pGraphContext.getJoinContext().get(joinOp),
         joinContext.getBigTablePosition(),
         false,
         false);
     // Remove the join operator from the query join context
-    parseContext.getMapJoinContext().put(mapJoinOp, parseContext.getJoinContext().get(joinOp));
-    parseContext.getJoinContext().remove(joinOp);
+    mapJoinOp.getConf().setJoinTree(pGraphContext.getJoinContext().get(joinOp));
+    pGraphContext.getMapJoinContext().add(mapJoinOp);
+    pGraphContext.getJoinContext().remove(joinOp);
     convertMapJoinToBucketMapJoin(mapJoinOp, joinContext);
     return mapJoinOp;
   }
@@ -539,11 +538,10 @@ protected MapJoinOperator convertJoinToBucketMapJoin(
 
   // Convert the join operator to a sort-merge join operator
   protected void convertJoinToSMBJoin(
       JoinOperator joinOp,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext parseContext) throws SemanticException {
-    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext);
     SMBMapJoinOperator smbMapJoinOp =
-        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
   }
 }
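convertBucketMapJoinToSMBJoin and convertJoinToBucketMapJoin now have to move the join tree along with the operator when they rewrite the plan: read it off the old operator's descriptor, attach it to the replacement, then swap the membership between context sets. A sketch of that hand-off, again with hypothetical stand-ins (JoinTreeInfo, Desc, Op), not the real Hive classes:

```java
import java.util.HashSet;
import java.util.Set;

class JoinTreeInfo { }
class Desc {
  private transient JoinTreeInfo joinTree;
  JoinTreeInfo getJoinTree() { return joinTree; }
  void setJoinTree(JoinTreeInfo t) { joinTree = t; }
}
class Op {
  private final Desc conf = new Desc();
  Desc getConf() { return conf; }
}

public class ConversionSketch {
  // Mirrors the new convertBucketMapJoinToSMBJoin bookkeeping.
  static Op replaceOperator(Set<Op> oldCtx, Set<Op> newCtx, Op oldOp) {
    Op newOp = new Op();
    // Read while the old operator is still registered, matching the
    // patch's contains()-then-getJoinTree() order.
    JoinTreeInfo tree = oldCtx.contains(oldOp) ? oldOp.getConf().getJoinTree() : null;
    newOp.getConf().setJoinTree(tree);
    newCtx.add(newOp);
    oldCtx.remove(oldOp);
    return newOp;
  }

  public static void main(String[] args) {
    Set<Op> mapJoins = new HashSet<>(), smbJoins = new HashSet<>();
    Op mj = new Op();
    mj.getConf().setJoinTree(new JoinTreeInfo());
    mapJoins.add(mj);
    Op smb = replaceOperator(mapJoins, smbJoins, mj);
    System.out.println(smbJoins.contains(smb) && smb.getConf().getJoinTree() != null); // true
  }
}
```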
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
index 1260c83..7e3c134 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
@@ -42,7 +42,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
 
     // can the mapjoin present be converted to a bucketed mapjoin
     boolean convert = canConvertMapJoinToBucketMapJoin(
-        mapJoinOperator, pGraphContext, context);
+        mapJoinOperator, context);
     HiveConf conf = context.getConf();
 
     // Throw an error if the user asked for bucketed mapjoin to be enforced and
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 9a74e1e..6ed67d2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1062,9 +1062,13 @@ private static void splitTasks(ReduceSinkOperator op,
     if (reducerOp instanceof JoinOperator) {
       joinTree = parseCtx.getJoinContext().get(reducerOp);
     } else if (reducerOp instanceof MapJoinOperator) {
-      joinTree = parseCtx.getMapJoinContext().get(reducerOp);
+      if (parseCtx.getMapJoinContext().contains(reducerOp)) {
+        joinTree = ((MapJoinOperator)reducerOp).getConf().getJoinTree();
+      }
     } else if (reducerOp instanceof SMBMapJoinOperator) {
-      joinTree = parseCtx.getSmbMapJoinContext().get(reducerOp);
+      if (parseCtx.getSmbMapJoinContext().contains(reducerOp)) {
+        joinTree = ((SMBMapJoinOperator)reducerOp).getConf().getJoinTree();
+      }
     }
 
     if (joinTree != null && joinTree.getId() != null) {
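One consequence visible in splitTasks: the old Map was keyed on the operator reference, so any Operator could be a lookup key; with the descriptor-based scheme the caller must downcast to the concrete operator type to reach the right getConf(). A sketch of that instanceof dispatch (BaseOp, MapJoinLike, SmbLike are illustrative names only):

```java
import java.util.HashSet;
import java.util.Set;

class JoinTreeInfo { }

abstract class BaseOp { }                    // stand-in for Operator<?>

class MapJoinLike extends BaseOp {           // stand-in for MapJoinOperator
  JoinTreeInfo joinTree;
  JoinTreeInfo getJoinTree() { return joinTree; }
}

class SmbLike extends BaseOp {               // stand-in for SMBMapJoinOperator
  JoinTreeInfo joinTree;
  JoinTreeInfo getJoinTree() { return joinTree; }
}

public class DispatchSketch {
  // The Map-based code was type-agnostic: joinTree = map.get(reducerOp).
  // The field-based code needs one branch per concrete type, because the
  // join tree now lives behind each operator's own accessor.
  static JoinTreeInfo joinTreeOf(Set<BaseOp> mapJoins, Set<BaseOp> smbJoins, BaseOp op) {
    if (op instanceof MapJoinLike && mapJoins.contains(op)) {
      return ((MapJoinLike) op).getJoinTree();
    } else if (op instanceof SmbLike && smbJoins.contains(op)) {
      return ((SmbLike) op).getJoinTree();
    }
    return null;
  }

  public static void main(String[] args) {
    Set<BaseOp> mapJoins = new HashSet<>(), smbJoins = new HashSet<>();
    MapJoinLike mj = new MapJoinLike();
    mj.joinTree = new JoinTreeInfo();
    mapJoins.add(mj);
    System.out.println(joinTreeOf(mapJoins, smbJoins, mj) != null); // true
  }
}
```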
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index ccb3ce5..e14e311 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -95,19 +95,14 @@
   // (column type + column name). The column name is not really used anywhere, but it
   // needs to be passed. Use the string defined below for that.
   private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
-
-  private ParseContext pGraphContext;
-
-  /**
-   * empty constructor.
-   */
+
   public MapJoinProcessor() {
-    pGraphContext = null;
   }
 
   @SuppressWarnings("nls")
-  private Operator<? extends OperatorDesc>
-      putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr) {
+  private static Operator<? extends OperatorDesc> putOpInsertMap (
+      ParseContext pGraphContext, Operator<? extends OperatorDesc> op,
+      RowResolver rr) {
     OpParseContext ctx = new OpParseContext(rr);
     pGraphContext.getOpParseCtx().put(op, ctx);
     return op;
@@ -624,7 +619,7 @@ private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws Sema
 
     SelectDesc select = new SelectDesc(exprs, outputs, false);
 
-    SelectOperator sel = (SelectOperator) putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+    SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select,
         new RowSchema(inputRR.getColumnInfos()), input), inputRR);
 
     sel.setColumnExprMap(colExprMap);
@@ -689,19 +684,18 @@ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticExc
    */
   @Override
   public ParseContext transform(ParseContext pactx) throws SemanticException {
-    pGraphContext = pactx;
     List<MapJoinOperator> listMapJoinOps = new ArrayList<MapJoinOperator>();
 
     // traverse all the joins and convert them if necessary
-    if (pGraphContext.getJoinContext() != null) {
+    if (pactx.getJoinContext() != null) {
       Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
-      Map<MapJoinOperator, QBJoinTree> mapJoinMap = pGraphContext.getMapJoinContext();
+      Set<MapJoinOperator> mapJoinMap = pactx.getMapJoinContext();
       if (mapJoinMap == null) {
-        mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree>();
-        pGraphContext.setMapJoinContext(mapJoinMap);
+        mapJoinMap = new HashSet<MapJoinOperator>();
+        pactx.setMapJoinContext(mapJoinMap);
       }
 
-      Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext.getJoinContext().entrySet();
+      Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pactx.getJoinContext().entrySet();
       Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx.iterator();
       while (joinCtxIter.hasNext()) {
         Map.Entry<JoinOperator, QBJoinTree> joinEntry = joinCtxIter.next();
@@ -711,14 +705,15 @@ public ParseContext transform(ParseContext pactx) throws SemanticException {
         if (mapJoinPos >= 0) {
           MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, qbJoin, mapJoinPos);
           listMapJoinOps.add(mapJoinOp);
-          mapJoinMap.put(mapJoinOp, qbJoin);
+          mapJoinOp.getConf().setJoinTree(qbJoin);
+          mapJoinMap.add(mapJoinOp);
         } else {
           joinMap.put(joinOp, qbJoin);
         }
       }
 
       // store the new joinContext
-      pGraphContext.setJoinContext(joinMap);
+      pactx.setJoinContext(joinMap);
     }
 
     // Go over the list and find if a reducer is not needed
@@ -744,15 +739,15 @@ public ParseContext transform(ParseContext pactx) throws SemanticException {
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules, new MapJoinWalkerCtx(
-        listMapJoinOpsNoRed, pGraphContext));
+        listMapJoinOpsNoRed, pactx));
 
     GraphWalker ogw = new GenMapRedWalker(disp);
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(listMapJoinOps);
     ogw.startWalking(topNodes, null);
 
-    pGraphContext.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
-    return pGraphContext;
+    pactx.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
+    return pactx;
   }
 
   /**
@@ -820,8 +815,14 @@ private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOpera
 
   private boolean nonSubqueryMapJoin(ParseContext pGraphContext, MapJoinOperator mapJoin,
       MapJoinOperator parentMapJoin) {
-    QBJoinTree joinTree = pGraphContext.getMapJoinContext().get(mapJoin);
-    QBJoinTree parentJoinTree = pGraphContext.getMapJoinContext().get(parentMapJoin);
+    QBJoinTree joinTree = null;
+    if (pGraphContext.getMapJoinContext().contains(mapJoin)) {
+      joinTree = mapJoin.getConf().getJoinTree();
+    }
+    QBJoinTree parentJoinTree = null;
+    if (pGraphContext.getMapJoinContext().contains(parentMapJoin)) {
+      parentJoinTree = parentMapJoin.getConf().getJoinTree();
+    }
    if (joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) {
      return true;
    }
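Beyond the Set migration, MapJoinProcessor loses its pGraphContext field entirely: transform() works off its pactx argument and putOpInsertMap becomes static with the context passed in. The design point is statelessness, sketched below with placeholder types (Ctx is a stand-in for ParseContext, not a Hive class):

```java
import java.util.ArrayList;
import java.util.List;

class Ctx {                                  // stand-in for ParseContext
  final List<String> ops = new ArrayList<>();
}

public class StatelessTransformSketch {
  // Before: the processor cached the context in an instance field, so one
  // instance could not safely serve two compilations concurrently or be
  // reused without carrying stale state. After: every helper takes the
  // context as an explicit parameter and the class keeps no per-query state.
  static void putOpInsertMap(Ctx ctx, String op) {  // mirrors the now-static helper
    ctx.ops.add(op);
  }

  static Ctx transform(Ctx pactx) {
    putOpInsertMap(pactx, "SEL");   // context threaded through, never stored
    return pactx;
  }

  public static void main(String[] args) {
    Ctx a = transform(new Ctx());
    Ctx b = transform(new Ctx());   // independent invocations share no state
    System.out.println(a.ops.size() + " " + b.ops.size());  // 1 1
  }
}
```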
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
index 5291851..3483c60 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
@@ -29,7 +29,7 @@
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -55,15 +55,12 @@
  */
 public class NonBlockingOpDeDupProc implements Transform {
 
-  private ParseContext pctx;
-
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
-    this.pctx = pctx;
     String SEL = SelectOperator.getOperatorName();
     String FIL = FilterOperator.getOperatorName();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup());
+    opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup(pctx));
     opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup());
 
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
@@ -76,6 +73,13 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
   }
 
   private class SelectDedup implements NodeProcessor {
+
+    private ParseContext pctx;
+
+    public SelectDedup (ParseContext pctx) {
+      this.pctx = pctx;
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -178,28 +182,33 @@ private boolean checkReferences(ExprNodeDesc expr, Set<String> funcOutputs, Set<
       }
       return true;
     }
-  }
-
-  /**
-   * Change existing references in the context to point from child to parent operator.
-   * @param cSEL child operator (to be removed, and merged into parent)
-   * @param pSEL parent operator
-   */
-  private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
-    Collection<QBJoinTree> qbJoinTrees = new ArrayList<QBJoinTree>();
-    qbJoinTrees.addAll(pctx.getJoinContext().values());
-    qbJoinTrees.addAll(pctx.getMapJoinContext().values());
-    for (QBJoinTree qbJoinTree : qbJoinTrees) {
-      Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo = qbJoinTree.getAliasToOpInfo();
-      for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
-        if (entry.getValue() == cSEL) {
-          aliasToOpInfo.put(entry.getKey(), pSEL);
+
+    /**
+     * Change existing references in the context to point from child to parent operator.
+     * @param cSEL child operator (to be removed, and merged into parent)
+     * @param pSEL parent operator
+     */
+    private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
+      Collection<QBJoinTree> qbJoinTrees = new ArrayList<QBJoinTree>();
+      qbJoinTrees.addAll(pctx.getJoinContext().values());
+      for (MapJoinOperator mapJoinOp : pctx.getMapJoinContext()) {
+        if (mapJoinOp.getConf().getJoinTree() != null) {
+          qbJoinTrees.add(mapJoinOp.getConf().getJoinTree());
+        }
+      }
+      for (QBJoinTree qbJoinTree : qbJoinTrees) {
+        Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo = qbJoinTree.getAliasToOpInfo();
+        for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
+          if (entry.getValue() == cSEL) {
+            aliasToOpInfo.put(entry.getKey(), pSEL);
+          }
        }
      }
    }
  }
 
   private class FilterDedup implements NodeProcessor {
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
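fixContextReferences previously read join trees straight out of two Maps' values(); with the Set it must walk the operators and null-check each descriptor, because set membership no longer guarantees a paired tree the way a Map entry did. A sketch of that collection step (stand-in types as before):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class JoinTreeInfo { }
class Op {
  JoinTreeInfo joinTree;                     // may legitimately be null
  JoinTreeInfo getJoinTree() { return joinTree; }
}

public class CollectSketch {
  // The old Map paired each operator with its tree at put() time; a bare Set
  // cannot, so the patch filters out null trees explicitly.
  static List<JoinTreeInfo> collectJoinTrees(Set<Op> mapJoinContext) {
    List<JoinTreeInfo> trees = new ArrayList<>();
    for (Op op : mapJoinContext) {
      if (op.getJoinTree() != null) {        // the null-check added by the patch
        trees.add(op.getJoinTree());
      }
    }
    return trees;
  }

  public static void main(String[] args) {
    Set<Op> ctx = new HashSet<>();
    Op withTree = new Op();
    withTree.joinTree = new JoinTreeInfo();
    ctx.add(withTree);
    ctx.add(new Op());                       // registered but tree-less
    System.out.println(collectJoinTrees(ctx).size());  // 1
  }
}
```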
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
index 11ce47e..f6ca039 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
@@ -60,7 +60,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
     }
 
     if (convert) {
-      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     }
     return null;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
index 8a0c474..d090598 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
@@ -44,10 +44,10 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
     SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;
 
     boolean convert = canConvertJoinToSMBJoin(
-        joinOp, smbJoinContext, pGraphContext);
+        joinOp, smbJoinContext);
 
     if (convert) {
-      convertJoinToSMBJoin(joinOp, smbJoinContext, pGraphContext);
+      convertJoinToSMBJoin(joinOp, smbJoinContext);
     }
     return null;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
index 33ef581..a3a7f42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 
 /**
@@ -53,8 +52,7 @@ public AbstractJoinTaskDispatcher(PhysicalContext context) {
       throws SemanticException;
 
   protected void replaceTaskWithConditionalTask(
-      Task<? extends Serializable> currTask, ConditionalTask cndTsk,
-      PhysicalContext physicalContext) {
+      Task<? extends Serializable> currTask, ConditionalTask cndTsk) {
     // add this task into task tree
     // set all parent tasks
     List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
@@ -88,8 +86,7 @@ protected void replaceTaskWithConditionalTask(
   // Replace the task with the new task. Copy the children and parents of the old
   // task to the new task.
   protected void replaceTask(
-      Task<? extends Serializable> currTask, Task<? extends Serializable> newTask,
-      PhysicalContext physicalContext) {
+      Task<? extends Serializable> currTask, Task<? extends Serializable> newTask) {
     // add this task into task tree
     // set all parent tasks
     List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
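The dispatcher hunks apply the same simplification seen throughout the patch: replaceTask and replaceTaskWithConditionalTask stop taking a PhysicalContext parameter because the dispatcher is constructed with one and already holds it as a field. Sketched with illustrative names (Context, Dispatcher are placeholders):

```java
public class FieldOverParameterSketch {
  static class Context {
    final String name;
    Context(String name) { this.name = name; }
  }

  // Before: every helper re-received the context the dispatcher was built
  // with, letting a caller pass a different (wrong) one. After: the
  // constructor-supplied field is the single source of truth.
  static class Dispatcher {
    private final Context physicalContext;
    Dispatcher(Context physicalContext) { this.physicalContext = physicalContext; }

    void replaceTask(String currTask, String newTask) {
      // consults this.physicalContext if it needs the context at all
      System.out.println("replacing " + currTask + " with " + newTask
          + " in " + physicalContext.name);
    }
  }

  public static void main(String[] args) {
    new Dispatcher(new Context("plan-A")).replaceTask("t1", "t2");
  }
}
```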
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index 9c26907..eafca81 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -466,7 +466,7 @@ public static boolean cannotConvert(long aliasKnownSize,
 
       newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
       newTask.setFetchSource(currTask.isFetchSource());
-      replaceTask(currTask, newTask, physicalContext);
+      replaceTask(currTask, newTask);
 
       // Can this task be merged with the child task. This can happen if a big table is being
       // joined with multiple small tables on different keys
@@ -541,7 +541,7 @@ public static boolean cannotConvert(long aliasKnownSize,
     cndTsk.setResolverCtx(resolverCtx);
 
     // replace the current task with the new generated conditional task
-    replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+    replaceTaskWithConditionalTask(currTask, cndTsk);
     return cndTsk;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index 6f92b13..d7db2fb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -246,7 +246,10 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
 
     // get parseCtx for this Join Operator
     ParseContext parseCtx = physicalContext.getParseContext();
-    QBJoinTree joinTree = parseCtx.getSmbMapJoinContext().get(originalSMBJoinOp);
+    QBJoinTree joinTree = null;
+    if (parseCtx.getSmbMapJoinContext().contains(originalSMBJoinOp)) {
+      joinTree = originalSMBJoinOp.getConf().getJoinTree();
+    }
 
     // Convert the work containing to sort-merge join into a work, as if it had a regular join.
     // Note that the operator tree is not changed - is still contains the SMB join, but the
@@ -353,7 +356,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
     cndTsk.setResolverCtx(resolverCtx);
 
     // replace the current task with the new generated conditional task
-    replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+    replaceTaskWithConditionalTask(currTask, cndTsk);
     return cndTsk;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 8215c26..36ae930 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
@@ -76,8 +75,8 @@
   private HashMap<String, Operator<? extends OperatorDesc>> topSelOps;
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
   private Map<JoinOperator, QBJoinTree> joinContext;
-  private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
-  private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
+  private Set<MapJoinOperator> mapJoinContext;
+  private Set<SMBMapJoinOperator> smbMapJoinContext;
   private HashMap<TableScanOperator, Table> topToTable;
   private Map<FileSinkOperator, Table> fsopToTable;
   private List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
@@ -166,7 +165,7 @@ public ParseContext(
       HashMap<String, Operator<? extends OperatorDesc>> topSelOps,
       LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx,
       Map<JoinOperator, QBJoinTree> joinContext,
-      Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
+      Set<SMBMapJoinOperator> smbMapJoinContext,
       HashMap<TableScanOperator, Table> topToTable,
       HashMap<TableScanOperator, Map<String, String>> topToProps,
       Map<FileSinkOperator, Table> fsopToTable,
@@ -570,19 +569,19 @@ public LineageInfo getLineageInfo() {
     return lInfo;
   }
 
-  public Map<MapJoinOperator, QBJoinTree> getMapJoinContext() {
+  public Set<MapJoinOperator> getMapJoinContext() {
     return mapJoinContext;
   }
 
-  public void setMapJoinContext(Map<MapJoinOperator, QBJoinTree> mapJoinContext) {
+  public void setMapJoinContext(Set<MapJoinOperator> mapJoinContext) {
     this.mapJoinContext = mapJoinContext;
   }
 
-  public Map<SMBMapJoinOperator, QBJoinTree> getSmbMapJoinContext() {
+  public Set<SMBMapJoinOperator> getSmbMapJoinContext() {
     return smbMapJoinContext;
   }
 
-  public void setSmbMapJoinContext(Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext) {
+  public void setSmbMapJoinContext(Set<SMBMapJoinOperator> smbMapJoinContext) {
     this.smbMapJoinContext = smbMapJoinContext;
   }
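Note the Sets are not redundant with the descriptor field: an operator can still carry a non-null joinTree after it has been converted away and dropped from the current query's context, so `op.getConf().getJoinTree() != null` is not a substitute for membership. A small illustration of the distinction (stand-ins as before; the "conversion" here only simulates what convertBucketMapJoinToSMBJoin does to the old operator):

```java
import java.util.HashSet;
import java.util.Set;

class JoinTreeInfo { }
class Op {
  JoinTreeInfo joinTree;
}

public class MembershipSketch {
  public static void main(String[] args) {
    Set<Op> mapJoinContext = new HashSet<>();
    Op op = new Op();
    op.joinTree = new JoinTreeInfo();
    mapJoinContext.add(op);

    // The operator is converted away: it leaves the context set but still
    // carries its (now stale) join tree.
    mapJoinContext.remove(op);

    System.out.println(op.joinTree != null);          // true  - field survives
    System.out.println(mapJoinContext.contains(op));  // false - membership is the live signal
  }
}
```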
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c2d5c8c..822bda8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -379,7 +379,12 @@ public void initParseCtx(ParseContext pctx) {
     loadTableWork = pctx.getLoadTableWork();
     loadFileWork = pctx.getLoadFileWork();
     joinContext = pctx.getJoinContext();
-    smbMapJoinContext = pctx.getSmbMapJoinContext();
+    if (pctx.getSmbMapJoinContext() != null) {
+      smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
+      for (SMBMapJoinOperator smbMapJoinOp : pctx.getSmbMapJoinContext()) {
+        smbMapJoinContext.put(smbMapJoinOp, smbMapJoinOp.getConf().getJoinTree());
+      }
+    }
     ctx = pctx.getContext();
     destTableId = pctx.getDestTableId();
     idToTableNameMap = pctx.getIdToTableNameMap();
@@ -394,7 +399,9 @@ public void initParseCtx(ParseContext pctx) {
 
   public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
-        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topSelOps, opParseCtx, joinContext,
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
@@ -9988,7 +9995,9 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce
 
     // 4. Generate Parse Context for Optimizer & Physical compiler
     ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child, opToPartPruner, opToPartList,
-        topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topOps, topSelOps, opParseCtx, joinContext,
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index c144d8c..d51fcd7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 
 /**
  * Join operator Descriptor implementation.
@@ -85,9 +86,12 @@
   // this operator cannot be converted to mapjoin cause output is expected to be sorted on join key
   // it's resulted from RS-dedup optimization, which removes following RS under some condition
   private boolean fixedAsSorted;
-
+
   // used only for explain.
   private transient ExprNodeDesc [][] joinKeys;
+
+  private transient QBJoinTree joinTree;
+
   public JoinDesc() {
   }
 
@@ -509,4 +513,12 @@ public boolean isFixedAsSorted() {
   public void setFixedAsSorted(boolean fixedAsSorted) {
     this.fixedAsSorted = fixedAsSorted;
   }
+
+  public QBJoinTree getJoinTree() {
+    return joinTree;
+  }
+
+  public void setJoinTree(QBJoinTree joinTree) {
+    this.joinTree = joinTree;
+  }
 }
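The new JoinDesc field is marked transient, which keeps this compile-time-only structure out of the serialized plan shipped to tasks; QBJoinTree only matters while the optimizer is rewriting the operator tree. The exclusion effect is demonstrated below with plain java.io serialization (Hive's actual plan serialization machinery differs, but its default field-based serializers likewise skip transient fields):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientSketch {
  static class JoinTreeInfo { }                            // deliberately not Serializable

  static class Desc implements Serializable {              // stand-in for JoinDesc
    private static final long serialVersionUID = 1L;
    String joinKeys = "k1,k2";                             // ordinary field: serialized
    transient JoinTreeInfo joinTree = new JoinTreeInfo();  // excluded from the wire format
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new Desc());   // succeeds even though JoinTreeInfo is not
    }                                // Serializable: transient fields are never written
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      Desc copy = (Desc) in.readObject();
      System.out.println(copy.joinKeys);   // k1,k2
      System.out.println(copy.joinTree);   // null: dropped in transit
    }
  }
}
```

A side effect worth noting: because the tree is transient, any consumer that runs after plan serialization sees getJoinTree() == null, which is consistent with the null-guarded lookups the patch adds at every call site.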