diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 2e771ec..0a5db47 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1066,7 +1066,6 @@ protected synchronized Kryo initialValue() {
       removeField(kryo, Operator.class, "colExprMap");
       removeField(kryo, ColumnInfo.class, "objectInspector");
       removeField(kryo, MapWork.class, "opParseCtxMap");
-      removeField(kryo, MapWork.class, "joinTree");
       return kryo;
     };
   };
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
index 03742d4..57c8a17 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -132,23 +131,21 @@ private boolean checkNumberOfBucketsAgainstBigTable(
   protected boolean canConvertMapJoinToBucketMapJoin(
       MapJoinOperator mapJoinOp,
-      ParseContext pGraphContext,
       BucketJoinProcCtx context) throws SemanticException {

-    QBJoinTree joinCtx = pGraphContext.getMapJoinContext().get(mapJoinOp);
-    if (joinCtx == null) {
+    if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
       return false;
     }

     List joinAliases = new ArrayList();
-    String[] srcs = joinCtx.getBaseSrc();
-    String[] left = joinCtx.getLeftAliases();
-    List mapAlias = joinCtx.getMapAliases();
+    String[] srcs = mapJoinOp.getConf().getBaseSrc();
+    String[] left = mapJoinOp.getConf().getLeftAliases();
+    List mapAlias = mapJoinOp.getConf().getMapAliases();
     String baseBigAlias = null;

     for (String s : left) {
       if (s != null) {
-        String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+        String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s);
         if (!joinAliases.contains(subQueryAlias)) {
           joinAliases.add(subQueryAlias);
           if (!mapAlias.contains(s)) {
@@ -160,7 +157,7 @@ protected boolean canConvertMapJoinToBucketMapJoin(
     for (String s : srcs) {
       if (s != null) {
-        String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+        String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s);
         if (!joinAliases.contains(subQueryAlias)) {
           joinAliases.add(subQueryAlias);
           if (!mapAlias.contains(s)) {
@@ -173,9 +170,8 @@ protected boolean canConvertMapJoinToBucketMapJoin(
     Map> keysMap = mapJoinOp.getConf().getKeys();

     return checkConvertBucketMapJoin(
-        pGraphContext,
         context,
-        joinCtx,
+        mapJoinOp.getConf().getAliasToOpInfo(),
         keysMap,
         baseBigAlias,
         joinAliases);
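A note for readers tracking the new call sites: every piece of state that used to be fetched via ParseContext.getMapJoinContext().get(op) is now read off the operator's own descriptor. The sketch below shows the descriptor-side surface this implies; the accessor names come from the call sites in this patch, while the field types and class shape are assumptions for illustration, not the actual Hive source.

```java
import java.util.List;
import java.util.Map;

// Sketch only: assumed shape of the former QBJoinTree properties after they
// move onto the join descriptors (JoinDesc and its subclasses). Types are
// inferred from the call sites in this diff, not copied from Hive.
public class JoinTreePropsSketch {
  private String id;                     // feeds QB.getAppendedAliasFromId(id, alias)
  private boolean joinSrc;               // was: joinTree.getJoinSrc() != null
  private String[] baseSrc;              // one slot per join input; null = sub-join
  private String leftAlias;
  private String[] leftAliases;
  private String[] rightAliases;
  private List<String> mapAliases;       // tables hinted for map-side caching
  private List<String> streamAliases;    // tables hinted for streaming
  private Map<String, Object> aliasToOpInfo; // alias -> top operator of that input

  // Mirrors setQBJoinTreeProps(...) used later in the patch: the replacement
  // operator's descriptor copies these properties from the operator it replaces.
  public void setQBJoinTreeProps(JoinTreePropsSketch other) {
    this.id = other.id;
    this.joinSrc = other.joinSrc;
    this.baseSrc = other.baseSrc;
    this.leftAlias = other.leftAlias;
    this.leftAliases = other.leftAliases;
    this.rightAliases = other.rightAliases;
    this.mapAliases = other.mapAliases;
    this.streamAliases = other.streamAliases;
    this.aliasToOpInfo = other.aliasToOpInfo;
  }
}
```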
@@ -190,9 +186,8 @@ protected boolean canConvertMapJoinToBucketMapJoin(
    * d. The number of buckets in the big table can be divided by no of buckets in small tables.
    */
   protected boolean checkConvertBucketMapJoin(
-      ParseContext pGraphContext,
       BucketJoinProcCtx context,
-      QBJoinTree joinCtx,
+      Map> aliasToOpInfo,
       Map> keysMap,
       String baseBigAlias,
       List joinAliases) throws SemanticException {
@@ -218,7 +213,7 @@ protected boolean checkConvertBucketMapJoin(
     boolean bigTablePartitioned = true;
     for (int index = 0; index < joinAliases.size(); index++) {
       String alias = joinAliases.get(index);
-      Operator topOp = joinCtx.getAliasToOpInfo().get(alias);
+      Operator topOp = aliasToOpInfo.get(alias);
       // The alias may not be present in case of a sub-query
       if (topOp == null) {
         return false;
       }
@@ -459,7 +454,7 @@ protected void convertMapJoinToBucketMapJoin(
   }

   // convert partition to partition spec string
-  private static Map> convert(Map> mapping) {
+  private Map> convert(Map> mapping) {
     Map> converted = new HashMap>();
     for (Map.Entry> entry : mapping.entrySet()) {
       converted.put(entry.getKey().getName(), entry.getValue());
     }
@@ -488,7 +483,7 @@ protected void convertMapJoinToBucketMapJoin(
   }

   // called for each partition of big table and populates mapping for each file in the partition
-  private static void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
+  private void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
       List smallTblBucketNums,
       List> smallTblFilesList,
       Map> bigTableBucketFileNameToSmallTableBucketFileNames,
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
index eba35f5..210cb5c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
@@ -46,7 +46,6 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -97,17 +96,17 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
       return false;
     }

-    boolean tableEligibleForBucketedSortMergeJoin = true;
-    QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
-        .get(mapJoinOp);
-    if (joinCxt == null) {
+    if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) {
       return false;
     }
-    String[] srcs = joinCxt.getBaseSrc();
+
+    String[] srcs = mapJoinOp.getConf().getBaseSrc();
     for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
-      srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
+      srcs[srcPos] = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), srcs[srcPos]);
     }

+    boolean tableEligibleForBucketedSortMergeJoin = true;
+
     // All the tables/partitions columns should be sorted in the same order
     // For example, if tables A and B are being joined on columns c1, c2 and c3
     // which are the sorted and bucketed columns. The join would work, as long
@@ -117,9 +116,8 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
     for (int pos = 0; pos < srcs.length; pos++) {
       tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
           isEligibleForBucketSortMergeJoin(smbJoinContext,
-              pGraphContext,
               mapJoinOp.getConf().getKeys().get((byte) pos),
-              joinCxt,
+              mapJoinOp.getConf().getAliasToOpInfo(),
               srcs,
               pos,
               sortColumnsFirstTable);
@@ -141,8 +139,7 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,

   // Convert the bucket map-join operator to a sort-merge map join operator
   protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext parseContext) {
+      SortBucketJoinProcCtx smbJoinContext) {

     String[] srcs = smbJoinContext.getSrcs();
     SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
@@ -219,10 +216,13 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJo
       child.getParentOperators().remove(index);
       child.getParentOperators().add(index, smbJop);
     }
-    parseContext.getSmbMapJoinContext().put(smbJop,
-        parseContext.getMapJoinContext().get(mapJoinOp));
-    parseContext.getMapJoinContext().remove(mapJoinOp);
-    parseContext.getOpParseCtx().put(smbJop, parseContext.getOpParseCtx().get(mapJoinOp));
+
+    // Data structures coming from QBJoinTree
+    smbJop.getConf().setQBJoinTreeProps(mapJoinOp.getConf());
+    //
+    pGraphContext.getSmbMapJoinOps().add(smbJop);
+    pGraphContext.getMapJoinOps().remove(mapJoinOp);
+    pGraphContext.getOpParseCtx().put(smbJop, pGraphContext.getOpParseCtx().get(mapJoinOp));

     return smbJop;
   }
@@ -242,9 +242,8 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJo
    */
   private boolean isEligibleForBucketSortMergeJoin(
       SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pctx,
       List keys,
-      QBJoinTree joinTree,
+      Map> aliasToOpInfo,
       String[] aliases,
       int pos,
       List sortColumnsFirstTable) throws SemanticException {
@@ -266,7 +265,7 @@ private boolean isEligibleForBucketSortMergeJoin(
      * table. If the object being map-joined is a base table, then aliasToOpInfo
      * contains the TableScanOperator, and TableAccessAnalyzer is a no-op.
      */
-    Operator topOp = joinTree.getAliasToOpInfo().get(alias);
+    Operator topOp = aliasToOpInfo.get(alias);
     if (topOp == null) {
       return false;
     }
@@ -386,15 +385,13 @@ private boolean checkSortColsAndJoinCols(List sortCols,
   // It is already verified that the join can be converted to a bucket map join
   protected boolean checkConvertJoinToSMBJoin(
       JoinOperator joinOperator,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pGraphContext) throws SemanticException {
-
-    QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {

-    if (joinCtx == null) {
+    if (!this.pGraphContext.getJoinOps().contains(joinOperator)) {
       return false;
     }
-    String[] srcs = joinCtx.getBaseSrc();
+
+    String[] srcs = joinOperator.getConf().getBaseSrc();

     // All the tables/partitions columns should be sorted in the same order
     // For example, if tables A and B are being joined on columns c1, c2 and c3
@@ -404,9 +401,8 @@ protected boolean checkConvertJoinToSMBJoin(
     for (int pos = 0; pos < srcs.length; pos++) {
       if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
-          pGraphContext,
           smbJoinContext.getKeyExprMap().get((byte) pos),
-          joinCtx,
+          joinOperator.getConf().getAliasToOpInfo(),
           srcs,
           pos,
           sortColumnsFirstTable)) {
@@ -421,12 +417,10 @@ protected boolean checkConvertJoinToSMBJoin(
   // Can the join operator be converted to a sort-merge join operator ?
   protected boolean canConvertJoinToSMBJoin(
       JoinOperator joinOperator,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext pGraphContext) throws SemanticException {
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
     boolean canConvert =
         canConvertJoinToBucketMapJoin(
             joinOperator,
-            pGraphContext,
             smbJoinContext
         );
@@ -434,13 +428,12 @@ protected boolean canConvertJoinToSMBJoin(
       return false;
     }

-    return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+    return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext);
   }

   // Can the join operator be converted to a bucket map-merge join operator ?
   protected boolean canConvertJoinToBucketMapJoin(
       JoinOperator joinOp,
-      ParseContext pGraphContext,
       SortBucketJoinProcCtx context) throws SemanticException {

     // This has already been inspected and rejected
@@ -448,8 +441,7 @@ protected boolean canConvertJoinToBucketMapJoin(
       return false;
     }

-    QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOp);
-    if (joinCtx == null) {
+    if (!this.pGraphContext.getJoinOps().contains(joinOp)) {
       return false;
     }
@@ -482,8 +474,9 @@ protected boolean canConvertJoinToBucketMapJoin(
     context.setBigTablePosition(bigTablePosition);
     String joinAlias = bigTablePosition == 0 ?
-        joinCtx.getLeftAlias() : joinCtx.getRightAliases()[bigTablePosition - 1];
-    joinAlias = QB.getAppendedAliasFromId(joinCtx.getId(), joinAlias);
+        joinOp.getConf().getLeftAlias() :
+        joinOp.getConf().getRightAliases()[bigTablePosition - 1];
+    joinAlias = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinAlias);

     Map> keyExprMap = new HashMap>();
     List> parentOps = joinOp.getParentOperators();
@@ -497,10 +490,10 @@ protected boolean canConvertJoinToBucketMapJoin(
     context.setKeyExprMap(keyExprMap);
     // Make a deep copy of the aliases so that they are not changed in the context
-    String[] joinSrcs = joinCtx.getBaseSrc();
+    String[] joinSrcs = joinOp.getConf().getBaseSrc();
     String[] srcs = new String[joinSrcs.length];
     for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) {
-      joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]);
+      joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinSrcs[srcPos]);
       srcs[srcPos] = new String(joinSrcs[srcPos]);
     }
@@ -508,9 +501,8 @@ protected boolean canConvertJoinToBucketMapJoin(
     // The candidate map-join was derived from the pluggable sort merge join big
     // table matcher.
     return checkConvertBucketMapJoin(
-        pGraphContext,
         context,
-        joinCtx,
+        joinOp.getConf().getAliasToOpInfo(),
         keyExprMap,
         joinAlias,
         Arrays.asList(srcs));
@@ -519,19 +511,23 @@ protected boolean canConvertJoinToBucketMapJoin(
   // Convert the join operator to a bucket map-join join operator
   protected MapJoinOperator convertJoinToBucketMapJoin(
       JoinOperator joinOp,
-      SortBucketJoinProcCtx joinContext,
-      ParseContext parseContext) throws SemanticException {
+      SortBucketJoinProcCtx joinContext) throws SemanticException {
     MapJoinOperator mapJoinOp = new MapJoinProcessor().convertMapJoin(
-        parseContext.getConf(),
-        parseContext.getOpParseCtx(),
+        pGraphContext.getConf(),
+        pGraphContext.getOpParseCtx(),
         joinOp,
-        pGraphContext.getJoinContext().get(joinOp),
+        joinOp.getConf().isJoinSrc(),
+        joinOp.getConf().getBaseSrc(),
+        joinOp.getConf().getMapAliases(),
         joinContext.getBigTablePosition(),
         false,
         false);
     // Remove the join operator from the query join context
-    parseContext.getMapJoinContext().put(mapJoinOp, parseContext.getJoinContext().get(joinOp));
-    parseContext.getJoinContext().remove(joinOp);
+    // Data structures coming from QBJoinTree
+    mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+    //
+    pGraphContext.getMapJoinOps().add(mapJoinOp);
+    pGraphContext.getJoinOps().remove(joinOp);
     convertMapJoinToBucketMapJoin(mapJoinOp, joinContext);
     return mapJoinOp;
   }
@@ -539,11 +535,10 @@ protected MapJoinOperator convertJoinToBucketMapJoin(
   // Convert the join operator to a sort-merge join operator
   protected void convertJoinToSMBJoin(
       JoinOperator joinOp,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext parseContext) throws SemanticException {
-    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext);
     SMBMapJoinOperator smbMapJoinOp =
-        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
   }
 }
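Both conversion paths above follow the same bookkeeping recipe: build the replacement operator, copy the join-tree properties onto its descriptor, then move it between the ParseContext operator sets (joinOps, mapJoinOps, smbMapJoinOps). A compressed illustration of that recipe; the types are stand-ins and recordConversion is a hypothetical helper, not something the patch adds:

```java
import java.util.HashSet;
import java.util.Set;

// Stand-ins for the real operator/descriptor types used in the diff above.
class Desc { void setQBJoinTreeProps(Desc from) { /* copy id, baseSrc, ... */ } }
class Op { final Desc conf = new Desc(); Desc getConf() { return conf; } }

public class ConversionBookkeeping {
  // e.g. ParseContext.getJoinOps() / getMapJoinOps() after this patch
  static Set<Op> joinOps = new HashSet<>();
  static Set<Op> mapJoinOps = new HashSet<>();

  // Hypothetical helper naming the pattern repeated in convertJoinToBucketMapJoin
  // and convertBucketMapJoinToSMBJoin: copy props, then swap set membership.
  static Op recordConversion(Op oldOp, Op newOp, Set<Op> from, Set<Op> to) {
    newOp.getConf().setQBJoinTreeProps(oldOp.getConf()); // data formerly on QBJoinTree
    to.add(newOp);
    from.remove(oldOp);
    return newOp;
  }
}
```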
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
index 264d3f0..1b7c500 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
@@ -45,7 +45,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
     // can the mapjoin present be converted to a bucketed mapjoin
     boolean convert = canConvertMapJoinToBucketMapJoin(
-        mapJoinOperator, pGraphContext, context);
+        mapJoinOperator, context);
     HiveConf conf = context.getConf();

     // Throw an error if the user asked for bucketed mapjoin to be enforced and
@@ -67,13 +67,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
    * and do the conversion if possible.
    */
   public static void checkAndConvertBucketMapJoin(ParseContext pGraphContext,
-      MapJoinOperator mapJoinOp, QBJoinTree joinCtx, String baseBigAlias,
+      MapJoinOperator mapJoinOp, String baseBigAlias,
       List joinAliases) throws SemanticException {
     BucketJoinProcCtx ctx = new BucketJoinProcCtx(pGraphContext.getConf());
     BucketMapjoinProc proc = new BucketMapjoinProc(pGraphContext);
     Map> keysMap = mapJoinOp.getConf().getKeys();
-    if (proc.checkConvertBucketMapJoin(pGraphContext, ctx,
-        joinCtx, keysMap, baseBigAlias, joinAliases)) {
+    if (proc.checkConvertBucketMapJoin(ctx, mapJoinOp.getConf().getAliasToOpInfo(),
+        keysMap, baseBigAlias, joinAliases)) {
       proc.convertMapJoinToBucketMapJoin(mapJoinOp, ctx);
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 7ab35ee..09fba7f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -47,8 +47,8 @@ import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
@@ -231,13 +231,16 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont
     ParseContext parseContext = context.parseContext;
     MapJoinDesc mapJoinDesc = null;
     if (adjustParentsChildren) {
-      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
-          joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+          joinOp, joinOp.getConf().isJoinSrc(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+          mapJoinConversionPos, true);
     } else {
       JoinDesc joinDesc = joinOp.getConf();
       // retain the original join desc in the map join.
       mapJoinDesc =
-          new MapJoinDesc(MapJoinProcessor.getKeys(parseContext.getJoinContext().get(joinOp), joinOp).getSecond(),
+          new MapJoinDesc(
+              MapJoinProcessor.getKeys(joinOp.getConf().isJoinSrc(),
+                  joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
               null, joinDesc.getExprs(), null, null,
               joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
               joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
@@ -606,7 +609,8 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo
     ParseContext parseContext = context.parseContext;
     MapJoinOperator mapJoinOp = MapJoinProcessor.convertJoinOpMapJoinOp(context.conf,
         parseContext.getOpParseCtx(), joinOp,
-        parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+        joinOp.getConf().isJoinSrc(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+        bigTablePosition, true);

     Operator parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index ae0addc..3ed4716 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -18,7 +18,20 @@
 package org.apache.hadoop.hive.ql.optimizer;

-import com.google.common.collect.Interner;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -67,7 +80,6 @@ import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.QBParseInfo;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -102,19 +114,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;

-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
+import com.google.common.collect.Interner;

 /**
  * General utility common functions for the Processor to convert operator into
@@ -1067,17 +1067,23 @@ private static void splitTasks(ReduceSinkOperator op,
       if (needsTagging(cplan.getReduceWork())) {
         Operator reducerOp = cplan.getReduceWork().getReducer();
-        QBJoinTree joinTree = null;
+        String id = null;
         if (reducerOp instanceof JoinOperator) {
-          joinTree = parseCtx.getJoinContext().get(reducerOp);
+          if (parseCtx.getJoinOps().contains(reducerOp)) {
+            id = ((JoinOperator)reducerOp).getConf().getId();
+          }
         } else if (reducerOp instanceof MapJoinOperator) {
-          joinTree = parseCtx.getMapJoinContext().get(reducerOp);
+          if (parseCtx.getMapJoinOps().contains(reducerOp)) {
+            id = ((MapJoinOperator)reducerOp).getConf().getId();
+          }
         } else if (reducerOp instanceof SMBMapJoinOperator) {
-          joinTree = parseCtx.getSmbMapJoinContext().get(reducerOp);
+          if (parseCtx.getSmbMapJoinOps().contains(reducerOp)) {
+            id = ((SMBMapJoinOperator)reducerOp).getConf().getId();
+          }
         }

-        if (joinTree != null && joinTree.getId() != null) {
-          streamDesc = joinTree.getId() + ":$INTNAME";
+        if (id != null) {
+          streamDesc = id + ":$INTNAME";
         } else {
           streamDesc = "$INTNAME";
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
index 9238e0e..065edef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -95,9 +94,9 @@ private int getOutputSize(Operator operator,
   private Set getBigTables(ParseContext joinCtx) {
     Set bigTables = new HashSet();

-    for (QBJoinTree qbJoin : joinCtx.getJoinContext().values()) {
-      if (qbJoin.getStreamAliases() != null) {
-        bigTables.addAll(qbJoin.getStreamAliases());
+    for (JoinOperator joinOp : joinCtx.getJoinOps()) {
+      if (joinOp.getConf().getStreamAliases() != null) {
+        bigTables.addAll(joinOp.getConf().getStreamAliases());
       }
     }

@@ -155,7 +154,7 @@ private void reorder(JoinOperator joinOp, Set bigTables) {
   public ParseContext transform(ParseContext pactx) throws SemanticException {
     Set bigTables = getBigTables(pactx);

-    for (JoinOperator joinOp : pactx.getJoinContext().keySet()) {
+    for (JoinOperator joinOp : pactx.getJoinOps()) {
       reorder(joinOp, bigTables);
     }
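The tagging change in GenMapRedUtils (and its twin in SparkSkewJoinProcFactory at the end of this patch) repeats the same shape three times: check set membership, then pull getId() from the operator's descriptor. A hypothetical consolidation is sketched below; this helper is not part of the patch, and JoinLikeDesc is a stand-in for the three descriptor types, which per the call sites all expose getId() after this refactor.

```java
import java.util.Set;

// Stand-in for JoinDesc/MapJoinDesc/SMBJoinDesc, which all carry the former
// QBJoinTree id after this patch.
interface JoinLikeDesc { String getId(); }

public class StreamDescHelper {
  // Hypothetical fold of the three instanceof branches in splitTasks():
  // a reducer contributes an id only if it is tracked in the matching
  // ParseContext set and its descriptor carries a non-null id.
  static String streamDesc(Object reducer, Set<?> trackedOps, JoinLikeDesc conf) {
    String id = trackedOps.contains(reducer) ? conf.getId() : null;
    return (id != null) ? id + ":$INTNAME" : "$INTNAME";
  }
}
```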
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index 828f87c..8ae380d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -96,18 +95,13 @@
   // needs to be passed. Use the string defined below for that.
   private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";

-  private ParseContext pGraphContext;
-
-  /**
-   * empty constructor.
-   */
   public MapJoinProcessor() {
-    pGraphContext = null;
   }

   @SuppressWarnings("nls")
-  private Operator
-  putOpInsertMap(Operator op, RowResolver rr) {
+  private static Operator putOpInsertMap (
+      ParseContext pGraphContext, Operator op,
+      RowResolver rr) {
     OpParseContext ctx = new OpParseContext(rr);
     pGraphContext.getOpParseCtx().put(op, ctx);
     return op;
   }
@@ -232,10 +226,10 @@ public static void genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork,
       throws SemanticException {
     LinkedHashMap, OpParseContext> opParseCtxMap =
         newWork.getMapWork().getOpParseCtxMap();
-    QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
     // generate the map join operator; already checked the map join
     MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, opParseCtxMap, op,
-        newJoinTree, mapJoinPos, true, false);
+        newWork.getMapWork().isJoinSrc(), newWork.getMapWork().getBaseSrc(), newWork.getMapWork().getMapAliases(),
+        mapJoinPos, true, false);
     genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
   }

@@ -247,7 +241,9 @@ public static void genLocalWorkForMapJoin(MapredWork newWork, MapJoinOperator ne
       MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
       // clean up the mapred work
       newWork.getMapWork().setOpParseCtxMap(null);
-      newWork.getMapWork().setJoinTree(null);
+      newWork.getMapWork().setJoinSrc(false);
+      newWork.getMapWork().setBaseSrc(null);
+      newWork.getMapWork().setMapAliases(null);
     } catch (Exception e) {
       e.printStackTrace();
@@ -307,9 +303,8 @@ private static void validateMapJoinTypes(Operator op)
    */
   public MapJoinOperator convertMapJoin(HiveConf conf,
       LinkedHashMap, OpParseContext> opParseCtxMap,
-      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
-      boolean validateMapJoinTree)
-      throws SemanticException {
+      JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases,
+      int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) throws SemanticException {

     // outer join cannot be performed on a table which is being cached
     JoinDesc desc = op.getConf();
@@ -324,8 +319,6 @@ public MapJoinOperator convertMapJoin(HiveConf conf,
     // Walk over all the sources (which are guaranteed to be reduce sink
     // operators).
     // The join outputs a concatenation of all the inputs.
-    QBJoinTree leftSrc = joinTree.getJoinSrc();
-
     List> parentOps = op.getParentOperators();
     List> newParentOps =
         new ArrayList>();

     // found a source which is not to be stored in memory
-    if (leftSrc != null) {
+    if (leftSrc) {
       // assert mapJoinPos == 0;
       Operator parentOp = parentOps.get(0);
       assert parentOp.getParentOperators().size() == 1;
@@ -345,7 +338,7 @@ public MapJoinOperator convertMapJoin(HiveConf conf,
     byte pos = 0;
     // Remove parent reduce-sink operators
-    for (String src : joinTree.getBaseSrc()) {
+    for (String src : baseSrc) {
       if (src != null) {
         Operator parentOp = parentOps.get(pos);
         assert parentOp.getParentOperators().size() == 1;
@@ -360,7 +353,7 @@ public MapJoinOperator convertMapJoin(HiveConf conf,

     // create the map-join operator
     MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
-        op, joinTree, mapJoinPos, noCheckOuterJoin);
+        op, leftSrc, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin);

     // remove old parents
@@ -384,11 +377,12 @@ public MapJoinOperator convertMapJoin(HiveConf conf,

   public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
       LinkedHashMap, OpParseContext> opParseCtxMap,
-      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
-      throws SemanticException {
+      JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases,
+      int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {

     MapJoinDesc mapJoinDescriptor =
-        getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
+        getMapJoinDesc(hconf, opParseCtxMap, op, leftSrc, baseSrc, mapAliases,
+            mapJoinPos, noCheckOuterJoin);

     // reduce sink row resolver used to generate map join op
     RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -441,7 +435,7 @@ private static boolean needValueIndex(int[] valueIndex) {
    */
   public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
       Map, OpParseContext> opParseCtxMap,
-      SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
+      SMBMapJoinOperator smbJoinOp, int bigTablePos, boolean noCheckOuterJoin)
       throws SemanticException {
     // Create a new map join operator
     SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
@@ -488,7 +482,7 @@ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
   }

   public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator op,
-      QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
+      int mapJoinPos) throws SemanticException {
     HiveConf hiveConf = pctx.getConf();
     boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf,
         HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
@@ -497,7 +491,8 @@ public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator o
     LinkedHashMap, OpParseContext> opParseCtxMap = pctx
         .getOpParseCtx();
     MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
-        joinTree, mapJoinPos, noCheckOuterJoin, true);
+        op.getConf().isJoinSrc(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(),
+        mapJoinPos, noCheckOuterJoin, true);
     // create a dummy select to select all columns
     genSelectPlan(pctx, mapJoinOp);
     return mapJoinOp;
@@ -626,7 +621,7 @@ protected void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws Se

     SelectDesc select = new SelectDesc(exprs, outputs, false);

-    SelectOperator sel = (SelectOperator) putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+    SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select,
         new RowSchema(inputRR.getColumnInfos()), input), inputRR);

     sel.setColumnExprMap(colExprMap);
@@ -643,24 +638,22 @@ protected void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws Se
    *
    * @param op
    *          join operator
-   * @param joinTree
-   *          qb join tree
    * @return -1 if it cannot be converted to a map-side join, position of the map join node
    *         otherwise
    */
-  private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException {
+  private int mapSideJoin(JoinOperator op) throws SemanticException {
     int mapJoinPos = -1;
-    if (joinTree.isMapSideJoin()) {
+    if (op.getConf().isMapSideJoin()) {
       int pos = 0;
       // In a map-side join, exactly one table is not present in memory.
       // The client provides the list of tables which can be cached in memory
       // via a hint.
-      if (joinTree.getJoinSrc() != null) {
+      if (op.getConf().isJoinSrc()) {
         mapJoinPos = pos;
       }
-      for (String src : joinTree.getBaseSrc()) {
+      for (String src : op.getConf().getBaseSrc()) {
         if (src != null) {
-          if (!joinTree.getMapAliases().contains(src)) {
+          if (!op.getConf().getMapAliases().contains(src)) {
             if (mapJoinPos >= 0) {
               return -1;
             }
@@ -675,7 +668,7 @@ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticExc
       // leaving some table from the list of tables to be cached
       if (mapJoinPos == -1) {
         throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg(
-            Arrays.toString(joinTree.getBaseSrc())));
+            Arrays.toString(op.getConf().getBaseSrc())));
       }
     }

@@ -691,36 +684,34 @@ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticExc
    */
   @Override
   public ParseContext transform(ParseContext pactx) throws SemanticException {
-    pGraphContext = pactx;
     List listMapJoinOps = new ArrayList();

     // traverse all the joins and convert them if necessary
-    if (pGraphContext.getJoinContext() != null) {
-      Map joinMap = new HashMap();
-      Map mapJoinMap = pGraphContext.getMapJoinContext();
+    if (pactx.getJoinOps() != null) {
+      Set joinMap = new HashSet();
+      Set mapJoinMap = pactx.getMapJoinOps();
       if (mapJoinMap == null) {
-        mapJoinMap = new HashMap();
-        pGraphContext.setMapJoinContext(mapJoinMap);
+        mapJoinMap = new HashSet();
+        pactx.setMapJoinOps(mapJoinMap);
       }

-      Set> joinCtx = pGraphContext.getJoinContext().entrySet();
-      Iterator> joinCtxIter = joinCtx.iterator();
+      Iterator joinCtxIter = pactx.getJoinOps().iterator();
       while (joinCtxIter.hasNext()) {
-        Map.Entry joinEntry = joinCtxIter.next();
-        JoinOperator joinOp = joinEntry.getKey();
-        QBJoinTree qbJoin = joinEntry.getValue();
-        int mapJoinPos = mapSideJoin(joinOp, qbJoin);
+        JoinOperator joinOp = joinCtxIter.next();
+        int mapJoinPos = mapSideJoin(joinOp);
         if (mapJoinPos >= 0) {
-          MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, qbJoin, mapJoinPos);
+          MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, mapJoinPos);
           listMapJoinOps.add(mapJoinOp);
-          mapJoinMap.put(mapJoinOp, qbJoin);
+          mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+          mapJoinMap.add(mapJoinOp);
         } else {
-          joinMap.put(joinOp, qbJoin);
+          joinOp.getConf().setQBJoinTreeProps(joinOp.getConf());
+          joinMap.add(joinOp);
         }
       }

       // store the new joinContext
-      pGraphContext.setJoinContext(joinMap);
+      pactx.setJoinOps(joinMap);
     }

     // Go over the list and find if a reducer is not needed
@@ -746,15 +737,15 @@ public ParseContext transform(ParseContext pactx) throws SemanticException {
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules, new MapJoinWalkerCtx(
-        listMapJoinOpsNoRed, pGraphContext));
+        listMapJoinOpsNoRed, pactx));

     GraphWalker ogw = new GenMapRedWalker(disp);
     ArrayList topNodes = new ArrayList();
     topNodes.addAll(listMapJoinOps);
     ogw.startWalking(topNodes, null);

-    pGraphContext.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
-    return pGraphContext;
+    pactx.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed);
+    return pactx;
   }

   /**
@@ -800,7 +791,7 @@ private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOpera
       }
       Operator ch = parent.getChildOperators().get(0);
       if (ch instanceof MapJoinOperator) {
-        if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch, mapJoin)) {
+        if (!nonSubqueryMapJoin((MapJoinOperator) ch, mapJoin)) {
           if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch).getConf()
               .getPosBigTable()) {
             // not come from the local branch
@@ -820,11 +811,8 @@ private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOpera
     }
   }

-  private boolean nonSubqueryMapJoin(ParseContext pGraphContext, MapJoinOperator mapJoin,
-      MapJoinOperator parentMapJoin) {
-    QBJoinTree joinTree = pGraphContext.getMapJoinContext().get(mapJoin);
-    QBJoinTree parentJoinTree = pGraphContext.getMapJoinContext().get(parentMapJoin);
-    if (joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) {
+  private boolean nonSubqueryMapJoin(MapJoinOperator mapJoin, MapJoinOperator parentMapJoin) {
+    if (mapJoin.getParentOperators().contains(parentMapJoin)) {
       return true;
     }
     return false;
@@ -1028,15 +1016,17 @@ public void setpGraphContext(ParseContext pGraphContext) {
   }

-  public static ObjectPair, Map>> getKeys(QBJoinTree joinTree, JoinOperator op) {
+  public static ObjectPair, Map>> getKeys(
+      boolean leftSrc,
+      String[] baseSrc,
+      JoinOperator op) {

     // Walk over all the sources (which are guaranteed to be reduce sink
     // operators).
     // The join outputs a concatenation of all the inputs.
-    QBJoinTree leftSrc = joinTree.getJoinSrc();
     List oldReduceSinkParentOps =
         new ArrayList(op.getNumParent());
-    if (leftSrc != null) {
+    if (leftSrc) {
       // assert mapJoinPos == 0;
       Operator parentOp = op.getParentOperators().get(0);
       assert parentOp.getParentOperators().size() == 1;
@@ -1044,7 +1034,7 @@ public void setpGraphContext(ParseContext pGraphContext) {
     }

     byte pos = 0;
-    for (String src : joinTree.getBaseSrc()) {
+    for (String src : baseSrc) {
       if (src != null) {
         Operator parentOp = op.getParentOperators().get(pos);
         assert parentOp.getParentOperators().size() == 1;
@@ -1067,7 +1057,8 @@ public void setpGraphContext(ParseContext pGraphContext) {

   public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
       LinkedHashMap, OpParseContext> opParseCtxMap,
-      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+      JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases,
+      int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
     JoinDesc desc = op.getConf();
     JoinCondDesc[] condns = desc.getConds();
     Byte[] tagOrder = desc.getTagOrder();
@@ -1084,7 +1075,8 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
     Map> valueExprs = op.getConf().getExprs();
     Map> newValueExprs = new HashMap>();

-    ObjectPair, Map>> pair = getKeys(joinTree, op);
+    ObjectPair, Map>> pair =
+        getKeys(leftSrc, baseSrc, op);

     List oldReduceSinkParentOps = pair.getFirst();
     for (Map.Entry> entry : valueExprs.entrySet()) {
       byte tag = entry.getKey();
@@ -1174,8 +1166,8 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf,

     // create dumpfile prefix needed to create descriptor
     String dumpFilePrefix = "";
-    if (joinTree.getMapAliases() != null) {
-      for (String mapAlias : joinTree.getMapAliases()) {
+    if (mapAliases != null) {
+      for (String mapAlias : mapAliases) {
         dumpFilePrefix = dumpFilePrefix + mapAlias;
       }
       dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
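A quick illustration of the (leftSrc, baseSrc) contract that getKeys and convertMapJoin now share: leftSrc == true plays the role of the old joinTree.getJoinSrc() != null check and claims parent 0, while each non-null slot in baseSrc claims the parent at its own position. The values below are made up for the example:

```java
import java.util.ArrayList;
import java.util.List;

public class ParentWalkDemo {
  public static void main(String[] args) {
    boolean leftSrc = true;                 // join input 0 is itself a sub-join
    String[] baseSrc = { null, "b", "c" };  // null marks the sub-join slot
    List<Integer> visitedParents = new ArrayList<>();

    if (leftSrc) {
      visitedParents.add(0);                // the joining sub-tree's reduce sink
    }
    byte pos = 0;
    for (String src : baseSrc) {
      if (src != null) {
        visitedParents.add((int) pos);      // reduce sink feeding base table 'src'
      }
      pos++;
    }
    // Prints [0, 1, 2]: every join input is visited exactly once.
    System.out.println(visitedParents);
  }
}
```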
opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup()); Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null); @@ -76,6 +73,13 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { } private class SelectDedup implements NodeProcessor { + + private ParseContext pctx; + + public SelectDedup (ParseContext pctx) { + this.pctx = pctx; + } + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -178,28 +182,37 @@ private boolean checkReferences(ExprNodeDesc expr, Set funcOutputs, Set< } return true; } - } - /** - * Change existing references in the context to point from child to parent operator. - * @param cSEL child operator (to be removed, and merged into parent) - * @param pSEL parent operator - */ - private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) { - Collection qbJoinTrees = new ArrayList(); - qbJoinTrees.addAll(pctx.getJoinContext().values()); - qbJoinTrees.addAll(pctx.getMapJoinContext().values()); - for (QBJoinTree qbJoinTree : qbJoinTrees) { - Map> aliasToOpInfo = qbJoinTree.getAliasToOpInfo(); - for (Map.Entry> entry : aliasToOpInfo.entrySet()) { - if (entry.getValue() == cSEL) { - aliasToOpInfo.put(entry.getKey(), pSEL); + /** + * Change existing references in the context to point from child to parent operator. + * @param cSEL child operator (to be removed, and merged into parent) + * @param pSEL parent operator + */ + private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) { + Collection>> mapsAliasToOpInfo = + new ArrayList>>(); + for (JoinOperator joinOp : pctx.getJoinOps()) { + if (joinOp.getConf().getAliasToOpInfo() != null) { + mapsAliasToOpInfo.add(joinOp.getConf().getAliasToOpInfo()); + } + } + for (MapJoinOperator mapJoinOp : pctx.getMapJoinOps()) { + if (mapJoinOp.getConf().getAliasToOpInfo() != null) { + mapsAliasToOpInfo.add(mapJoinOp.getConf().getAliasToOpInfo()); + } + } + for (Map> aliasToOpInfo : mapsAliasToOpInfo) { + for (Map.Entry> entry : aliasToOpInfo.entrySet()) { + if (entry.getValue() == cSEL) { + aliasToOpInfo.put(entry.getKey(), pSEL); + } } } } } private class FilterDedup implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
         nodeOutputs) throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
index ea06503..1609831 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
@@ -79,11 +79,13 @@ public class SkewJoinOptimizer implements Transform {

   private static final Log LOG = LogFactory.getLog(SkewJoinOptimizer.class.getName());
-  private static ParseContext parseContext;

   public static class SkewJoinProc implements NodeProcessor {
-    public SkewJoinProc() {
+    private ParseContext parseContext;
+
+    public SkewJoinProc(ParseContext parseContext) {
       super();
+      this.parseContext = parseContext;
     }

     @Override
@@ -165,23 +167,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         return null;
       }

-      // have to create a QBJoinTree for the cloned join operator
-      QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp);
-      QBJoinTree newJoinTree;
-      try {
-        newJoinTree = originJoinTree.clone();
-      } catch (CloneNotSupportedException e) {
-        LOG.debug("QBJoinTree could not be cloned: ", e);
-        return null;
-      }
-
       JoinOperator joinOpClone;
       if (processSelect) {
         joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0));
       } else {
         joinOpClone = (JoinOperator)currOpClone;
       }
-      parseContext.getJoinContext().put(joinOpClone, newJoinTree);
+      joinOpClone.getConf().cloneQBJoinTreeProps(joinOp.getConf());
+      parseContext.getJoinOps().add(joinOpClone);

       List tableScanCloneOpsForJoin =
           new ArrayList();
@@ -213,7 +206,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         }
         parseContext.getTopOps().put(newAlias, tso);
-        setUpAlias(originJoinTree, newJoinTree, tabAlias, newAlias, tso);
+        setUpAlias(joinOp, joinOpClone, tabAlias, newAlias, tso);
       }

       // Now do a union of the select operators: selectOp and selectOpClone
@@ -629,19 +622,19 @@ private void insertRowResolvers(
   /**
    * Set alias in the cloned join tree
    */
-  private static void setUpAlias(QBJoinTree origin, QBJoinTree cloned, String origAlias,
+  private static void setUpAlias(JoinOperator origin, JoinOperator cloned, String origAlias,
       String newAlias, Operator topOp) {
-    cloned.getAliasToOpInfo().remove(origAlias);
-    cloned.getAliasToOpInfo().put(newAlias, topOp);
-    if (origin.getLeftAlias().equals(origAlias)) {
-      cloned.setLeftAlias(null);
-      cloned.setLeftAlias(newAlias);
+    cloned.getConf().getAliasToOpInfo().remove(origAlias);
+    cloned.getConf().getAliasToOpInfo().put(newAlias, topOp);
+    if (origin.getConf().getLeftAlias().equals(origAlias)) {
+      cloned.getConf().setLeftAlias(null);
+      cloned.getConf().setLeftAlias(newAlias);
     }
-    replaceAlias(origin.getLeftAliases(), cloned.getLeftAliases(), origAlias, newAlias);
-    replaceAlias(origin.getRightAliases(), cloned.getRightAliases(), origAlias, newAlias);
-    replaceAlias(origin.getBaseSrc(), cloned.getBaseSrc(), origAlias, newAlias);
-    replaceAlias(origin.getMapAliases(), cloned.getMapAliases(), origAlias, newAlias);
-    replaceAlias(origin.getStreamAliases(), cloned.getStreamAliases(), origAlias, newAlias);
+    replaceAlias(origin.getConf().getLeftAliases(), cloned.getConf().getLeftAliases(), origAlias, newAlias);
+    replaceAlias(origin.getConf().getRightAliases(), cloned.getConf().getRightAliases(), origAlias, newAlias);
+    replaceAlias(origin.getConf().getBaseSrc(), cloned.getConf().getBaseSrc(), origAlias, newAlias);
+    replaceAlias(origin.getConf().getMapAliases(), cloned.getConf().getMapAliases(), origAlias, newAlias);
+    replaceAlias(origin.getConf().getStreamAliases(), cloned.getConf().getStreamAliases(), origAlias, newAlias);
   }

   private static void replaceAlias(String[] origin, String[] cloned,
@@ -677,7 +670,7 @@ private static void replaceAlias(List origin, List cloned,
   public ParseContext transform(ParseContext pctx) throws SemanticException {

     Map opRules = new LinkedHashMap();
-    opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc());
+    opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc(pctx));
     SkewJoinOptProcCtx skewJoinOptProcCtx = new SkewJoinOptProcCtx(pctx);
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -692,8 +685,8 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     return pctx;
   }

-  private NodeProcessor getSkewJoinProc() {
-    return new SkewJoinProc();
+  private NodeProcessor getSkewJoinProc(ParseContext parseContext) {
+    return new SkewJoinProc(parseContext);
   }

   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
index 11ce47e..f6ca039 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
@@ -60,7 +60,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
     }

     if (convert) {
-      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     }
     return null;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
index 8a0c474..d090598 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
@@ -44,10 +44,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
     SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;

     boolean convert = canConvertJoinToSMBJoin(
-        joinOp, smbJoinContext, pGraphContext);
+        joinOp, smbJoinContext);

     if (convert) {
-      convertJoinToSMBJoin(joinOp, smbJoinContext, pGraphContext);
+      convertJoinToSMBJoin(joinOp, smbJoinContext);
     }
     return null;
   }
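SelectDedup and SkewJoinProc above get the same treatment: the ParseContext moves from a field on the enclosing Transform (a static one, in SkewJoinOptimizer's case) into the processor's constructor, so nothing outlives a single transform() call. A minimal sketch of the pattern, with Ctx standing in for ParseContext:

```java
// Minimal sketch of the constructor-injection pattern applied above.
// Ctx stands in for ParseContext; Processor for the NodeProcessor impls.
class Ctx {}

class Processor {
  private final Ctx pctx;  // scoped to this processor instance, not shared/static

  Processor(Ctx pctx) {
    this.pctx = pctx;
  }

  Object process(Object node) {
    // Uses this.pctx instead of an outer mutable or static field, so two
    // back-to-back transform() runs cannot observe each other's state.
    return pctx;
  }
}

class TransformSketch {
  Processor buildRules(Ctx pctx) {
    return new Processor(pctx); // wired per invocation, as in transform() above
  }
}
```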
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
index bed95fa..6c014c5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
@@ -21,18 +21,18 @@
 import java.util.LinkedHashMap;
 import java.util.List;

-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;

+import com.google.common.base.Preconditions;
+
 public class SparkMapJoinProcessor extends MapJoinProcessor {

   /**
@@ -50,8 +50,8 @@
   @Override
   public MapJoinOperator convertMapJoin(HiveConf conf,
       LinkedHashMap, OpParseContext> opParseCtxMap,
-      JoinOperator op, QBJoinTree joinTree, int bigTablePos,
-      boolean noCheckOuterJoin,
+      JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases,
+      int bigTablePos, boolean noCheckOuterJoin,
       boolean validateMapJoinTree) throws SemanticException {

     // outer join cannot be performed on a table which is being cached
@@ -65,7 +65,8 @@ public MapJoinOperator convertMapJoin(HiveConf conf,
     // create the map-join operator
     MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
-        op, joinTree, bigTablePos, noCheckOuterJoin);
+        op, op.getConf().isJoinSrc(), op.getConf().getBaseSrc(),
+        op.getConf().getMapAliases(), bigTablePos, noCheckOuterJoin);

     // 1. remove RS as parent for the big table branch
     // 2. remove old join op from child set of all the RSs
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
index c52f753..d4de0bd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
@@ -107,7 +107,7 @@ private void findPossibleAutoConvertedJoinOperators() throws SemanticException {
     // that has both intermediate tables and query input tables as input tables,
     // we should be able to guess if this JoinOperator will be converted to a MapJoin
     // based on hive.auto.convert.join.noconditionaltask.size.
-    for (JoinOperator joinOp: pCtx.getJoinContext().keySet()) {
+    for (JoinOperator joinOp: pCtx.getJoinOps()) {
       boolean isAbleToGuess = true;
       boolean mayConvert = false;
       // Get total size and individual alias's size
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
index 33ef581..a3a7f42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;

 /**
@@ -53,8 +52,7 @@ public AbstractJoinTaskDispatcher(PhysicalContext context) {
       throws SemanticException;

   protected void replaceTaskWithConditionalTask(
-      Task currTask, ConditionalTask cndTsk,
-      PhysicalContext physicalContext) {
+      Task currTask, ConditionalTask cndTsk) {
     // add this task into task tree
     // set all parent tasks
     List> parentTasks = currTask.getParentTasks();
@@ -88,8 +86,7 @@ protected void replaceTaskWithConditionalTask(
   // Replace the task with the new task. Copy the children and parents of the old
   // task to the new task.
   protected void replaceTask(
-      Task currTask, Task newTask,
-      PhysicalContext physicalContext) {
+      Task currTask, Task newTask) {
     // add this task into task tree
     // set all parent tasks
     List> parentTasks = currTask.getParentTasks();
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index 9c26907..356c3bb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
@@ -400,7 +399,6 @@ public static boolean cannotConvert(long aliasKnownSize,

     // get parseCtx for this Join Operator
     ParseContext parseCtx = physicalContext.getParseContext();
-    QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);

     // start to generate multiple map join tasks
     JoinDesc joinDesc = joinOp.getConf();
@@ -458,7 +456,9 @@ public static boolean cannotConvert(long aliasKnownSize,
       }

       currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
-      currWork.setJoinTree(joinTree);
+      currWork.setJoinSrc(joinOp.getConf().isJoinSrc());
+      currWork.setBaseSrc(joinOp.getConf().getBaseSrc());
+      currWork.setMapAliases(joinOp.getConf().getMapAliases());

       if (bigTablePosition >= 0) {
         // create map join task and set big table as bigTablePosition
@@ -466,7 +466,7 @@ public static boolean cannotConvert(long aliasKnownSize,
         newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
         newTask.setFetchSource(currTask.isFetchSource());

-        replaceTask(currTask, newTask, physicalContext);
+        replaceTask(currTask, newTask);

         // Can this task be merged with the child task. This can happen if a big table is being
This can happen if a big table is being // joined with multiple small tables on different keys @@ -522,7 +522,9 @@ public static boolean cannotConvert(long aliasKnownSize, listTasks.add(currTask); // clear JoinTree and OP Parse Context currWork.setOpParseCtxMap(null); - currWork.setJoinTree(null); + currWork.setJoinSrc(false); + currWork.setBaseSrc(null); + currWork.setMapAliases(null); // create conditional task and insert conditional task into task tree ConditionalWork cndWork = new ConditionalWork(listWorks); @@ -541,7 +543,7 @@ public static boolean cannotConvert(long aliasKnownSize, cndTsk.setResolverCtx(resolverCtx); // replace the current task with the new generated conditional task - replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext); + replaceTaskWithConditionalTask(currTask, cndTsk); return cndTsk; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index 6f92b13..150b004 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx; @@ -168,8 +167,7 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera // create map join task and set big table as bigTablePosition private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork, int bigTablePosition, - SMBMapJoinOperator smbJoinOp, - QBJoinTree joinTree) + SMBMapJoinOperator smbJoinOp) throws UnsupportedEncodingException, SemanticException { // deep copy a new mapred work MapredWork newWork = Utilities.clonePlan(origWork); @@ -178,7 +176,7 @@ private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork, .getParseContext().getConf()); // generate the map join operator; already checked the map join MapJoinOperator newMapJoinOp = - getMapJoinOperator(newTask, newWork, smbJoinOp, joinTree, bigTablePosition); + getMapJoinOperator(newTask, newWork, smbJoinOp, bigTablePosition); // The reducer needs to be restored - Consider a query like: // select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key; @@ -246,7 +244,6 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) // get parseCtx for this Join Operator ParseContext parseCtx = physicalContext.getParseContext(); - QBJoinTree joinTree = parseCtx.getSmbMapJoinContext().get(originalSMBJoinOp); // Convert the work containing to sort-merge join into a work, as if it had a regular join. 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index 6f92b13..150b004 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
@@ -168,8 +167,7 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera
   // create map join task and set big table as bigTablePosition
   private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork,
       int bigTablePosition,
-      SMBMapJoinOperator smbJoinOp,
-      QBJoinTree joinTree)
+      SMBMapJoinOperator smbJoinOp)
       throws UnsupportedEncodingException, SemanticException {
     // deep copy a new mapred work
     MapredWork newWork = Utilities.clonePlan(origWork);
@@ -178,7 +176,7 @@ private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork,
         .getParseContext().getConf());
     // generate the map join operator; already checked the map join
     MapJoinOperator newMapJoinOp =
-        getMapJoinOperator(newTask, newWork, smbJoinOp, joinTree, bigTablePosition);
+        getMapJoinOperator(newTask, newWork, smbJoinOp, bigTablePosition);

     // The reducer needs to be restored - Consider a query like:
     // select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key;
@@ -246,7 +244,6 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)

     // get parseCtx for this Join Operator
     ParseContext parseCtx = physicalContext.getParseContext();
-    QBJoinTree joinTree = parseCtx.getSmbMapJoinContext().get(originalSMBJoinOp);

     // Convert the work containing the sort-merge join into a work, as if it had a regular join.
     // Note that the operator tree is not changed - it still contains the SMB join, but the
@@ -257,9 +254,13 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
     SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);

     currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
-    currWork.getMapWork().setJoinTree(joinTree);
+    currWork.getMapWork().setJoinSrc(originalSMBJoinOp.getConf().isJoinSrc());
+    currWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
+    currWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());
     currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
-    currJoinWork.getMapWork().setJoinTree(joinTree);
+    currJoinWork.getMapWork().setJoinSrc(originalSMBJoinOp.getConf().isJoinSrc());
+    currJoinWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
+    currJoinWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());

     // create conditional work list and task list
     List listWorks = new ArrayList();
@@ -296,7 +297,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)

       // create map join task for the given big table position
       MapRedTask newTask = convertSMBTaskToMapJoinTask(
-          currJoinWork, bigTablePosition, newSMBJoinOp, joinTree);
+          currJoinWork, bigTablePosition, newSMBJoinOp);

       MapWork mapWork = newTask.getWork().getMapWork();
       Operator parentOp = originalSMBJoinOp.getParentOperators().get(bigTablePosition);
@@ -334,7 +335,9 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
     listTasks.add(currTask);
     // clear JoinTree and OP Parse Context
     currWork.getMapWork().setOpParseCtxMap(null);
-    currWork.getMapWork().setJoinTree(null);
+    currWork.getMapWork().setJoinSrc(false);
+    currWork.getMapWork().setBaseSrc(null);
+    currWork.getMapWork().setMapAliases(null);

     // create conditional task and insert conditional task into task tree
     ConditionalWork cndWork = new ConditionalWork(listWorks);
@@ -353,7 +356,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
     cndTsk.setResolverCtx(resolverCtx);

     // replace the current task with the new generated conditional task
-    replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+    replaceTaskWithConditionalTask(currTask, cndTsk);
     return cndTsk;
   }
@@ -426,7 +429,6 @@ private SMBMapJoinOperator getSMBMapJoinOp(MapredWork work) throws SemanticExcep
   private MapJoinOperator getMapJoinOperator(MapRedTask task,
       MapredWork work,
       SMBMapJoinOperator oldSMBJoinOp,
-      QBJoinTree joinTree,
       int mapJoinPos) throws SemanticException {
     SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(task.getWork());
@@ -437,7 +439,6 @@ private MapJoinOperator getMapJoinOperator(MapRedTask task,
     // generate the map join operator
     return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
-        opParseContextMap, newSMBJoinOp,
-        joinTree, mapJoinPos, true);
+        opParseContextMap, newSMBJoinOp, mapJoinPos, true);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 0379834..85cdd20 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -130,7 +130,6 @@ private void setNumberOfBucketsOnChildren(Operator curre
   private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoinOp,
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 0379834..85cdd20 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -130,7 +130,6 @@ private void setNumberOfBucketsOnChildren(Operator curre
   private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoinOp,
       OptimizeSparkProcContext context, int bigTablePosition) throws SemanticException {
     ParseContext parseContext = context.getParseContext();
-    QBJoinTree joinTree = parseContext.getJoinContext().get(joinOp);
     List<String> joinAliases = new ArrayList<String>();
     String baseBigAlias = null;
     Map<Integer, Set<String>> posToAliasMap = joinOp.getPosToAliasMap();
@@ -146,7 +145,10 @@ private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoi
     }
     mapJoinOp.setPosToAliasMap(posToAliasMap);
     BucketMapjoinProc.checkAndConvertBucketMapJoin(
-        parseContext, mapJoinOp, joinTree, baseBigAlias, joinAliases);
+        parseContext,
+        mapJoinOp,
+        baseBigAlias,
+        joinAliases);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
 
     return joinDesc.isBucketMapJoin() ? joinDesc.getBigTableBucketNumMapping().size() : -1;
@@ -374,7 +376,8 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeSparkProc
 
     ParseContext parseContext = context.getParseContext();
     MapJoinOperator mapJoinOp = MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(),
         parseContext.getOpParseCtx(), joinOp,
-        parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+        joinOp.getConf().isJoinSrc(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
+        bigTablePosition, true);
 
     Operator<? extends OperatorDesc> parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition);
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
index f62ad6c..f455748 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
@@ -70,7 +70,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
 
     if (convert) {
       removeSmallTableReduceSink(mapJoinOp);
-      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     }
     return null;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
index ffe11a0..fe698ef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
@@ -38,7 +38,6 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinProcFactory;
 import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -138,16 +137,22 @@ private static void splitTask(SparkTask currentTask, ReduceWork reduceWork,
       String streamDesc = taskTmpDir.toUri().toString();
       if (GenMapRedUtils.needsTagging((ReduceWork) childWork)) {
         Operator childReducer = ((ReduceWork) childWork).getReducer();
-        QBJoinTree joinTree = null;
+        String id = null;
         if (childReducer instanceof JoinOperator) {
-          joinTree = parseContext.getJoinContext().get(childReducer);
+          if (parseContext.getJoinOps().contains(childReducer)) {
+            id = ((JoinOperator)childReducer).getConf().getId();
+          }
         } else if (childReducer instanceof MapJoinOperator) {
-          joinTree = parseContext.getMapJoinContext().get(childReducer);
+          if (parseContext.getMapJoinOps().contains(childReducer)) {
+            id = ((MapJoinOperator)childReducer).getConf().getId();
+          }
         } else if (childReducer instanceof SMBMapJoinOperator) {
-          joinTree = parseContext.getSmbMapJoinContext().get(childReducer);
+          if (parseContext.getSmbMapJoinOps().contains(childReducer)) {
+            id = ((SMBMapJoinOperator)childReducer).getConf().getId();
+          }
         }
-        if (joinTree != null && joinTree.getId() != null) {
-          streamDesc = joinTree.getId() + ":$INTNAME";
+        if (id != null) {
+          streamDesc = id + ":$INTNAME";
         } else {
           streamDesc = "$INTNAME";
         }
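The skew-join factory keeps the same stream-descriptor semantics but derives the id from
JoinDesc instead of a QBJoinTree lookup. The sketch below condenses the three branches
above into one method; StreamDescUtil is a hypothetical wrapper, and the "$INTNAME"
fallback mirrors the hunk it summarizes:

    import org.apache.hadoop.hive.ql.exec.JoinOperator;
    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
    import org.apache.hadoop.hive.ql.parse.ParseContext;

    public class StreamDescUtil {
      // The three instanceof branches only differ in which operator set
      // confirms the reducer is a tracked join; the id itself always comes
      // from the operator's JoinDesc.
      public static String streamDesc(ParseContext pctx, Operator<?> reducer) {
        String id = null;
        if (reducer instanceof JoinOperator
            && pctx.getJoinOps().contains(reducer)) {
          id = ((JoinOperator) reducer).getConf().getId();
        } else if (reducer instanceof MapJoinOperator
            && pctx.getMapJoinOps().contains(reducer)) {
          id = ((MapJoinOperator) reducer).getConf().getId();
        } else if (reducer instanceof SMBMapJoinOperator
            && pctx.getSmbMapJoinOps().contains(reducer)) {
          id = ((SMBMapJoinOperator) reducer).getConf().getId();
        }
        // a null id (untracked join, or no id set) keeps the plain name
        return id != null ? id + ":$INTNAME" : "$INTNAME";
      }
    }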
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
index d00c48d..845fbb5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
@@ -65,7 +65,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         joinOp, smbJoinContext, pGraphContext, stack);
 
     if (convert) {
-      return convertJoinToSMBJoinAndReturn(joinOp, smbJoinContext, pGraphContext);
+      return convertJoinToSMBJoinAndReturn(joinOp, smbJoinContext);
     }
     return null;
   }
@@ -76,7 +76,7 @@ protected boolean canConvertJoinToSMBJoin(JoinOperator joinOperator,
     if (!supportBucketMapJoin(stack)) {
       return false;
     }
-    return canConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+    return canConvertJoinToSMBJoin(joinOperator, smbJoinContext);
   }
 
   //Preliminary checks. In the MR version of the code, these used to be done via another walk,
@@ -102,11 +102,10 @@ private boolean supportBucketMapJoin(Stack stack) {
   protected SMBMapJoinOperator convertJoinToSMBJoinAndReturn(
       JoinOperator joinOp,
-      SortBucketJoinProcCtx smbJoinContext,
-      ParseContext parseContext) throws SemanticException {
-    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+      SortBucketJoinProcCtx smbJoinContext) throws SemanticException {
+    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext);
     SMBMapJoinOperator smbMapJoinOp =
-        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext);
     smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
     return smbMapJoinOp;
   }
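As on the MapReduce side, the ParseContext parameter drops out of these helpers because
SparkSortMergeJoinOptimizer inherits the context from AbstractSMBJoinProc, which holds it
in the pGraphContext field. A schematic of that calling convention (the class below is
illustrative only, not patch code):

    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    // Processors built on a context-holding base class stop threading
    // ParseContext through every call; helpers read the field instead.
    abstract class ContextHoldingProc {
      protected ParseContext pGraphContext; // set once, when processing starts

      // Before: helper(op, ctx, pGraphContext) -- the context rode along.
      // After:  helper(op, ctx) -- one source of truth, shorter signatures.
      protected void requireContext() throws SemanticException {
        if (pGraphContext == null) {
          throw new SemanticException("ParseContext not initialized");
        }
      }
    }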
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 8215c26..81a3256 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
@@ -75,9 +74,9 @@
   private HashMap<String, Operator<? extends OperatorDesc>> topOps;
   private HashMap<String, Operator<? extends OperatorDesc>> topSelOps;
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
-  private Map<JoinOperator, QBJoinTree> joinContext;
-  private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
-  private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
+  private Set<JoinOperator> joinOps;
+  private Set<MapJoinOperator> mapJoinOps;
+  private Set<SMBMapJoinOperator> smbMapJoinOps;
   private HashMap<TableScanOperator, Table> topToTable;
   private Map<FileSinkOperator, Table> fsopToTable;
   private List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
@@ -133,7 +132,7 @@ public ParseContext() {
    * @param opParseCtx
    *          operator parse context - contains a mapping from operator to
    *          operator parse state (row resolver etc.)
-   * @param joinContext
+   * @param joinOps
    *          context needed for join processing (map join specifically)
    * @param topToTable
    *          the top tables being processed
@@ -165,8 +164,8 @@ public ParseContext(
       HashMap<String, Operator<? extends OperatorDesc>> topOps,
       HashMap<String, Operator<? extends OperatorDesc>> topSelOps,
       LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx,
-      Map<JoinOperator, QBJoinTree> joinContext,
-      Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
+      Set<JoinOperator> joinOps,
+      Set<SMBMapJoinOperator> smbMapJoinOps,
       HashMap<TableScanOperator, Table> topToTable,
       HashMap<TableScanOperator, Map<String, String>> topToProps,
       Map<FileSinkOperator, Table> fsopToTable,
@@ -188,8 +187,8 @@ public ParseContext(
     this.ast = ast;
     this.opToPartPruner = opToPartPruner;
     this.opToPartList = opToPartList;
-    this.joinContext = joinContext;
-    this.smbMapJoinContext = smbMapJoinContext;
+    this.joinOps = joinOps;
+    this.smbMapJoinOps = smbMapJoinOps;
     this.topToTable = topToTable;
     this.fsopToTable = fsopToTable;
     this.topToProps = topToProps;
@@ -476,18 +475,18 @@ public void setUCtx(UnionProcContext uCtx) {
   }
 
   /**
-   * @return the joinContext
+   * @return the joinOps
    */
-  public Map<JoinOperator, QBJoinTree> getJoinContext() {
-    return joinContext;
+  public Set<JoinOperator> getJoinOps() {
+    return joinOps;
   }
 
   /**
-   * @param joinContext
-   *          the joinContext to set
+   * @param joinOps
+   *          the joinOps to set
   */
-  public void setJoinContext(Map<JoinOperator, QBJoinTree> joinContext) {
-    this.joinContext = joinContext;
+  public void setJoinOps(Set<JoinOperator> joinOps) {
+    this.joinOps = joinOps;
   }
 
   /**
@@ -570,20 +569,20 @@ public LineageInfo getLineageInfo() {
     return lInfo;
   }
 
-  public Map<MapJoinOperator, QBJoinTree> getMapJoinContext() {
-    return mapJoinContext;
+  public Set<MapJoinOperator> getMapJoinOps() {
+    return mapJoinOps;
   }
 
-  public void setMapJoinContext(Map<MapJoinOperator, QBJoinTree> mapJoinContext) {
-    this.mapJoinContext = mapJoinContext;
+  public void setMapJoinOps(Set<MapJoinOperator> mapJoinOps) {
+    this.mapJoinOps = mapJoinOps;
   }
 
-  public Map<SMBMapJoinOperator, QBJoinTree> getSmbMapJoinContext() {
-    return smbMapJoinContext;
+  public Set<SMBMapJoinOperator> getSmbMapJoinOps() {
+    return smbMapJoinOps;
   }
 
-  public void setSmbMapJoinContext(Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext) {
-    this.smbMapJoinContext = smbMapJoinContext;
+  public void setSmbMapJoinOps(Set<SMBMapJoinOperator> smbMapJoinOps) {
+    this.smbMapJoinOps = smbMapJoinOps;
   }
 
   public GlobalLimitCtx getGlobalLimitCtx() {
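Since every QBJoinTree property now lives on JoinDesc, these maps carried no payload
beyond their keys, which is why plain sets suffice. A small round-trip illustration of
the new bookkeeping (demo class only; the real registration happens in SemanticAnalyzer,
shown next):

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;

    public class OpSetDemo {
      // The instance registered during planning is the same instance later
      // tested for membership, so a HashSet keyed on the operator works just
      // like the old map keyed on the operator.
      public static void main(String[] args) {
        Set<MapJoinOperator> mapJoinOps = new HashSet<MapJoinOperator>();
        MapJoinOperator op = new MapJoinOperator();
        mapJoinOps.add(op);                          // planner registers the operator
        System.out.println(mapJoinOps.contains(op)); // optimizer checks membership: true
      }
    }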
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index cea86df..216dc85 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -378,8 +378,6 @@ public void initParseCtx(ParseContext pctx) {
     opParseCtx = pctx.getOpParseCtx();
     loadTableWork = pctx.getLoadTableWork();
     loadFileWork = pctx.getLoadFileWork();
-    joinContext = pctx.getJoinContext();
-    smbMapJoinContext = pctx.getSmbMapJoinContext();
     ctx = pctx.getContext();
     destTableId = pctx.getDestTableId();
     idToTableNameMap = pctx.getIdToTableNameMap();
@@ -394,7 +392,10 @@ public void initParseCtx(ParseContext pctx) {
 
   public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
-        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topSelOps, opParseCtx,
+        new HashSet<JoinOperator>(joinContext.keySet()),
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
@@ -7341,6 +7342,7 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
     JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
       joinSrcOp, srcOps, omitOpts, joinKeys);
+    joinOp.getConf().setQBJoinTreeProps(joinTree);
     joinContext.put(joinOp, joinTree);
 
     Operator op = joinOp;
@@ -9965,7 +9967,10 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce
 
     // 4. Generate Parse Context for Optimizer & Physical compiler
     ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child, opToPartPruner, opToPartList,
-        topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topOps, topSelOps, opParseCtx,
+        new HashSet<JoinOperator>(joinContext.keySet()),
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
index da14ab4..b198db0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -174,10 +173,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       // Get the key column names for each side of the join,
       // and check if the keys are all constants
       // or columns (not expressions). If yes, proceed.
-      QBJoinTree joinTree = pGraphContext.getJoinContext().get(op);
-      assert(parentOps.size() == joinTree.getBaseSrc().length);
+      assert(parentOps.size() == op.getConf().getBaseSrc().length);
       int pos = 0;
-      for (String src : joinTree.getBaseSrc()) {
+      for (String src : op.getConf().getBaseSrc()) {
         if (src != null) {
           assert(parentOps.get(pos) instanceof ReduceSinkOperator);
           ReduceSinkOperator reduceSinkOp = (ReduceSinkOperator) parentOps.get(pos);
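SemanticAnalyzer keeps its internal joinContext/smbMapJoinContext maps for plan
generation and publishes only their key sets to ParseContext. The helper below just
names the new HashSet(...keySet()) one-liner used in both constructor calls above
(ContextBridge is hypothetical, not patch code):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public final class ContextBridge {
      private ContextBridge() {
      }

      // Publish only the tracked operators; the QBJoinTree values stay
      // private to the analyzer (their content now also lives on JoinDesc).
      public static <K> Set<K> trackedOps(Map<K, ?> context) {
        // a copy, so later changes to the map do not leak into ParseContext
        return new HashSet<K>(context.keySet());
      }
    }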
@@ -386,8 +387,8 @@ public ParseContext getParseContext(ParseContext pCtx, List<Task<? extends Serializable>> rootTasks) {
     return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(),
         pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
-        pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinContext(),
-        pCtx.getSmbMapJoinContext(), pCtx.getTopToTable(), pCtx.getTopToProps(),
+        pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinOps(),
+        pCtx.getSmbMapJoinOps(), pCtx.getTopToTable(), pCtx.getTopToProps(),
         pCtx.getFsopToTable(), pCtx.getLoadTableWork(), pCtx.getLoadFileWork(),
         pCtx.getContext(), pCtx.getIdToTableNameMap(), pCtx.getDestTableId(),
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
--- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -64,9 +64,21 @@
+  private transient String leftAlias;
+  private transient String[] leftAliases;
+  private transient String[] rightAliases;
+  private transient String[] baseSrc;
+  private transient String id;
+  private transient boolean mapSideJoin;
+  private transient List<String> mapAliases; //map-side join aliases
+  private transient Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo;
+  private transient boolean joinSrc;
+  private transient List<String> streamAliases;
+
   public JoinDesc() {
   }
 
   public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
       List<String> outputColumnNames, final boolean noOuterJoin,
-      final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters, ExprNodeDesc[][] joinKeys) {
+      final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters,
+      ExprNodeDesc[][] joinKeys) {
     this.exprs = exprs;
     this.outputColumnNames = outputColumnNames;
     this.noOuterJoin = noOuterJoin;
@@ -509,4 +525,88 @@ public boolean isFixedAsSorted() {
   public void setFixedAsSorted(boolean fixedAsSorted) {
     this.fixedAsSorted = fixedAsSorted;
   }
+
+  public String[] getLeftAliases() {
+    return leftAliases;
+  }
+
+  public String[] getBaseSrc() {
+    return baseSrc;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public List<String> getMapAliases() {
+    return mapAliases;
+  }
+
+  public Map<String, Operator<? extends OperatorDesc>> getAliasToOpInfo() {
+    return aliasToOpInfo;
+  }
+
+  public boolean isJoinSrc() {
+    return joinSrc;
+  }
+
+  public String getLeftAlias() {
+    return leftAlias;
+  }
+
+  public void setLeftAlias(String leftAlias) {
+    this.leftAlias = leftAlias;
+  }
+
+  public String[] getRightAliases() {
+    return rightAliases;
+  }
+
+  public List<String> getStreamAliases() {
+    return streamAliases;
+  }
+
+  public boolean isMapSideJoin() {
+    return mapSideJoin;
+  }
+
+  public void setQBJoinTreeProps(JoinDesc joinDesc) {
+    leftAlias = joinDesc.leftAlias;
+    leftAliases = joinDesc.leftAliases;
+    rightAliases = joinDesc.rightAliases;
+    baseSrc = joinDesc.baseSrc;
+    id = joinDesc.id;
+    mapSideJoin = joinDesc.mapSideJoin;
+    mapAliases = joinDesc.mapAliases;
+    aliasToOpInfo = joinDesc.aliasToOpInfo;
+    joinSrc = joinDesc.joinSrc;
+    streamAliases = joinDesc.streamAliases;
+  }
+
+  public void setQBJoinTreeProps(QBJoinTree joinTree) {
+    leftAlias = joinTree.getLeftAlias();
+    leftAliases = joinTree.getLeftAliases();
+    rightAliases = joinTree.getRightAliases();
+    baseSrc = joinTree.getBaseSrc();
+    id = joinTree.getId();
+    mapSideJoin = joinTree.isMapSideJoin();
+    mapAliases = joinTree.getMapAliases();
+    aliasToOpInfo = joinTree.getAliasToOpInfo();
+    joinSrc = joinTree.getJoinSrc() != null;
+    streamAliases = joinTree.getStreamAliases();
+  }
+
+  public void cloneQBJoinTreeProps(JoinDesc joinDesc) {
+    leftAlias = joinDesc.leftAlias;
+    leftAliases = joinDesc.leftAliases == null ? null : joinDesc.leftAliases.clone();
+    rightAliases = joinDesc.rightAliases == null ? null : joinDesc.rightAliases.clone();
+    baseSrc = joinDesc.baseSrc == null ? null : joinDesc.baseSrc.clone();
+    id = joinDesc.id;
+    mapSideJoin = joinDesc.mapSideJoin;
+    mapAliases = joinDesc.mapAliases == null ? null : new ArrayList<String>(joinDesc.mapAliases);
+    aliasToOpInfo = new HashMap<String, Operator<? extends OperatorDesc>>(joinDesc.aliasToOpInfo);
+    joinSrc = joinDesc.joinSrc;
+    streamAliases = joinDesc.streamAliases == null ?
+        null : new ArrayList<String>(joinDesc.streamAliases);
+  }
 }
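JoinDesc now offers two ways to take over QBJoinTree-derived state: setQBJoinTreeProps
shares the underlying arrays and collections, while cloneQBJoinTreeProps copies them so
a duplicated descriptor cannot alias the original. A usage sketch (JoinDescPropsDemo is
illustrative; which call sites prefer which variant is an assumption here, not stated by
the patch):

    import org.apache.hadoop.hive.ql.plan.JoinDesc;
    import org.apache.hadoop.hive.ql.plan.MapJoinDesc;

    public class JoinDescPropsDemo {
      // sharedTarget ends up aliasing source's arrays/lists (cheap, suitable
      // when both descriptors belong to the same plan); clonedTarget gets
      // independent copies (suitable when a plan is deep-copied).
      public static void propagate(JoinDesc source, MapJoinDesc sharedTarget,
          MapJoinDesc clonedTarget) {
        sharedTarget.setQBJoinTreeProps(source);
        clonedTarget.cloneQBJoinTreeProps(source);
      }
    }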
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 9f8c091..a15e23f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -30,7 +30,6 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
-import com.google.common.collect.Interner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -41,10 +40,11 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.mapred.JobConf;
 
+import com.google.common.collect.Interner;
+
 /**
  * MapWork represents all the information used to run a map task on the cluster.
  * It is first used when the query planner breaks the logical plan into tasks and
@@ -105,8 +105,10 @@ public static final int SAMPLING_ON_START = 2; // sampling on task running
 
   // the following two are used for join processing
-  private QBJoinTree joinTree;
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap;
+  private boolean joinSrc;
+  private String[] baseSrc;
+  private List<String> mapAliases;
 
   private boolean mapperCannotSpanPartns;
@@ -419,14 +421,6 @@ public boolean isUseOneNullRowInputFormat() {
     return useOneNullRowInputFormat;
   }
 
-  public QBJoinTree getJoinTree() {
-    return joinTree;
-  }
-
-  public void setJoinTree(QBJoinTree joinTree) {
-    this.joinTree = joinTree;
-  }
-
   public void setMapperCannotSpanPartns(boolean mapperCannotSpanPartns) {
     this.mapperCannotSpanPartns = mapperCannotSpanPartns;
   }
@@ -579,4 +573,28 @@ public void setDoSplitsGrouping(boolean doSplitsGrouping) {
   public boolean getDoSplitsGrouping() {
     return this.doSplitsGrouping;
   }
+
+  public boolean isJoinSrc() {
+    return joinSrc;
+  }
+
+  public void setJoinSrc(boolean joinSrc) {
+    this.joinSrc = joinSrc;
+  }
+
+  public String[] getBaseSrc() {
+    return baseSrc;
+  }
+
+  public void setBaseSrc(String[] baseSrc) {
+    this.baseSrc = baseSrc;
+  }
+
+  public List<String> getMapAliases() {
+    return mapAliases;
+  }
+
+  public void setMapAliases(List<String> mapAliases) {
+    this.mapAliases = mapAliases;
+  }
 }
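Finally, a round trip over the three properties MapWork now carries in place of the
serialized QBJoinTree. The driver below is hypothetical, but it uses only the accessors
added above:

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.plan.MapWork;

    public class MapWorkJoinProps {
      public static void main(String[] args) {
        MapWork work = new MapWork();
        // what the dispatchers copy in from the join operator's descriptor
        work.setJoinSrc(true);
        work.setBaseSrc(new String[] {"a", "b"});
        work.setMapAliases(Arrays.asList("b"));

        // what the conditional-task path later reads back (or clears)
        System.out.println(work.isJoinSrc());
        System.out.println(Arrays.toString(work.getBaseSrc()));
        System.out.println(work.getMapAliases());
      }
    }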