diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
index 70c23a6..5ddf556 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
@@ -131,10 +131,12 @@ private boolean checkNumberOfBucketsAgainstBigTable(
 
   protected boolean canConvertMapJoinToBucketMapJoin(
       MapJoinOperator mapJoinOp,
-      ParseContext pGraphContext,
       BucketJoinProcCtx context) throws SemanticException {
 
-    QBJoinTree joinCtx = pGraphContext.getMapJoinContext().get(mapJoinOp);
+    QBJoinTree joinCtx = null;
+    if (this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
+      joinCtx = mapJoinOp.getConf().getJoinTree();
+    }
     if (joinCtx == null) {
       return false;
     }
@@ -172,7 +174,6 @@ protected boolean canConvertMapJoinToBucketMapJoin(
     Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
 
     return checkConvertBucketMapJoin(
-        pGraphContext,
         context,
         joinCtx,
         keysMap,
@@ -189,7 +190,6 @@ protected boolean canConvertMapJoinToBucketMapJoin(
    * d. The number of buckets in the big table can be divided by no of buckets in small tables.
    */
   protected boolean checkConvertBucketMapJoin(
-      ParseContext pGraphContext,
       BucketJoinProcCtx context,
       QBJoinTree joinCtx,
       Map<Byte, List<ExprNodeDesc>> keysMap,
@@ -438,7 +438,7 @@ protected void convertMapJoinToBucketMapJoin(
   }
 
   // convert partition to partition spec string
-  private static Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
+  private Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
     Map<String, List<String>> converted = new HashMap<String, List<String>>();
     for (Map.Entry<Partition, List<String>> entry : mapping.entrySet()) {
       converted.put(entry.getKey().getName(), entry.getValue());
     }
@@ -467,7 +467,7 @@ protected void convertMapJoinToBucketMapJoin(
   }
 
   // called for each partition of big table and populates mapping for each file in the partition
-  private static void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
+  private void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
       List<Integer> smallTblBucketNums,
       List<List<String>> smallTblFilesList,
      Map<String, List<String>> bigTableBucketFileNameToSmallTableBucketFileNames,
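Note: every hunk in this file follows the same recipe — the QBJoinTree that used to be fetched from a side map on ParseContext is now read off the operator's own descriptor, guarded by a membership test against the new operator set. A minimal, self-contained sketch of the two lookup styles; JoinTree, Desc, and JoinOp below are simplified stand-ins, not the real Hive types:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class JoinTree {}
    class Desc { JoinTree joinTree; }               // stand-in for MapJoinDesc
    class JoinOp { final Desc conf = new Desc(); }  // stand-in for MapJoinOperator

    public class LookupSketch {
      // Old style: the tree lives in a side map keyed by the operator.
      static JoinTree lookupOld(Map<JoinOp, JoinTree> mapJoinContext, JoinOp op) {
        return mapJoinContext.get(op); // null when op was never registered
      }

      // New style: the tree lives on the operator's descriptor; the set only
      // records which operators are currently registered map joins.
      static JoinTree lookupNew(Set<JoinOp> mapJoinOps, JoinOp op) {
        return mapJoinOps.contains(op) ? op.conf.joinTree : null;
      }

      public static void main(String[] args) {
        JoinOp op = new JoinOp();
        op.conf.joinTree = new JoinTree();
        Map<JoinOp, JoinTree> oldCtx = new HashMap<>();
        oldCtx.put(op, op.conf.joinTree);
        Set<JoinOp> newOps = new HashSet<>();
        newOps.add(op);
        System.out.println(lookupOld(oldCtx, op) == lookupNew(newOps, op)); // true
        JoinOp stranger = new JoinOp();
        System.out.println(lookupOld(oldCtx, stranger) == lookupNew(newOps, stranger)); // true (both null)
      }
    }

Both styles return null for an unregistered operator, which is what the unchanged `if (joinCtx == null)` guards in the patch rely on.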
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
index c9e8086..8dbeb83 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
@@ -97,17 +97,21 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
       return false;
     }
 
-    boolean tableEligibleForBucketedSortMergeJoin = true;
-    QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
-        .get(mapJoinOp);
+    QBJoinTree joinCxt = null;
+    if (this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
+      joinCxt = mapJoinOp.getConf().getJoinTree();
+    }
     if (joinCxt == null) {
       return false;
     }
+
     String[] srcs = joinCxt.getBaseSrc();
     for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
       srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
     }
 
+    boolean tableEligibleForBucketedSortMergeJoin = true;
+
     // All the tables/partitions columns should be sorted in the same order
     // For example, if tables A and B are being joined on columns c1, c2 and c3
     // which are the sorted and bucketed columns. The join would work, as long
@@ -117,7 +121,6 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
     for (int pos = 0; pos < srcs.length; pos++) {
       tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
         isEligibleForBucketSortMergeJoin(smbJoinContext,
-          pGraphContext,
           mapJoinOp.getConf().getKeys().get((byte) pos),
           joinCxt,
           srcs,
@@ -141,8 +144,7 @@
 
   // Convert the bucket map-join operator to a sort-merge map join operator
   protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext parseContext) {
+      SortBucketJoinProcCtx smbJoinContext) {
 
     String[] srcs = smbJoinContext.getSrcs();
     SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
@@ -219,10 +221,14 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
       child.getParentOperators().remove(index);
       child.getParentOperators().add(index, smbJop);
     }
-    parseContext.getSmbMapJoinContext().put(smbJop,
-        parseContext.getMapJoinContext().get(mapJoinOp));
-    parseContext.getMapJoinContext().remove(mapJoinOp);
-    parseContext.getOpParseCtx().put(smbJop, parseContext.getOpParseCtx().get(mapJoinOp));
+    QBJoinTree joinCxt = null;
+    if (this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
+      joinCxt = mapJoinOp.getConf().getJoinTree();
+    }
+    smbJop.getConf().setJoinTree(joinCxt);
+    pGraphContext.getSmbMapJoinOps().add(smbJop);
+    pGraphContext.getMapJoinOps().remove(mapJoinOp);
+    pGraphContext.getOpParseCtx().put(smbJop, pGraphContext.getOpParseCtx().get(mapJoinOp));
     return smbJop;
   }
 
@@ -242,7 +248,6 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
    */
   private boolean isEligibleForBucketSortMergeJoin(
       SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pctx,
       List<ExprNodeDesc> keys,
       QBJoinTree joinTree,
       String[] aliases,
@@ -386,11 +391,12 @@ private boolean checkSortColsAndJoinCols(List<Order> sortCols,
 
   // It is already verified that the join can be converted to a bucket map join
   protected boolean checkConvertJoinToSMBJoin(
       JoinOperator joinOperator,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pGraphContext) throws SemanticException {
-
-    QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+    QBJoinTree joinCtx = null;
+    if (this.pGraphContext.getJoinOps().contains(joinOperator)) {
+      joinCtx = joinOperator.getConf().getJoinTree();
+    }
     if (joinCtx == null) {
       return false;
     }
@@ -404,7 +410,6 @@ protected boolean checkConvertJoinToSMBJoin(
 
     for (int pos = 0; pos < srcs.length; pos++) {
       if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
-          pGraphContext,
           smbJoinContext.getKeyExprMap().get((byte) pos),
           joinCtx,
           srcs,
@@ -421,12 +426,10 @@
 
   // Can the join operator be converted to a sort-merge join operator ?
   protected boolean canConvertJoinToSMBJoin(
       JoinOperator joinOperator,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pGraphContext) throws SemanticException {
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
     boolean canConvert =
       canConvertJoinToBucketMapJoin(
         joinOperator,
-        pGraphContext,
         smbJoinContext
       );
@@ -434,13 +437,12 @@ protected boolean canConvertJoinToSMBJoin(
       return false;
     }
 
-    return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+    return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext);
   }
 
   // Can the join operator be converted to a bucket map-merge join operator ?
   protected boolean canConvertJoinToBucketMapJoin(
       JoinOperator joinOp,
-      ParseContext pGraphContext,
       SortBucketJoinProcCtx context) throws SemanticException {
 
     // This has already been inspected and rejected
@@ -448,7 +450,10 @@ protected boolean canConvertJoinToBucketMapJoin(
       return false;
     }
 
-    QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOp);
+    QBJoinTree joinCtx = null;
+    if (this.pGraphContext.getJoinOps().contains(joinOp)) {
+      joinCtx = joinOp.getConf().getJoinTree();
+    }
     if (joinCtx == null) {
       return false;
     }
@@ -508,7 +513,6 @@ protected boolean canConvertJoinToBucketMapJoin(
     // The candidate map-join was derived from the pluggable sort merge join big
     // table matcher.
     return checkConvertBucketMapJoin(
-        pGraphContext,
         context,
         joinCtx,
         keyExprMap,
@@ -519,19 +523,23 @@ protected boolean canConvertJoinToBucketMapJoin(
 
   // Convert the join operator to a bucket map-join join operator
   protected MapJoinOperator convertJoinToBucketMapJoin(
       JoinOperator joinOp,
-      SortBucketJoinProcCtx joinContext,
-      ParseContext parseContext) throws SemanticException {
+      SortBucketJoinProcCtx joinContext) throws SemanticException {
+    QBJoinTree joinCtx = null;
+    if (this.pGraphContext.getJoinOps().contains(joinOp)) {
+      joinCtx = joinOp.getConf().getJoinTree();
+    }
     MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin(
-        parseContext.getConf(),
-        parseContext.getOpParseCtx(),
+        pGraphContext.getConf(),
+        pGraphContext.getOpParseCtx(),
         joinOp,
-        pGraphContext.getJoinContext().get(joinOp),
+        joinCtx,
         joinContext.getBigTablePosition(),
         false,
         false);
     // Remove the join operator from the query join context
-    parseContext.getMapJoinContext().put(mapJoinOp, parseContext.getJoinContext().get(joinOp));
-    parseContext.getJoinContext().remove(joinOp);
+    mapJoinOp.getConf().setJoinTree(joinCtx);
+    pGraphContext.getMapJoinOps().add(mapJoinOp);
+    pGraphContext.getJoinOps().remove(joinOp);
     convertMapJoinToBucketMapJoin(mapJoinOp, joinContext);
     return mapJoinOp;
   }
@@ -539,11 +547,10 @@ protected MapJoinOperator convertJoinToBucketMapJoin(
 
   // Convert the join operator to a sort-merge join operator
   protected void convertJoinToSMBJoin(
       JoinOperator joinOp,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext parseContext) throws SemanticException {
-    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext);
     SMBMapJoinOperator smbMapJoinOp =
-        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
   }
 }
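Note: the membership-guarded lookup now appears, nearly verbatim, in canConvertBucketMapJoinToSMBJoin, convertBucketMapJoinToSMBJoin, checkConvertJoinToSMBJoin, canConvertJoinToBucketMapJoin, and convertJoinToBucketMapJoin. Purely as an illustration of how the repetition could be factored — this helper is hypothetical and not part of the patch:

    import java.util.Set;

    class JoinTree {}

    // Hypothetical: anything whose descriptor can carry a join tree.
    interface HasJoinTree {
      JoinTree getJoinTree();
    }

    final class JoinTrees {
      private JoinTrees() {}

      // The three-line guard used throughout the patch, in one place: return the
      // operator's tree only while the operator is still in the registered set.
      static <O extends HasJoinTree> JoinTree lookup(Set<O> registered, O op) {
        return registered.contains(op) ? op.getJoinTree() : null;
      }
    }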
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
index 1260c83..7e3c134 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
@@ -42,7 +42,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
 
     // can the mapjoin present be converted to a bucketed mapjoin
     boolean convert = canConvertMapJoinToBucketMapJoin(
-        mapJoinOperator, pGraphContext, context);
+        mapJoinOperator, context);
     HiveConf conf = context.getConf();
 
     // Throw an error if the user asked for bucketed mapjoin to be enforced and
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 7ab35ee..953c263 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -46,9 +46,10 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
@@ -229,15 +230,19 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
       int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
       throws SemanticException {
     ParseContext parseContext = context.parseContext;
+    QBJoinTree joinCtx = null;
+    if (parseContext.getJoinOps().contains(joinOp)) {
+      joinCtx = joinOp.getConf().getJoinTree();
+    }
     MapJoinDesc mapJoinDesc = null;
     if (adjustParentsChildren) {
-      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
-          joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+          joinOp, joinCtx, mapJoinConversionPos, true);
     } else {
       JoinDesc joinDesc = joinOp.getConf();
       // retain the original join desc in the map join.
       mapJoinDesc =
-          new MapJoinDesc(MapJoinProcessor.getKeys(parseContext.getJoinContext().get(joinOp), joinOp).getSecond(),
+          new MapJoinDesc(MapJoinProcessor.getKeys(joinCtx, joinOp).getSecond(),
              null, joinDesc.getExprs(), null, null,
              joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
              joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
@@ -604,9 +609,13 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
     //can safely convert the join to a map join.
     ParseContext parseContext = context.parseContext;
+    QBJoinTree joinCtx = null;
+    if (parseContext.getJoinOps().contains(joinOp)) {
+      joinCtx = joinOp.getConf().getJoinTree();
+    }
     MapJoinOperator mapJoinOp =
         MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
-            parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+            joinCtx, bigTablePosition, true);
 
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 9a74e1e..09135a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -18,7 +18,20 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import com.google.common.collect.Interner;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -100,19 +113,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
+import com.google.common.collect.Interner;
 
 /**
  * General utility common functions for the Processor to convert operator into
@@ -1060,11 +1061,17 @@ private static void splitTasks(ReduceSinkOperator op,
     Operator<? extends OperatorDesc> reducerOp = cplan.getReduceWork().getReducer();
     QBJoinTree joinTree = null;
     if (reducerOp instanceof JoinOperator) {
-      joinTree = parseCtx.getJoinContext().get(reducerOp);
+      if (parseCtx.getJoinOps().contains(reducerOp)) {
+        joinTree = ((JoinOperator)reducerOp).getConf().getJoinTree();
+      }
     } else if (reducerOp instanceof MapJoinOperator) {
-      joinTree = parseCtx.getMapJoinContext().get(reducerOp);
+      if (parseCtx.getMapJoinOps().contains(reducerOp)) {
+        joinTree = ((MapJoinOperator)reducerOp).getConf().getJoinTree();
+      }
     } else if (reducerOp instanceof SMBMapJoinOperator) {
-      joinTree = parseCtx.getSmbMapJoinContext().get(reducerOp);
+      if (parseCtx.getSmbMapJoinOps().contains(reducerOp)) {
+        joinTree = ((SMBMapJoinOperator)reducerOp).getConf().getJoinTree();
+      }
     }
 
     if (joinTree != null && joinTree.getId() != null) {
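Note: splitTasks above is the one call site that must pick between all three operator sets at runtime, choosing the set by the reducer's concrete type before casting. A compact, self-contained model of that dispatch — sibling stand-in classes, not Hive's real operator hierarchy:

    import java.util.HashSet;
    import java.util.Set;

    class Tree {}
    class Op {}
    class JoinOp extends Op { Tree tree; }
    class MapJoinOp extends Op { Tree tree; }
    class SmbJoinOp extends Op { Tree tree; }

    public class SplitTasksSketch {
      final Set<Op> joinOps = new HashSet<>();
      final Set<Op> mapJoinOps = new HashSet<>();
      final Set<Op> smbJoinOps = new HashSet<>();

      // Mirrors the patched splitTasks(): pick the set that matches the runtime
      // type, check membership, then read the tree off the operator itself.
      Tree joinTreeOf(Op reducerOp) {
        if (reducerOp instanceof JoinOp && joinOps.contains(reducerOp)) {
          return ((JoinOp) reducerOp).tree;
        } else if (reducerOp instanceof MapJoinOp && mapJoinOps.contains(reducerOp)) {
          return ((MapJoinOp) reducerOp).tree;
        } else if (reducerOp instanceof SmbJoinOp && smbJoinOps.contains(reducerOp)) {
          return ((SmbJoinOp) reducerOp).tree;
        }
        return null;
      }
    }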
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
index 9238e0e..51a0608 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
@@ -95,7 +95,8 @@ private int getOutputSize(Operator<? extends OperatorDesc> operator,
   private Set<String> getBigTables(ParseContext joinCtx) {
     Set<String> bigTables = new HashSet<String>();
 
-    for (QBJoinTree qbJoin : joinCtx.getJoinContext().values()) {
+    for (JoinOperator joinOp : joinCtx.getJoinOps()) {
+      QBJoinTree qbJoin = joinOp.getConf().getJoinTree();
       if (qbJoin.getStreamAliases() != null) {
         bigTables.addAll(qbJoin.getStreamAliases());
       }
@@ -155,7 +156,7 @@ private void reorder(JoinOperator joinOp, Set<String> bigTables) {
   public ParseContext transform(ParseContext pactx) throws SemanticException {
     Set<String> bigTables = getBigTables(pactx);
 
-    for (JoinOperator joinOp : pactx.getJoinContext().keySet()) {
+    for (JoinOperator joinOp : pactx.getJoinOps()) {
       reorder(joinOp, bigTables);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index ccb3ce5..7594ea2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -95,19 +95,14 @@
   // (column type + column name). The column name is not really used anywhere, but it
   // needs to be passed. Use the string defined below for that.
   private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
-
-  private ParseContext pGraphContext;
-
-  /**
-   * empty constructor.
-   */
+
   public MapJoinProcessor() {
-    pGraphContext = null;
   }
 
   @SuppressWarnings("nls")
-  private Operator<? extends OperatorDesc>
-      putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr) {
+  private static Operator<? extends OperatorDesc> putOpInsertMap (
+      ParseContext pGraphContext, Operator<? extends OperatorDesc> op,
+      RowResolver rr) {
     OpParseContext ctx = new OpParseContext(rr);
     pGraphContext.getOpParseCtx().put(op, ctx);
     return op;
   }
@@ -624,7 +619,7 @@ private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws SemanticException {
 
     SelectDesc select = new SelectDesc(exprs, outputs, false);
 
-    SelectOperator sel = (SelectOperator) putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+    SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select,
        new RowSchema(inputRR.getColumnInfos()), input), inputRR);
 
     sel.setColumnExprMap(colExprMap);
@@ -689,36 +684,35 @@ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException {
    */
   @Override
   public ParseContext transform(ParseContext pactx) throws SemanticException {
-    pGraphContext = pactx;
     List<MapJoinOperator> listMapJoinOps = new ArrayList<MapJoinOperator>();
 
     // traverse all the joins and convert them if necessary
-    if (pGraphContext.getJoinContext() != null) {
-      Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
-      Map<MapJoinOperator, QBJoinTree> mapJoinMap = pGraphContext.getMapJoinContext();
+    if (pactx.getJoinOps() != null) {
+      Set<JoinOperator> joinMap = new HashSet<JoinOperator>();
+      Set<MapJoinOperator> mapJoinMap = pactx.getMapJoinOps();
       if (mapJoinMap == null) {
-        mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree>();
-        pGraphContext.setMapJoinContext(mapJoinMap);
+        mapJoinMap = new HashSet<MapJoinOperator>();
+        pactx.setMapJoinOps(mapJoinMap);
       }
 
-      Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext.getJoinContext().entrySet();
-      Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx.iterator();
+      Iterator<JoinOperator> joinCtxIter = pactx.getJoinOps().iterator();
       while (joinCtxIter.hasNext()) {
-        Map.Entry<JoinOperator, QBJoinTree> joinEntry = joinCtxIter.next();
-        JoinOperator joinOp = joinEntry.getKey();
-        QBJoinTree qbJoin = joinEntry.getValue();
+        JoinOperator joinOp = joinCtxIter.next();
+        QBJoinTree qbJoin = joinOp.getConf().getJoinTree();
         int mapJoinPos = mapSideJoin(joinOp, qbJoin);
         if (mapJoinPos >= 0) {
           MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, qbJoin, mapJoinPos);
           listMapJoinOps.add(mapJoinOp);
-          mapJoinMap.put(mapJoinOp, qbJoin);
+          mapJoinOp.getConf().setJoinTree(qbJoin);
+          mapJoinMap.add(mapJoinOp);
         } else {
-          joinMap.put(joinOp, qbJoin);
+          joinOp.getConf().setJoinTree(qbJoin);
+          joinMap.add(joinOp);
         }
       }
 
       // store the new joinContext
-      pGraphContext.setJoinContext(joinMap);
+      pactx.setJoinOps(joinMap);
     }
 
     // Go over the list and find if a reducer is not needed
@@ -744,15 +738,15 @@ public ParseContext transform(ParseContext pactx) throws SemanticException {
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules, new MapJoinWalkerCtx(
-        listMapJoinOpsNoRed, pGraphContext));
+        listMapJoinOpsNoRed, pactx));
 
     GraphWalker ogw = new GenMapRedWalker(disp);
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(listMapJoinOps);
     ogw.startWalking(topNodes, null);
 
-    pGraphContext.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
-    return pGraphContext;
+    pactx.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
+    return pactx;
   }
 
   /**
@@ -820,8 +814,14 @@ private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOperator mapJoin) {
 
   private boolean nonSubqueryMapJoin(ParseContext pGraphContext, MapJoinOperator mapJoin,
       MapJoinOperator parentMapJoin) {
-    QBJoinTree joinTree = pGraphContext.getMapJoinContext().get(mapJoin);
-    QBJoinTree parentJoinTree = pGraphContext.getMapJoinContext().get(parentMapJoin);
+    QBJoinTree joinTree = null;
+    if (pGraphContext.getMapJoinOps().contains(mapJoin)) {
+      joinTree = mapJoin.getConf().getJoinTree();
+    }
+    QBJoinTree parentJoinTree = null;
+    if (pGraphContext.getMapJoinOps().contains(parentMapJoin)) {
+      parentJoinTree = parentMapJoin.getConf().getJoinTree();
+    }
     if (joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) {
       return true;
     }
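Note: the reworked transform() above no longer walks a Map's entrySet and rebuilds a map; it walks the operator set, stores each QBJoinTree on the operator it belongs to, and re-partitions the operators into "still a join" and "now a map join". In miniature, with stand-in types and a hypothetical eligibility predicate:

    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;

    class Tree {}
    class Desc { Tree tree; }
    class JoinOp { final Desc conf = new Desc(); }
    class MapJoinOp { final Desc conf = new Desc(); }

    public class TransformSketch {
      // Stand-in for mapSideJoin(): decides whether a join is map-join eligible.
      static boolean canBeMapJoin(JoinOp op) { return false; } // hypothetical predicate

      // Mirrors the reworked transform(): iterate the join set, move the tree onto
      // each converted operator, and rebuild the set of joins that remain.
      static Set<JoinOp> transform(Set<JoinOp> joinOps, Set<MapJoinOp> mapJoinOps) {
        Set<JoinOp> remaining = new HashSet<>();
        Iterator<JoinOp> it = joinOps.iterator();
        while (it.hasNext()) {
          JoinOp joinOp = it.next();
          Tree tree = joinOp.conf.tree; // the tree already lives on the operator
          if (canBeMapJoin(joinOp)) {
            MapJoinOp mapJoinOp = new MapJoinOp(); // stand-in for generateMapJoinOperator()
            mapJoinOp.conf.tree = tree;            // carry the tree across the conversion
            mapJoinOps.add(mapJoinOp);
          } else {
            remaining.add(joinOp);
          }
        }
        return remaining; // caller stores it back, i.e. pactx.setJoinOps(remaining)
      }
    }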
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
index 5291851..e997eed 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
@@ -30,6 +30,7 @@
 
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -55,15 +56,12 @@
  */
 public class NonBlockingOpDeDupProc implements Transform {
 
-  private ParseContext pctx;
-
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
-    this.pctx = pctx;
     String SEL = SelectOperator.getOperatorName();
     String FIL = FilterOperator.getOperatorName();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup());
+    opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup(pctx));
     opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup());
 
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
@@ -76,6 +74,13 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
   }
 
   private class SelectDedup implements NodeProcessor {
+
+    private ParseContext pctx;
+
+    public SelectDedup (ParseContext pctx) {
+      this.pctx = pctx;
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -178,28 +183,37 @@ private boolean checkReferences(ExprNodeDesc expr, Set<String> funcOutputs, Set<
       }
       return true;
     }
-  }
-
-  /**
-   * Change existing references in the context to point from child to parent operator.
-   * @param cSEL child operator (to be removed, and merged into parent)
-   * @param pSEL parent operator
-   */
-  private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
-    Collection<QBJoinTree> qbJoinTrees = new ArrayList<QBJoinTree>();
-    qbJoinTrees.addAll(pctx.getJoinContext().values());
-    qbJoinTrees.addAll(pctx.getMapJoinContext().values());
-    for (QBJoinTree qbJoinTree : qbJoinTrees) {
-      Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo = qbJoinTree.getAliasToOpInfo();
-      for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
-        if (entry.getValue() == cSEL) {
-          aliasToOpInfo.put(entry.getKey(), pSEL);
+
+    /**
+     * Change existing references in the context to point from child to parent operator.
+     * @param cSEL child operator (to be removed, and merged into parent)
+     * @param pSEL parent operator
+     */
+    private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) {
+      Collection<QBJoinTree> qbJoinTrees = new ArrayList<QBJoinTree>();
+      for (JoinOperator joinOp : pctx.getJoinOps()) {
+        if (joinOp.getConf().getJoinTree() != null) {
+          qbJoinTrees.add(joinOp.getConf().getJoinTree());
+        }
+      }
+      for (MapJoinOperator mapJoinOp : pctx.getMapJoinOps()) {
+        if (mapJoinOp.getConf().getJoinTree() != null) {
+          qbJoinTrees.add(mapJoinOp.getConf().getJoinTree());
+        }
+      }
+      for (QBJoinTree qbJoinTree : qbJoinTrees) {
+        Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo = qbJoinTree.getAliasToOpInfo();
+        for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToOpInfo.entrySet()) {
+          if (entry.getValue() == cSEL) {
+            aliasToOpInfo.put(entry.getKey(), pSEL);
+          }
         }
       }
     }
   }
 
   private class FilterDedup implements NodeProcessor {
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
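Note: with the side maps gone, fixContextReferences has to harvest the join trees from both operator sets before repointing alias references from the merged child SELECT to its parent. Roughly, with stand-in types (the real code keys aliasToOpInfo by alias name and stores Hive operators):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class Tree { final Map<String, Object> aliasToOp = new HashMap<>(); }
    class JoinOp { Tree tree; }
    class MapJoinOp { Tree tree; }

    public class FixRefsSketch {
      // Gather the non-null trees from both operator sets, then repoint any alias
      // entry that still references the child operator at its parent instead.
      static void fixContextReferences(Set<JoinOp> joinOps, Set<MapJoinOp> mapJoinOps,
          Object child, Object parent) {
        List<Tree> trees = new ArrayList<>();
        for (JoinOp op : joinOps) {
          if (op.tree != null) {
            trees.add(op.tree);
          }
        }
        for (MapJoinOp op : mapJoinOps) {
          if (op.tree != null) {
            trees.add(op.tree);
          }
        }
        for (Tree t : trees) {
          for (Map.Entry<String, Object> e : t.aliasToOp.entrySet()) {
            if (e.getValue() == child) {
              e.setValue(parent); // replacing a value is not a structural modification
            }
          }
        }
      }
    }

Using Entry.setValue — or, as the patch does, put on an existing key — is safe during entrySet iteration, since neither structurally modifies the map.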
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
index ea06503..ea1779c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
@@ -79,11 +79,13 @@ public class SkewJoinOptimizer implements Transform {
 
   private static final Log LOG = LogFactory.getLog(SkewJoinOptimizer.class.getName());
-  private static ParseContext parseContext;
 
   public static class SkewJoinProc implements NodeProcessor {
-    public SkewJoinProc() {
+    private ParseContext parseContext;
+
+    public SkewJoinProc(ParseContext parseContext) {
       super();
+      this.parseContext = parseContext;
     }
 
     @Override
@@ -166,7 +168,10 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       }
 
       // have to create a QBJoinTree for the cloned join operator
-      QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp);
+      QBJoinTree originJoinTree = null;
+      if (parseContext.getJoinOps().contains(joinOp)) {
+        originJoinTree = joinOp.getConf().getJoinTree();
+      }
       QBJoinTree newJoinTree;
       try {
         newJoinTree = originJoinTree.clone();
@@ -181,7 +186,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       } else {
         joinOpClone = (JoinOperator)currOpClone;
       }
-      parseContext.getJoinContext().put(joinOpClone, newJoinTree);
+      joinOpClone.getConf().setJoinTree(newJoinTree);
+      parseContext.getJoinOps().add(joinOpClone);
 
       List<TableScanOperator> tableScanCloneOpsForJoin =
          new ArrayList<TableScanOperator>();
@@ -677,7 +683,7 @@ private static void replaceAlias(List<String> origin, List<String> cloned,
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc());
+    opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc(pctx));
 
     SkewJoinOptProcCtx skewJoinOptProcCtx = new SkewJoinOptProcCtx(pctx);
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -692,8 +698,8 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     return pctx;
   }
 
-  private NodeProcessor getSkewJoinProc() {
-    return new SkewJoinProc();
+  private NodeProcessor getSkewJoinProc(ParseContext parseContext) {
+    return new SkewJoinProc(parseContext);
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
index 11ce47e..f6ca039 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
@@ -60,7 +60,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
     }
 
     if (convert) {
-      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     }
     return null;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
index 8a0c474..d090598 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
@@ -44,10 +44,10 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
     SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;
 
     boolean convert =
         canConvertJoinToSMBJoin(
-          joinOp, smbJoinContext, pGraphContext);
+          joinOp, smbJoinContext);
 
     if (convert) {
-      convertJoinToSMBJoin(joinOp, smbJoinContext, pGraphContext);
+      convertJoinToSMBJoin(joinOp, smbJoinContext);
     }
     return null;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
index c52f753..d4de0bd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
@@ -107,7 +107,7 @@ private void findPossibleAutoConvertedJoinOperators() throws SemanticException {
     // that has both intermediate tables and query input tables as input tables,
     // we should be able to guess if this JoinOperator will be converted to a MapJoin
     // based on hive.auto.convert.join.noconditionaltask.size.
-    for (JoinOperator joinOp: pCtx.getJoinContext().keySet()) {
+    for (JoinOperator joinOp: pCtx.getJoinOps()) {
       boolean isAbleToGuess = true;
       boolean mayConvert = false;
       // Get total size and individual alias's size
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
index 33ef581..a3a7f42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 
 /**
@@ -53,8 +52,7 @@ public AbstractJoinTaskDispatcher(PhysicalContext context) {
       throws SemanticException;
 
   protected void replaceTaskWithConditionalTask(
-      Task<? extends Serializable> currTask, ConditionalTask cndTsk,
-      PhysicalContext physicalContext) {
+      Task<? extends Serializable> currTask, ConditionalTask cndTsk) {
     // add this task into task tree
     // set all parent tasks
     List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
@@ -88,8 +86,7 @@ protected void replaceTaskWithConditionalTask(
 
   // Replace the task with the new task. Copy the children and parents of the old
   // task to the new task.
   protected void replaceTask(
-      Task<? extends Serializable> currTask, Task<? extends Serializable> newTask,
-      PhysicalContext physicalContext) {
+      Task<? extends Serializable> currTask, Task<? extends Serializable> newTask) {
     // add this task into task tree
     // set all parent tasks
     List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index 9c26907..8e4bddd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -400,7 +400,10 @@ public static boolean cannotConvert(long aliasKnownSize,
 
     // get parseCtx for this Join Operator
     ParseContext parseCtx = physicalContext.getParseContext();
-    QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);
+    QBJoinTree joinTree = null;
+    if (parseCtx.getJoinOps().contains(joinOp)) {
+      joinTree = joinOp.getConf().getJoinTree();
+    }
 
     // start to generate multiple map join tasks
     JoinDesc joinDesc = joinOp.getConf();
@@ -466,7 +469,7 @@ public static boolean cannotConvert(long aliasKnownSize,
         newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
         newTask.setFetchSource(currTask.isFetchSource());
-        replaceTask(currTask, newTask, physicalContext);
+        replaceTask(currTask, newTask);
 
       // Can this task be merged with the child task. This can happen if a big table is being
       // joined with multiple small tables on different keys
@@ -541,7 +544,7 @@ public static boolean cannotConvert(long aliasKnownSize,
       cndTsk.setResolverCtx(resolverCtx);
 
       // replace the current task with the new generated conditional task
-      replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+      replaceTaskWithConditionalTask(currTask, cndTsk);
       return cndTsk;
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index 6f92b13..fda2b14 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -246,7 +246,10 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
 
     // get parseCtx for this Join Operator
     ParseContext parseCtx = physicalContext.getParseContext();
-    QBJoinTree joinTree = parseCtx.getSmbMapJoinContext().get(originalSMBJoinOp);
+    QBJoinTree joinTree = null;
+    if (parseCtx.getSmbMapJoinOps().contains(originalSMBJoinOp)) {
+      joinTree = originalSMBJoinOp.getConf().getJoinTree();
+    }
 
     // Convert the work containing to sort-merge join into a work, as if it had a regular join.
     // Note that the operator tree is not changed - is still contains the SMB join, but the
@@ -353,7 +356,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
       cndTsk.setResolverCtx(resolverCtx);
 
       // replace the current task with the new generated conditional task
-      replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+      replaceTaskWithConditionalTask(currTask, cndTsk);
       return cndTsk;
     }
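Note: the ParseContext diff that follows is the pivot of the whole patch — the three QBJoinTree maps become plain operator sets, and every call site shown earlier adapts to that. For orientation, a condensed, hypothetical model of the new surface (the real class carries far more state):

    import java.util.HashSet;
    import java.util.Set;

    // Condensed stand-ins for JoinOperator / MapJoinOperator / SMBMapJoinOperator.
    class JoinOp {}
    class MapJoinOp {}
    class SmbJoinOp {}

    public class ParseContextModel {
      private Set<JoinOp> joinOps = new HashSet<>();
      private Set<MapJoinOp> mapJoinOps = new HashSet<>();
      private Set<SmbJoinOp> smbMapJoinOps = new HashSet<>();

      // The old getJoinContext()/setJoinContext(Map) pairs become set accessors;
      // the QBJoinTree payload now travels on each operator's descriptor instead.
      public Set<JoinOp> getJoinOps() { return joinOps; }
      public void setJoinOps(Set<JoinOp> joinOps) { this.joinOps = joinOps; }
      public Set<MapJoinOp> getMapJoinOps() { return mapJoinOps; }
      public void setMapJoinOps(Set<MapJoinOp> mapJoinOps) { this.mapJoinOps = mapJoinOps; }
      public Set<SmbJoinOp> getSmbMapJoinOps() { return smbMapJoinOps; }
      public void setSmbMapJoinOps(Set<SmbJoinOp> smbMapJoinOps) { this.smbMapJoinOps = smbMapJoinOps; }
    }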
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 8215c26..81a3256 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
@@ -75,9 +74,9 @@
   private HashMap<String, Operator<? extends OperatorDesc>> topOps;
   private HashMap<String, Operator<? extends OperatorDesc>> topSelOps;
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
-  private Map<JoinOperator, QBJoinTree> joinContext;
-  private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
-  private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
+  private Set<JoinOperator> joinOps;
+  private Set<MapJoinOperator> mapJoinOps;
+  private Set<SMBMapJoinOperator> smbMapJoinOps;
   private HashMap<TableScanOperator, Table> topToTable;
   private Map<FileSinkOperator, Table> fsopToTable;
   private List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
@@ -133,7 +132,7 @@ public ParseContext() {
    * @param opParseCtx
    *          operator parse context - contains a mapping from operator to
    *          operator parse state (row resolver etc.)
-   * @param joinContext
+   * @param joinOps
    *          context needed join processing (map join specifically)
    * @param topToTable
    *          the top tables being processed
@@ -165,8 +164,8 @@ public ParseContext(
       HashMap<String, Operator<? extends OperatorDesc>> topOps,
       HashMap<String, Operator<? extends OperatorDesc>> topSelOps,
       LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx,
-      Map<JoinOperator, QBJoinTree> joinContext,
-      Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
+      Set<JoinOperator> joinOps,
+      Set<SMBMapJoinOperator> smbMapJoinOps,
       HashMap<TableScanOperator, Table> topToTable,
       HashMap<TableScanOperator, Map<String, String>> topToProps,
       Map<FileSinkOperator, Table> fsopToTable,
@@ -188,8 +187,8 @@ public ParseContext(
     this.ast = ast;
     this.opToPartPruner = opToPartPruner;
     this.opToPartList = opToPartList;
-    this.joinContext = joinContext;
-    this.smbMapJoinContext = smbMapJoinContext;
+    this.joinOps = joinOps;
+    this.smbMapJoinOps = smbMapJoinOps;
     this.topToTable = topToTable;
     this.fsopToTable = fsopToTable;
     this.topToProps = topToProps;
@@ -476,18 +475,18 @@ public void setUCtx(UnionProcContext uCtx) {
   }
 
   /**
-   * @return the joinContext
+   * @return the joinOps
    */
-  public Map<JoinOperator, QBJoinTree> getJoinContext() {
-    return joinContext;
+  public Set<JoinOperator> getJoinOps() {
+    return joinOps;
   }
 
   /**
-   * @param joinContext
-   *          the joinContext to set
+   * @param joinOps
+   *          the joinOps to set
   */
-  public void setJoinContext(Map<JoinOperator, QBJoinTree> joinContext) {
-    this.joinContext = joinContext;
+  public void setJoinOps(Set<JoinOperator> joinOps) {
+    this.joinOps = joinOps;
   }
 
   /**
@@ -570,20 +569,20 @@ public LineageInfo getLineageInfo() {
     return lInfo;
   }
 
-  public Map<MapJoinOperator, QBJoinTree> getMapJoinContext() {
-    return mapJoinContext;
+  public Set<MapJoinOperator> getMapJoinOps() {
+    return mapJoinOps;
   }
 
-  public void setMapJoinContext(Map<MapJoinOperator, QBJoinTree> mapJoinContext) {
-    this.mapJoinContext = mapJoinContext;
+  public void setMapJoinOps(Set<MapJoinOperator> mapJoinOps) {
+    this.mapJoinOps = mapJoinOps;
   }
 
-  public Map<SMBMapJoinOperator, QBJoinTree> getSmbMapJoinContext() {
-    return smbMapJoinContext;
+  public Set<SMBMapJoinOperator> getSmbMapJoinOps() {
+    return smbMapJoinOps;
  }
 
-  public void setSmbMapJoinContext(Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext) {
-    this.smbMapJoinContext = smbMapJoinContext;
+  public void setSmbMapJoinOps(Set<SMBMapJoinOperator> smbMapJoinOps) {
+    this.smbMapJoinOps = smbMapJoinOps;
   }
 
   public GlobalLimitCtx getGlobalLimitCtx() {
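Note: SemanticAnalyzer, next, keeps its internal joinContext/smbMapJoinContext maps and bridges them to the new representation — initParseCtx re-derives the maps from the sets (reading each tree off its operator), while getParseContext hands back just the key sets. The round-trip in miniature, with stand-in types:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class Tree {}
    class JoinOp { Tree tree; }

    public class BridgeSketch {
      // initParseCtx direction: expand a Set<JoinOp> into the legacy op -> tree map.
      static Map<JoinOp, Tree> toMap(Set<JoinOp> ops) {
        Map<JoinOp, Tree> ctx = new HashMap<>();
        for (JoinOp op : ops) {
          ctx.put(op, op.tree); // the tree is read straight off the operator
        }
        return ctx;
      }

      // getParseContext direction: the map's key set is all ParseContext needs,
      // since each key already carries its own tree.
      static Set<JoinOp> toSet(Map<JoinOp, Tree> ctx) {
        return new HashSet<>(ctx.keySet());
      }
    }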
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c2d5c8c..d8be263 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -378,8 +378,18 @@ public void initParseCtx(ParseContext pctx) {
     opParseCtx = pctx.getOpParseCtx();
     loadTableWork = pctx.getLoadTableWork();
     loadFileWork = pctx.getLoadFileWork();
-    joinContext = pctx.getJoinContext();
-    smbMapJoinContext = pctx.getSmbMapJoinContext();
+    if (pctx.getJoinOps() != null) {
+      joinContext = new HashMap<JoinOperator, QBJoinTree>();
+      for (JoinOperator joinOp : pctx.getJoinOps()) {
+        joinContext.put(joinOp, joinOp.getConf().getJoinTree());
+      }
+    }
+    if (pctx.getSmbMapJoinOps() != null) {
+      smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
+      for (SMBMapJoinOperator smbMapJoinOp : pctx.getSmbMapJoinOps()) {
+        smbMapJoinContext.put(smbMapJoinOp, smbMapJoinOp.getConf().getJoinTree());
+      }
+    }
     ctx = pctx.getContext();
     destTableId = pctx.getDestTableId();
     idToTableNameMap = pctx.getIdToTableNameMap();
@@ -394,7 +404,10 @@ public void initParseCtx(ParseContext pctx) {
 
   public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
-        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topSelOps, opParseCtx,
+        new HashSet<JoinOperator>(joinContext.keySet()),
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx,
         idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables,
         prunedPartitions,
@@ -7363,6 +7376,7 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
 
     JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
       joinSrcOp, srcOps, omitOpts, joinKeys);
+    joinOp.getConf().setJoinTree(joinTree);
     joinContext.put(joinOp, joinTree);
 
     Operator op = joinOp;
@@ -9988,7 +10002,10 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
 
     // 4. Generate Parse Context for Optimizer & Physical compiler
     ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child, opToPartPruner, opToPartList,
-        topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topOps, topSelOps, opParseCtx,
+        new HashSet<JoinOperator>(joinContext.keySet()),
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
index da14ab4..3612c97 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -174,7 +173,10 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       // Get the key column names for each side of the join,
       // and check if the keys are all constants
       // or columns (not expressions). If yes, proceed.
-      QBJoinTree joinTree = pGraphContext.getJoinContext().get(op);
+      QBJoinTree joinTree = null;
+      if (pGraphContext.getJoinOps().contains(op)) {
+        joinTree = op.getConf().getJoinTree();
+      }
 
       assert(parentOps.size() == joinTree.getBaseSrc().length);
       int pos = 0;
       for (String src : joinTree.getBaseSrc()) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 23fbbe1..814a9f2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -26,8 +26,6 @@
 import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +58,9 @@
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
 /**
  * TaskCompiler is a the base class for classes that compile
  * operator pipelines into tasks.
@@ -386,8 +387,8 @@ public ParseContext getParseContext(ParseContext pCtx, List