diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index ed03bb9..0158600 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1042,7 +1042,6 @@ protected synchronized Kryo initialValue() { removeField(kryo, Operator.class, "colExprMap"); removeField(kryo, ColumnInfo.class, "objectInspector"); removeField(kryo, MapWork.class, "opParseCtxMap"); - removeField(kryo, MapWork.class, "joinTree"); return kryo; }; }; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java index 70c23a6..fab8a93 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.QB; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -131,23 +130,21 @@ private boolean checkNumberOfBucketsAgainstBigTable( protected boolean canConvertMapJoinToBucketMapJoin( MapJoinOperator mapJoinOp, - ParseContext pGraphContext, BucketJoinProcCtx context) throws SemanticException { - QBJoinTree joinCtx = pGraphContext.getMapJoinContext().get(mapJoinOp); - if (joinCtx == null) { + if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) { return false; } List joinAliases = new ArrayList(); - String[] srcs = joinCtx.getBaseSrc(); - String[] left = joinCtx.getLeftAliases(); - List mapAlias = joinCtx.getMapAliases(); + String[] srcs = mapJoinOp.getConf().getBaseSrc(); + String[] left = mapJoinOp.getConf().getLeftAliases(); + List mapAlias = mapJoinOp.getConf().getMapAliases(); String baseBigAlias = null; for (String s : left) { if (s != null) { - String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s); + String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s); if (!joinAliases.contains(subQueryAlias)) { joinAliases.add(subQueryAlias); if (!mapAlias.contains(s)) { @@ -159,7 +156,7 @@ protected boolean canConvertMapJoinToBucketMapJoin( for (String s : srcs) { if (s != null) { - String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s); + String subQueryAlias = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), s); if (!joinAliases.contains(subQueryAlias)) { joinAliases.add(subQueryAlias); if (!mapAlias.contains(s)) { @@ -172,9 +169,8 @@ protected boolean canConvertMapJoinToBucketMapJoin( Map> keysMap = mapJoinOp.getConf().getKeys(); return checkConvertBucketMapJoin( - pGraphContext, context, - joinCtx, + mapJoinOp.getConf().getAliasToOpInfo(), keysMap, baseBigAlias, joinAliases); @@ -189,9 +185,8 @@ protected boolean canConvertMapJoinToBucketMapJoin( * d. The number of buckets in the big table can be divided by no of buckets in small tables. 
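The core pattern of this patch shows up in the hunk above: the ParseContext side map (getMapJoinContext().get(op)) is replaced by a membership test against a Set plus QBJoinTree properties carried on the operator's own descriptor. A minimal sketch of the before/after shape, using invented stand-in classes rather than Hive's real MapJoinOperator/MapJoinDesc:

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative stand-ins: the join-tree properties travel with the
    // operator's conf instead of living in a ParseContext side map.
    class JoinDescLike {
        String[] baseSrc;
        String id;
        JoinDescLike(String id, String[] baseSrc) { this.id = id; this.baseSrc = baseSrc; }
    }

    class MapJoinOpLike {
        private final JoinDescLike conf;
        MapJoinOpLike(JoinDescLike conf) { this.conf = conf; }
        JoinDescLike getConf() { return conf; }
    }

    public class SideMapToConfSketch {
        public static void main(String[] args) {
            Set<MapJoinOpLike> mapJoinOps = new HashSet<>();
            MapJoinOpLike op = new MapJoinOpLike(new JoinDescLike("qb1", new String[] {"a", "b"}));
            mapJoinOps.add(op);

            // Old shape: QBJoinTree tree = mapJoinContext.get(op); if (tree == null) return;
            // New shape: membership check, properties read off the operator's conf.
            if (!mapJoinOps.contains(op)) {
                return;
            }
            for (String src : op.getConf().baseSrc) {
                System.out.println(op.getConf().id + ":" + src);
            }
        }
    }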
*/ protected boolean checkConvertBucketMapJoin( - ParseContext pGraphContext, BucketJoinProcCtx context, - QBJoinTree joinCtx, + Map> aliasToOpInfo, Map> keysMap, String baseBigAlias, List joinAliases) throws SemanticException { @@ -215,7 +210,7 @@ protected boolean checkConvertBucketMapJoin( boolean bigTablePartitioned = true; for (int index = 0; index < joinAliases.size(); index++) { String alias = joinAliases.get(index); - Operator topOp = joinCtx.getAliasToOpInfo().get(alias); + Operator topOp = aliasToOpInfo.get(alias); // The alias may not be present in case of a sub-query if (topOp == null) { return false; @@ -438,7 +433,7 @@ protected void convertMapJoinToBucketMapJoin( } // convert partition to partition spec string - private static Map> convert(Map> mapping) { + private Map> convert(Map> mapping) { Map> converted = new HashMap>(); for (Map.Entry> entry : mapping.entrySet()) { converted.put(entry.getKey().getName(), entry.getValue()); @@ -467,7 +462,7 @@ protected void convertMapJoinToBucketMapJoin( } // called for each partition of big table and populates mapping for each file in the partition - private static void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames( + private void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames( List smallTblBucketNums, List> smallTblFilesList, Map> bigTableBucketFileNameToSmallTableBucketFileNames, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java index c9e8086..ca93aba 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.QB; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -97,17 +96,17 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp, return false; } - boolean tableEligibleForBucketedSortMergeJoin = true; - QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext() - .get(mapJoinOp); - if (joinCxt == null) { + if (!this.pGraphContext.getMapJoinOps().contains(mapJoinOp)) { return false; } - String[] srcs = joinCxt.getBaseSrc(); + + String[] srcs = mapJoinOp.getConf().getBaseSrc(); for (int srcPos = 0; srcPos < srcs.length; srcPos++) { - srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]); + srcs[srcPos] = QB.getAppendedAliasFromId(mapJoinOp.getConf().getId(), srcs[srcPos]); } + boolean tableEligibleForBucketedSortMergeJoin = true; + // All the tables/partitions columns should be sorted in the same order // For example, if tables A and B are being joined on columns c1, c2 and c3 // which are the sorted and bucketed columns. 
The join would work, as long @@ -117,9 +116,8 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp, for (int pos = 0; pos < srcs.length; pos++) { tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin && isEligibleForBucketSortMergeJoin(smbJoinContext, - pGraphContext, mapJoinOp.getConf().getKeys().get((byte) pos), - joinCxt, + mapJoinOp.getConf().getAliasToOpInfo(), srcs, pos, sortColumnsFirstTable); @@ -141,8 +139,7 @@ protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp, // Convert the bucket map-join operator to a sort-merge map join operator protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp, - SortBucketJoinProcCtx smbJoinContext, - ParseContext parseContext) { + SortBucketJoinProcCtx smbJoinContext) { String[] srcs = smbJoinContext.getSrcs(); SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp); @@ -219,10 +216,13 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJo child.getParentOperators().remove(index); child.getParentOperators().add(index, smbJop); } - parseContext.getSmbMapJoinContext().put(smbJop, - parseContext.getMapJoinContext().get(mapJoinOp)); - parseContext.getMapJoinContext().remove(mapJoinOp); - parseContext.getOpParseCtx().put(smbJop, parseContext.getOpParseCtx().get(mapJoinOp)); + + // Data structures coming from QBJoinTree + smbJop.getConf().setQBJoinTreeProps(mapJoinOp.getConf()); + // + pGraphContext.getSmbMapJoinOps().add(smbJop); + pGraphContext.getMapJoinOps().remove(mapJoinOp); + pGraphContext.getOpParseCtx().put(smbJop, pGraphContext.getOpParseCtx().get(mapJoinOp)); return smbJop; } @@ -242,9 +242,8 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJo */ private boolean isEligibleForBucketSortMergeJoin( SortBucketJoinProcCtx smbJoinContext, - ParseContext pctx, List keys, - QBJoinTree joinTree, + Map> aliasToOpInfo, String[] aliases, int pos, List sortColumnsFirstTable) throws SemanticException { @@ -266,7 +265,7 @@ private boolean isEligibleForBucketSortMergeJoin( * table. If the object being map-joined is a base table, then aliasToOpInfo * contains the TableScanOperator, and TableAccessAnalyzer is a no-op. 
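Passing aliasToOpInfo directly, rather than the whole QBJoinTree, narrows the eligibility check's dependency to the one map it actually consumes. A hedged sketch of the null-guarded lookup that both checkConvertBucketMapJoin and isEligibleForBucketSortMergeJoin now perform, with placeholder types:

    import java.util.HashMap;
    import java.util.Map;

    public class AliasLookupSketch {
        // Placeholder for Operator<? extends OperatorDesc>.
        static class Op {}

        // Returns false when the alias has no registered top operator,
        // e.g. because it came from a sub-query.
        static boolean eligible(Map<String, Op> aliasToOpInfo, String alias) {
            Op topOp = aliasToOpInfo.get(alias);
            if (topOp == null) {
                return false;
            }
            return true; // the real code walks back to a TableScanOperator here
        }

        public static void main(String[] args) {
            Map<String, Op> m = new HashMap<>();
            m.put("a", new Op());
            System.out.println(eligible(m, "a") + " " + eligible(m, "subq1:b"));
        }
    }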
*/ - Operator topOp = joinTree.getAliasToOpInfo().get(alias); + Operator topOp = aliasToOpInfo.get(alias); if (topOp == null) { return false; } @@ -386,15 +385,13 @@ private boolean checkSortColsAndJoinCols(List sortCols, // It is already verified that the join can be converted to a bucket map join protected boolean checkConvertJoinToSMBJoin( JoinOperator joinOperator, - SortBucketJoinProcCtx smbJoinContext, - ParseContext pGraphContext) throws SemanticException { - - QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator); + SortBucketJoinProcCtx smbJoinContext) throws SemanticException { - if (joinCtx == null) { + if (!this.pGraphContext.getJoinOps().contains(joinOperator)) { return false; } - String[] srcs = joinCtx.getBaseSrc(); + + String[] srcs = joinOperator.getConf().getBaseSrc(); // All the tables/partitions columns should be sorted in the same order // For example, if tables A and B are being joined on columns c1, c2 and c3 @@ -404,9 +401,8 @@ protected boolean checkConvertJoinToSMBJoin( for (int pos = 0; pos < srcs.length; pos++) { if (!isEligibleForBucketSortMergeJoin(smbJoinContext, - pGraphContext, smbJoinContext.getKeyExprMap().get((byte) pos), - joinCtx, + joinOperator.getConf().getAliasToOpInfo(), srcs, pos, sortColumnsFirstTable)) { @@ -421,12 +417,10 @@ protected boolean checkConvertJoinToSMBJoin( // Can the join operator be converted to a sort-merge join operator ? protected boolean canConvertJoinToSMBJoin( JoinOperator joinOperator, - SortBucketJoinProcCtx smbJoinContext, - ParseContext pGraphContext) throws SemanticException { + SortBucketJoinProcCtx smbJoinContext) throws SemanticException { boolean canConvert = canConvertJoinToBucketMapJoin( joinOperator, - pGraphContext, smbJoinContext ); @@ -434,13 +428,12 @@ protected boolean canConvertJoinToSMBJoin( return false; } - return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext); + return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext); } // Can the join operator be converted to a bucket map-merge join operator ? protected boolean canConvertJoinToBucketMapJoin( JoinOperator joinOp, - ParseContext pGraphContext, SortBucketJoinProcCtx context) throws SemanticException { // This has already been inspected and rejected @@ -448,8 +441,7 @@ protected boolean canConvertJoinToBucketMapJoin( return false; } - QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOp); - if (joinCtx == null) { + if (!this.pGraphContext.getJoinOps().contains(joinOp)) { return false; } @@ -482,8 +474,9 @@ protected boolean canConvertJoinToBucketMapJoin( context.setBigTablePosition(bigTablePosition); String joinAlias = bigTablePosition == 0 ? 
- joinCtx.getLeftAlias() : joinCtx.getRightAliases()[bigTablePosition - 1]; - joinAlias = QB.getAppendedAliasFromId(joinCtx.getId(), joinAlias); + joinOp.getConf().getLeftAlias() : + joinOp.getConf().getRightAliases()[bigTablePosition - 1]; + joinAlias = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinAlias); Map> keyExprMap = new HashMap>(); List> parentOps = joinOp.getParentOperators(); @@ -497,10 +490,10 @@ protected boolean canConvertJoinToBucketMapJoin( context.setKeyExprMap(keyExprMap); // Make a deep copy of the aliases so that they are not changed in the context - String[] joinSrcs = joinCtx.getBaseSrc(); + String[] joinSrcs = joinOp.getConf().getBaseSrc(); String[] srcs = new String[joinSrcs.length]; for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) { - joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]); + joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinOp.getConf().getId(), joinSrcs[srcPos]); srcs[srcPos] = new String(joinSrcs[srcPos]); } @@ -508,9 +501,8 @@ protected boolean canConvertJoinToBucketMapJoin( // The candidate map-join was derived from the pluggable sort merge join big // table matcher. return checkConvertBucketMapJoin( - pGraphContext, context, - joinCtx, + joinOp.getConf().getAliasToOpInfo(), keyExprMap, joinAlias, Arrays.asList(srcs)); @@ -519,19 +511,23 @@ protected boolean canConvertJoinToBucketMapJoin( // Convert the join operator to a bucket map-join join operator protected MapJoinOperator convertJoinToBucketMapJoin( JoinOperator joinOp, - SortBucketJoinProcCtx joinContext, - ParseContext parseContext) throws SemanticException { + SortBucketJoinProcCtx joinContext) throws SemanticException { MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin( - parseContext.getConf(), - parseContext.getOpParseCtx(), + pGraphContext.getConf(), + pGraphContext.getOpParseCtx(), joinOp, - pGraphContext.getJoinContext().get(joinOp), + joinOp.getConf().isJoinSrc(), + joinOp.getConf().getBaseSrc(), + joinOp.getConf().getMapAliases(), joinContext.getBigTablePosition(), false, false); // Remove the join operator from the query join context - parseContext.getMapJoinContext().put(mapJoinOp, parseContext.getJoinContext().get(joinOp)); - parseContext.getJoinContext().remove(joinOp); + // Data structures coming from QBJoinTree + mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf()); + // + pGraphContext.getMapJoinOps().add(mapJoinOp); + pGraphContext.getJoinOps().remove(joinOp); convertMapJoinToBucketMapJoin(mapJoinOp, joinContext); return mapJoinOp; } @@ -539,11 +535,10 @@ protected MapJoinOperator convertJoinToBucketMapJoin( // Convert the join operator to a sort-merge join operator protected void convertJoinToSMBJoin( JoinOperator joinOp, - SortBucketJoinProcCtx smbJoinContext, - ParseContext parseContext) throws SemanticException { - MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext); + SortBucketJoinProcCtx smbJoinContext) throws SemanticException { + MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext); SMBMapJoinOperator smbMapJoinOp = - convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext); + convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext); smbMapJoinOp.setConvertedAutomaticallySMBJoin(true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java index 1260c83..7e3c134 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java @@ -42,7 +42,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // can the mapjoin present be converted to a bucketed mapjoin boolean convert = canConvertMapJoinToBucketMapJoin( - mapJoinOperator, pGraphContext, context); + mapJoinOperator, context); HiveConf conf = context.getConf(); // Throw an error if the user asked for bucketed mapjoin to be enforced and diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 7ab35ee..09fba7f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -47,8 +47,8 @@ import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.JoinCondDesc; @@ -231,13 +231,16 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont ParseContext parseContext = context.parseContext; MapJoinDesc mapJoinDesc = null; if (adjustParentsChildren) { - mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(), - joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true); + mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(), + joinOp, joinOp.getConf().isJoinSrc(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(), + mapJoinConversionPos, true); } else { JoinDesc joinDesc = joinOp.getConf(); // retain the original join desc in the map join. 
mapJoinDesc = - new MapJoinDesc(MapJoinProcessor.getKeys(parseContext.getJoinContext().get(joinOp), joinOp).getSecond(), + new MapJoinDesc( + MapJoinProcessor.getKeys(joinOp.getConf().isJoinSrc(), + joinOp.getConf().getBaseSrc(), joinOp).getSecond(), null, joinDesc.getExprs(), null, null, joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); @@ -606,7 +609,8 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo ParseContext parseContext = context.parseContext; MapJoinOperator mapJoinOp = MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp, - parseContext.getJoinContext().get(joinOp), bigTablePosition, true); + joinOp.getConf().isJoinSrc(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(), + bigTablePosition, true); Operator parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 9a74e1e..08bc135 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -18,7 +18,20 @@ package org.apache.hadoop.hive.ql.optimizer; -import com.google.common.collect.Interner; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -66,7 +79,6 @@ import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.QBParseInfo; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -100,19 +112,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.InputFormat; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; +import com.google.common.collect.Interner; /** * General utility common functions for the Processor to convert operator into @@ -1058,17 +1058,23 @@ private static void splitTasks(ReduceSinkOperator op, if (needsTagging(cplan.getReduceWork())) { Operator reducerOp = cplan.getReduceWork().getReducer(); - QBJoinTree joinTree = null; + String id = null; if (reducerOp instanceof JoinOperator) { - joinTree = parseCtx.getJoinContext().get(reducerOp); + if (parseCtx.getJoinOps().contains(reducerOp)) { + id = ((JoinOperator)reducerOp).getConf().getId(); + } } else if (reducerOp instanceof MapJoinOperator) { - joinTree = parseCtx.getMapJoinContext().get(reducerOp); + if (parseCtx.getMapJoinOps().contains(reducerOp)) { + id = 
((MapJoinOperator)reducerOp).getConf().getId(); + } } else if (reducerOp instanceof SMBMapJoinOperator) { - joinTree = parseCtx.getSmbMapJoinContext().get(reducerOp); + if (parseCtx.getSmbMapJoinOps().contains(reducerOp)) { + id = ((SMBMapJoinOperator)reducerOp).getConf().getId(); + } } - if (joinTree != null && joinTree.getId() != null) { - streamDesc = joinTree.getId() + ":$INTNAME"; + if (id != null) { + streamDesc = id + ":$INTNAME"; } else { streamDesc = "$INTNAME"; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java index 9238e0e..065edef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -95,9 +94,9 @@ private int getOutputSize(Operator operator, private Set getBigTables(ParseContext joinCtx) { Set bigTables = new HashSet(); - for (QBJoinTree qbJoin : joinCtx.getJoinContext().values()) { - if (qbJoin.getStreamAliases() != null) { - bigTables.addAll(qbJoin.getStreamAliases()); + for (JoinOperator joinOp : joinCtx.getJoinOps()) { + if (joinOp.getConf().getStreamAliases() != null) { + bigTables.addAll(joinOp.getConf().getStreamAliases()); } } @@ -155,7 +154,7 @@ private void reorder(JoinOperator joinOp, Set bigTables) { public ParseContext transform(ParseContext pactx) throws SemanticException { Set bigTables = getBigTables(pactx); - for (JoinOperator joinOp : pactx.getJoinContext().keySet()) { + for (JoinOperator joinOp : pactx.getJoinOps()) { reorder(joinOp, bigTables); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index ccb3ce5..f1680f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hive.ql.parse.GenMapRedWalker; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -95,19 +94,14 @@ // (column type + column name). The column name is not really used anywhere, but it // needs to be passed. Use the string defined below for that. private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey"; - - private ParseContext pGraphContext; - - /** - * empty constructor. 
- */ + public MapJoinProcessor() { - pGraphContext = null; } @SuppressWarnings("nls") - private Operator - putOpInsertMap(Operator op, RowResolver rr) { + private static Operator putOpInsertMap ( + ParseContext pGraphContext, Operator op, + RowResolver rr) { OpParseContext ctx = new OpParseContext(rr); pGraphContext.getOpParseCtx().put(op, ctx); return op; @@ -232,10 +226,10 @@ public static void genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork, throws SemanticException { LinkedHashMap, OpParseContext> opParseCtxMap = newWork.getMapWork().getOpParseCtxMap(); - QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree(); // generate the map join operator; already checked the map join MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op, - newJoinTree, mapJoinPos, true, false); + newWork.getMapWork().isJoinSrc(), newWork.getMapWork().getBaseSrc(), newWork.getMapWork().getMapAliases(), + mapJoinPos, true, false); genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos); } @@ -247,7 +241,9 @@ public static void genLocalWorkForMapJoin(MapredWork newWork, MapJoinOperator ne MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos); // clean up the mapred work newWork.getMapWork().setOpParseCtxMap(null); - newWork.getMapWork().setJoinTree(null); + newWork.getMapWork().setJoinSrc(false); + newWork.getMapWork().setBaseSrc(null); + newWork.getMapWork().setMapAliases(null); } catch (Exception e) { e.printStackTrace(); @@ -306,9 +302,8 @@ private static void validateMapJoinTypes(Operator op) */ public static MapJoinOperator convertMapJoin(HiveConf conf, LinkedHashMap, OpParseContext> opParseCtxMap, - JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin, - boolean validateMapJoinTree) - throws SemanticException { + JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases, + int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) throws SemanticException { // outer join cannot be performed on a table which is being cached JoinDesc desc = op.getConf(); @@ -323,8 +318,6 @@ public static MapJoinOperator convertMapJoin(HiveConf conf, // Walk over all the sources (which are guaranteed to be reduce sink // operators). // The join outputs a concatenation of all the inputs. 
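convertMapJoin and its helpers stop taking a QBJoinTree and instead take the three pieces of state they actually used: whether there is a left join source, the base source aliases, and the map-side aliases. A hedged sketch of that parameter flattening (names here are placeholders, not Hive's signatures):

    import java.util.Arrays;
    import java.util.List;

    public class ParamFlatteningSketch {
        // Old shape: convert(JoinTreeCtx ctx), reading ctx.getJoinSrc(),
        // ctx.getBaseSrc(), ctx.getMapAliases().
        // New shape: pass exactly what the method consumes.
        static void convert(boolean leftSrc, String[] baseSrc, List<String> mapAliases) {
            if (leftSrc) {
                System.out.println("position 0 is a join sub-tree, not a base table");
            }
            for (String src : baseSrc) {
                if (src != null) {
                    System.out.println((mapAliases.contains(src) ? "cached: " : "streamed: ") + src);
                }
            }
        }

        public static void main(String[] args) {
            convert(false, new String[] {"a", null, "b"}, Arrays.asList("b"));
        }
    }

Note the companion change: the nullable QBJoinTree.getJoinSrc() reference becomes a plain boolean, so the old `leftSrc != null` guard becomes `if (leftSrc)`.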
- QBJoinTree leftSrc = joinTree.getJoinSrc(); - List> parentOps = op.getParentOperators(); List> newParentOps = new ArrayList>(); @@ -332,7 +325,7 @@ public static MapJoinOperator convertMapJoin(HiveConf conf, new ArrayList>(); // found a source which is not to be stored in memory - if (leftSrc != null) { + if (leftSrc) { // assert mapJoinPos == 0; Operator parentOp = parentOps.get(0); assert parentOp.getParentOperators().size() == 1; @@ -344,7 +337,7 @@ public static MapJoinOperator convertMapJoin(HiveConf conf, byte pos = 0; // Remove parent reduce-sink operators - for (String src : joinTree.getBaseSrc()) { + for (String src : baseSrc) { if (src != null) { Operator parentOp = parentOps.get(pos); assert parentOp.getParentOperators().size() == 1; @@ -359,7 +352,7 @@ public static MapJoinOperator convertMapJoin(HiveConf conf, // create the map-join operator MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap, - op, joinTree, mapJoinPos, noCheckOuterJoin); + op, leftSrc, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin); // remove old parents @@ -383,11 +376,12 @@ public static MapJoinOperator convertMapJoin(HiveConf conf, static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, LinkedHashMap, OpParseContext> opParseCtxMap, - JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) - throws SemanticException { + JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases, + int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { MapJoinDesc mapJoinDescriptor = - getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin); + getMapJoinDesc(hconf, opParseCtxMap, op, leftSrc, baseSrc, mapAliases, + mapJoinPos, noCheckOuterJoin); // reduce sink row resolver used to generate map join op RowResolver outputRS = opParseCtxMap.get(op).getRowResolver(); @@ -439,7 +433,7 @@ private static boolean needValueIndex(int[] valueIndex) { */ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, Map, OpParseContext> opParseCtxMap, - SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin) + SMBMapJoinOperator smbJoinOp, int bigTablePos, boolean noCheckOuterJoin) throws SemanticException { // Create a new map join operator SMBJoinDesc smbJoinDesc = smbJoinOp.getConf(); @@ -486,7 +480,7 @@ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, } public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator op, - QBJoinTree joinTree, int mapJoinPos) throws SemanticException { + int mapJoinPos) throws SemanticException { HiveConf hiveConf = pctx.getConf(); boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN) @@ -495,7 +489,8 @@ public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator o LinkedHashMap, OpParseContext> opParseCtxMap = pctx .getOpParseCtx(); MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op, - joinTree, mapJoinPos, noCheckOuterJoin, true); + op.getConf().isJoinSrc(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(), + mapJoinPos, noCheckOuterJoin, true); // create a dummy select to select all columns genSelectPlan(pctx, mapJoinOp); return mapJoinOp; @@ -624,7 +619,7 @@ private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws Sema SelectDesc select = new SelectDesc(exprs, outputs, false); - SelectOperator sel = (SelectOperator) putOpInsertMap(OperatorFactory.getAndMakeChild(select, + SelectOperator sel = 
(SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR.getColumnInfos()), input), inputRR); sel.setColumnExprMap(colExprMap); @@ -641,24 +636,22 @@ private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws Sema * * @param op * join operator - * @param joinTree - * qb join tree * @return -1 if it cannot be converted to a map-side join, position of the map join node * otherwise */ - private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException { + private int mapSideJoin(JoinOperator op) throws SemanticException { int mapJoinPos = -1; - if (joinTree.isMapSideJoin()) { + if (op.getConf().isMapSideJoin()) { int pos = 0; // In a map-side join, exactly one table is not present in memory. // The client provides the list of tables which can be cached in memory // via a hint. - if (joinTree.getJoinSrc() != null) { + if (op.getConf().isJoinSrc()) { mapJoinPos = pos; } - for (String src : joinTree.getBaseSrc()) { + for (String src : op.getConf().getBaseSrc()) { if (src != null) { - if (!joinTree.getMapAliases().contains(src)) { + if (!op.getConf().getMapAliases().contains(src)) { if (mapJoinPos >= 0) { return -1; } @@ -673,7 +666,7 @@ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticExc // leaving some table from the list of tables to be cached if (mapJoinPos == -1) { throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg( - Arrays.toString(joinTree.getBaseSrc()))); + Arrays.toString(op.getConf().getBaseSrc()))); } } @@ -689,36 +682,34 @@ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticExc */ @Override public ParseContext transform(ParseContext pactx) throws SemanticException { - pGraphContext = pactx; List listMapJoinOps = new ArrayList(); // traverse all the joins and convert them if necessary - if (pGraphContext.getJoinContext() != null) { - Map joinMap = new HashMap(); - Map mapJoinMap = pGraphContext.getMapJoinContext(); + if (pactx.getJoinOps() != null) { + Set joinMap = new HashSet(); + Set mapJoinMap = pactx.getMapJoinOps(); if (mapJoinMap == null) { - mapJoinMap = new HashMap(); - pGraphContext.setMapJoinContext(mapJoinMap); + mapJoinMap = new HashSet(); + pactx.setMapJoinOps(mapJoinMap); } - Set> joinCtx = pGraphContext.getJoinContext().entrySet(); - Iterator> joinCtxIter = joinCtx.iterator(); + Iterator joinCtxIter = pactx.getJoinOps().iterator(); while (joinCtxIter.hasNext()) { - Map.Entry joinEntry = joinCtxIter.next(); - JoinOperator joinOp = joinEntry.getKey(); - QBJoinTree qbJoin = joinEntry.getValue(); - int mapJoinPos = mapSideJoin(joinOp, qbJoin); + JoinOperator joinOp = joinCtxIter.next(); + int mapJoinPos = mapSideJoin(joinOp); if (mapJoinPos >= 0) { - MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, qbJoin, mapJoinPos); + MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, mapJoinPos); listMapJoinOps.add(mapJoinOp); - mapJoinMap.put(mapJoinOp, qbJoin); + mapJoinOp.getConf().setQBJoinTreeProps(joinOp.getConf()); + mapJoinMap.add(mapJoinOp); } else { - joinMap.put(joinOp, qbJoin); + joinOp.getConf().setQBJoinTreeProps(joinOp.getConf()); + joinMap.add(joinOp); } } // store the new joinContext - pGraphContext.setJoinContext(joinMap); + pactx.setJoinOps(joinMap); } // Go over the list and find if a reducer is not needed @@ -744,15 +735,15 @@ public ParseContext transform(ParseContext pactx) throws SemanticException { // The dispatcher fires the processor corresponding to the closest matching // rule 
and passes the context along Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules, new MapJoinWalkerCtx( - listMapJoinOpsNoRed, pGraphContext)); + listMapJoinOpsNoRed, pactx)); GraphWalker ogw = new GenMapRedWalker(disp); ArrayList topNodes = new ArrayList(); topNodes.addAll(listMapJoinOps); ogw.startWalking(topNodes, null); - pGraphContext.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed); - return pGraphContext; + pactx.setListMapJoinOpsNoReducer(listMapJoinOpsNoRed); + return pactx; } /** @@ -798,7 +789,7 @@ private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOpera } Operator ch = parent.getChildOperators().get(0); if (ch instanceof MapJoinOperator) { - if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch, mapJoin)) { + if (!nonSubqueryMapJoin((MapJoinOperator) ch, mapJoin)) { if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch).getConf() .getPosBigTable()) { // not come from the local branch @@ -818,11 +809,8 @@ private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOpera } } - private boolean nonSubqueryMapJoin(ParseContext pGraphContext, MapJoinOperator mapJoin, - MapJoinOperator parentMapJoin) { - QBJoinTree joinTree = pGraphContext.getMapJoinContext().get(mapJoin); - QBJoinTree parentJoinTree = pGraphContext.getMapJoinContext().get(parentMapJoin); - if (joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) { + private boolean nonSubqueryMapJoin(MapJoinOperator mapJoin, MapJoinOperator parentMapJoin) { + if (mapJoin.getParentOperators().contains(parentMapJoin)) { return true; } return false; @@ -1026,15 +1014,17 @@ public void setpGraphContext(ParseContext pGraphContext) { } - public static ObjectPair, Map>> getKeys(QBJoinTree joinTree, JoinOperator op) { + public static ObjectPair, Map>> getKeys( + boolean leftSrc, + String[] baseSrc, + JoinOperator op) { // Walk over all the sources (which are guaranteed to be reduce sink // operators). // The join outputs a concatenation of all the inputs. 
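When a JoinOperator is converted in transform, the QBJoinTree-derived properties now have to be copied onto the new operator's descriptor explicitly via setQBJoinTreeProps, since there is no longer a context map to re-key. A small sketch of that hand-off during conversion, with invented minimal types:

    import java.util.HashSet;
    import java.util.Set;

    public class PropsHandoffSketch {
        static class Desc {
            String[] baseSrc;
            void setQBJoinTreeProps(Desc other) { this.baseSrc = other.baseSrc; }
        }
        static class JoinOp { Desc conf = new Desc(); }
        static class MapJoinOp { Desc conf = new Desc(); }

        public static void main(String[] args) {
            Set<JoinOp> joinOps = new HashSet<>();
            Set<MapJoinOp> mapJoinOps = new HashSet<>();
            JoinOp joinOp = new JoinOp();
            joinOp.conf.baseSrc = new String[] {"a", "b"};
            joinOps.add(joinOp);

            // Convert: build the map join, copy the join-tree properties
            // across, then move the operator between the two registries.
            MapJoinOp mapJoinOp = new MapJoinOp();
            mapJoinOp.conf.setQBJoinTreeProps(joinOp.conf);
            mapJoinOps.add(mapJoinOp);
            joinOps.remove(joinOp);

            System.out.println(mapJoinOps.size() + " map joins, " + joinOps.size() + " joins");
        }
    }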
- QBJoinTree leftSrc = joinTree.getJoinSrc(); List oldReduceSinkParentOps = new ArrayList(op.getNumParent()); - if (leftSrc != null) { + if (leftSrc) { // assert mapJoinPos == 0; Operator parentOp = op.getParentOperators().get(0); assert parentOp.getParentOperators().size() == 1; @@ -1042,7 +1032,7 @@ public void setpGraphContext(ParseContext pGraphContext) { } byte pos = 0; - for (String src : joinTree.getBaseSrc()) { + for (String src : baseSrc) { if (src != null) { Operator parentOp = op.getParentOperators().get(pos); assert parentOp.getParentOperators().size() == 1; @@ -1065,7 +1055,8 @@ public void setpGraphContext(ParseContext pGraphContext) { public static MapJoinDesc getMapJoinDesc(HiveConf hconf, LinkedHashMap, OpParseContext> opParseCtxMap, - JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { + JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases, + int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { JoinDesc desc = op.getConf(); JoinCondDesc[] condns = desc.getConds(); Byte[] tagOrder = desc.getTagOrder(); @@ -1082,7 +1073,8 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf, Map> valueExprs = op.getConf().getExprs(); Map> newValueExprs = new HashMap>(); - ObjectPair, Map>> pair = getKeys(joinTree, op); + ObjectPair, Map>> pair = + getKeys(leftSrc, baseSrc, op); List oldReduceSinkParentOps = pair.getFirst(); for (Map.Entry> entry : valueExprs.entrySet()) { byte tag = entry.getKey(); @@ -1172,8 +1164,8 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf, // create dumpfile prefix needed to create descriptor String dumpFilePrefix = ""; - if (joinTree.getMapAliases() != null) { - for (String mapAlias : joinTree.getMapAliases()) { + if (mapAliases != null) { + for (String mapAlias : mapAliases) { dumpFilePrefix = dumpFilePrefix + mapAlias; } dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java index 5291851..035f897 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; @@ -42,7 +43,6 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -55,15 +55,12 @@ */ public class NonBlockingOpDeDupProc implements Transform { - private ParseContext pctx; - @Override public ParseContext transform(ParseContext pctx) throws SemanticException { - this.pctx = pctx; String SEL = SelectOperator.getOperatorName(); String FIL = FilterOperator.getOperatorName(); Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup()); + opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup(pctx)); 
opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup()); Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null); @@ -76,6 +73,13 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { } private class SelectDedup implements NodeProcessor { + + private ParseContext pctx; + + public SelectDedup (ParseContext pctx) { + this.pctx = pctx; + } + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -178,28 +182,37 @@ private boolean checkReferences(ExprNodeDesc expr, Set funcOutputs, Set< } return true; } - } - - /** - * Change existing references in the context to point from child to parent operator. - * @param cSEL child operator (to be removed, and merged into parent) - * @param pSEL parent operator - */ - private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) { - Collection qbJoinTrees = new ArrayList(); - qbJoinTrees.addAll(pctx.getJoinContext().values()); - qbJoinTrees.addAll(pctx.getMapJoinContext().values()); - for (QBJoinTree qbJoinTree : qbJoinTrees) { - Map> aliasToOpInfo = qbJoinTree.getAliasToOpInfo(); - for (Map.Entry> entry : aliasToOpInfo.entrySet()) { - if (entry.getValue() == cSEL) { - aliasToOpInfo.put(entry.getKey(), pSEL); + + /** + * Change existing references in the context to point from child to parent operator. + * @param cSEL child operator (to be removed, and merged into parent) + * @param pSEL parent operator + */ + private void fixContextReferences(SelectOperator cSEL, SelectOperator pSEL) { + Collection>> mapsAliasToOpInfo = + new ArrayList>>(); + for (JoinOperator joinOp : pctx.getJoinOps()) { + if (joinOp.getConf().getAliasToOpInfo() != null) { + mapsAliasToOpInfo.add(joinOp.getConf().getAliasToOpInfo()); + } + } + for (MapJoinOperator mapJoinOp : pctx.getMapJoinOps()) { + if (mapJoinOp.getConf().getAliasToOpInfo() != null) { + mapsAliasToOpInfo.add(mapJoinOp.getConf().getAliasToOpInfo()); + } + } + for (Map> aliasToOpInfo : mapsAliasToOpInfo) { + for (Map.Entry> entry : aliasToOpInfo.entrySet()) { + if (entry.getValue() == cSEL) { + aliasToOpInfo.put(entry.getKey(), pSEL); + } } } } } private class FilterDedup implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java index ea06503..1609831 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java @@ -79,11 +79,13 @@ public class SkewJoinOptimizer implements Transform { private static final Log LOG = LogFactory.getLog(SkewJoinOptimizer.class.getName()); - private static ParseContext parseContext; public static class SkewJoinProc implements NodeProcessor { - public SkewJoinProc() { + private ParseContext parseContext; + + public SkewJoinProc(ParseContext parseContext) { super(); + this.parseContext = parseContext; } @Override @@ -165,23 +167,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } - // have to create a QBJoinTree for the cloned join operator - QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp); - QBJoinTree newJoinTree; - try { - newJoinTree = originJoinTree.clone(); - } catch (CloneNotSupportedException e) { - LOG.debug("QBJoinTree could not be cloned: ", e); - return null; - } - JoinOperator joinOpClone; if (processSelect) { joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0)); } else { joinOpClone = (JoinOperator)currOpClone; } - parseContext.getJoinContext().put(joinOpClone, newJoinTree); + joinOpClone.getConf().cloneQBJoinTreeProps(joinOp.getConf()); + parseContext.getJoinOps().add(joinOpClone); List tableScanCloneOpsForJoin = new ArrayList(); @@ -213,7 +206,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } parseContext.getTopOps().put(newAlias, tso); - setUpAlias(originJoinTree, newJoinTree, tabAlias, newAlias, tso); + setUpAlias(joinOp, joinOpClone, tabAlias, newAlias, tso); } // Now do a union of the select operators: selectOp and selectOpClone @@ -629,19 +622,19 @@ private void insertRowResolvers( /** * Set alias in the cloned join tree */ - private static void setUpAlias(QBJoinTree origin, QBJoinTree cloned, String origAlias, + private static void setUpAlias(JoinOperator origin, JoinOperator cloned, String origAlias, String newAlias, Operator topOp) { - cloned.getAliasToOpInfo().remove(origAlias); - cloned.getAliasToOpInfo().put(newAlias, topOp); - if (origin.getLeftAlias().equals(origAlias)) { - cloned.setLeftAlias(null); - cloned.setLeftAlias(newAlias); + cloned.getConf().getAliasToOpInfo().remove(origAlias); + cloned.getConf().getAliasToOpInfo().put(newAlias, topOp); + if (origin.getConf().getLeftAlias().equals(origAlias)) { + cloned.getConf().setLeftAlias(null); + cloned.getConf().setLeftAlias(newAlias); } - replaceAlias(origin.getLeftAliases(), cloned.getLeftAliases(), origAlias, newAlias); - replaceAlias(origin.getRightAliases(), cloned.getRightAliases(), origAlias, newAlias); - replaceAlias(origin.getBaseSrc(), cloned.getBaseSrc(), origAlias, newAlias); - replaceAlias(origin.getMapAliases(), cloned.getMapAliases(), origAlias, newAlias); - replaceAlias(origin.getStreamAliases(), cloned.getStreamAliases(), origAlias, newAlias); + replaceAlias(origin.getConf().getLeftAliases(), cloned.getConf().getLeftAliases(), origAlias, newAlias); + replaceAlias(origin.getConf().getRightAliases(), cloned.getConf().getRightAliases(), origAlias, newAlias); + replaceAlias(origin.getConf().getBaseSrc(), cloned.getConf().getBaseSrc(), origAlias, newAlias); + replaceAlias(origin.getConf().getMapAliases(), 
cloned.getConf().getMapAliases(), origAlias, newAlias); + replaceAlias(origin.getConf().getStreamAliases(), cloned.getConf().getStreamAliases(), origAlias, newAlias); } private static void replaceAlias(String[] origin, String[] cloned, @@ -677,7 +670,7 @@ private static void replaceAlias(List origin, List cloned, public ParseContext transform(ParseContext pctx) throws SemanticException { Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc()); + opRules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc(pctx)); SkewJoinOptProcCtx skewJoinOptProcCtx = new SkewJoinOptProcCtx(pctx); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along @@ -692,8 +685,8 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } - private NodeProcessor getSkewJoinProc() { - return new SkewJoinProc(); + private NodeProcessor getSkewJoinProc(ParseContext parseContext) { + return new SkewJoinProc(parseContext); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java index 11ce47e..f6ca039 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java @@ -60,7 +60,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } if (convert) { - convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext); + convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext); } return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java index 8a0c474..d090598 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java @@ -44,10 +44,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx; boolean convert = canConvertJoinToSMBJoin( - joinOp, smbJoinContext, pGraphContext); + joinOp, smbJoinContext); if (convert) { - convertJoinToSMBJoin(joinOp, smbJoinContext, pGraphContext); + convertJoinToSMBJoin(joinOp, smbJoinContext); } return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java index c52f753..d4de0bd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java @@ -107,7 +107,7 @@ private void findPossibleAutoConvertedJoinOperators() throws SemanticException { // that has both intermediate tables and query input tables as input tables, // we should be able to guess if this JoinOperator will be converted to a MapJoin // based on hive.auto.convert.join.noconditionaltask.size. 
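In SkewJoinOptimizer above, the cloned join no longer needs a try/catch around QBJoinTree.clone(); the clone's descriptor copies the properties directly (cloneQBJoinTreeProps) and the clone is registered in the join-op set. A hedged sketch of a defensive property copy of that kind, with invented field names:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CloneJoinPropsSketch {
        static class JoinDescLike {
            String leftAlias;
            String[] baseSrc;
            List<String> mapAliases;

            // Copies the mutable join-tree properties so later alias
            // rewrites on the clone (setUpAlias) cannot leak into the original.
            void cloneQBJoinTreeProps(JoinDescLike src) {
                this.leftAlias = src.leftAlias;
                this.baseSrc = src.baseSrc == null ? null : src.baseSrc.clone();
                this.mapAliases = src.mapAliases == null ? null : new ArrayList<>(src.mapAliases);
            }
        }

        public static void main(String[] args) {
            JoinDescLike orig = new JoinDescLike();
            orig.leftAlias = "a";
            orig.baseSrc = new String[] {"a", "b"};
            orig.mapAliases = Arrays.asList("b");

            JoinDescLike clone = new JoinDescLike();
            clone.cloneQBJoinTreeProps(orig);
            clone.baseSrc[0] = "a_skew"; // does not touch orig.baseSrc
            System.out.println(orig.baseSrc[0] + " vs " + clone.baseSrc[0]);
        }
    }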
- for (JoinOperator joinOp: pCtx.getJoinContext().keySet()) { + for (JoinOperator joinOp: pCtx.getJoinOps()) { boolean isAbleToGuess = true; boolean mayConvert = false; // Get total size and individual alias's size diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java index 33ef581..a3a7f42 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MapWork; /** @@ -53,8 +52,7 @@ public AbstractJoinTaskDispatcher(PhysicalContext context) { throws SemanticException; protected void replaceTaskWithConditionalTask( - Task currTask, ConditionalTask cndTsk, - PhysicalContext physicalContext) { + Task currTask, ConditionalTask cndTsk) { // add this task into task tree // set all parent tasks List> parentTasks = currTask.getParentTasks(); @@ -88,8 +86,7 @@ protected void replaceTaskWithConditionalTask( // Replace the task with the new task. Copy the children and parents of the old // task to the new task. protected void replaceTask( - Task currTask, Task newTask, - PhysicalContext physicalContext) { + Task currTask, Task newTask) { // add this task into task tree // set all parent tasks List> parentTasks = currTask.getParentTasks(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 9c26907..356c3bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx; @@ -400,7 +399,6 @@ public static boolean cannotConvert(long aliasKnownSize, // get parseCtx for this Join Operator ParseContext parseCtx = physicalContext.getParseContext(); - QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp); // start to generate multiple map join tasks JoinDesc joinDesc = joinOp.getConf(); @@ -458,7 +456,9 @@ public static boolean cannotConvert(long aliasKnownSize, } currWork.setOpParseCtxMap(parseCtx.getOpParseCtx()); - currWork.setJoinTree(joinTree); + currWork.setJoinSrc(joinOp.getConf().isJoinSrc()); + currWork.setBaseSrc(joinOp.getConf().getBaseSrc()); + currWork.setMapAliases(joinOp.getConf().getMapAliases()); if (bigTablePosition >= 0) { // create map join task and set big table as bigTablePosition @@ -466,7 +466,7 @@ public static boolean cannotConvert(long aliasKnownSize, newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP); newTask.setFetchSource(currTask.isFetchSource()); - replaceTask(currTask, newTask, physicalContext); + 
replaceTask(currTask, newTask); // Can this task be merged with the child task. This can happen if a big table is being // joined with multiple small tables on different keys @@ -522,7 +522,9 @@ public static boolean cannotConvert(long aliasKnownSize, listTasks.add(currTask); // clear JoinTree and OP Parse Context currWork.setOpParseCtxMap(null); - currWork.setJoinTree(null); + currWork.setJoinSrc(false); + currWork.setBaseSrc(null); + currWork.setMapAliases(null); // create conditional task and insert conditional task into task tree ConditionalWork cndWork = new ConditionalWork(listWorks); @@ -541,7 +543,7 @@ public static boolean cannotConvert(long aliasKnownSize, cndTsk.setResolverCtx(resolverCtx); // replace the current task with the new generated conditional task - replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext); + replaceTaskWithConditionalTask(currTask, cndTsk); return cndTsk; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index 6f92b13..150b004 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx; @@ -168,8 +167,7 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera // create map join task and set big table as bigTablePosition private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork, int bigTablePosition, - SMBMapJoinOperator smbJoinOp, - QBJoinTree joinTree) + SMBMapJoinOperator smbJoinOp) throws UnsupportedEncodingException, SemanticException { // deep copy a new mapred work MapredWork newWork = Utilities.clonePlan(origWork); @@ -178,7 +176,7 @@ private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork, .getParseContext().getConf()); // generate the map join operator; already checked the map join MapJoinOperator newMapJoinOp = - getMapJoinOperator(newTask, newWork, smbJoinOp, joinTree, bigTablePosition); + getMapJoinOperator(newTask, newWork, smbJoinOp, bigTablePosition); // The reducer needs to be restored - Consider a query like: // select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key; @@ -246,7 +244,6 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) // get parseCtx for this Join Operator ParseContext parseCtx = physicalContext.getParseContext(); - QBJoinTree joinTree = parseCtx.getSmbMapJoinContext().get(originalSMBJoinOp); // Convert the work containing to sort-merge join into a work, as if it had a regular join. 
// Note that the operator tree is not changed - is still contains the SMB join, but the @@ -257,9 +254,13 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork); currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx()); - currWork.getMapWork().setJoinTree(joinTree); + currWork.getMapWork().setJoinSrc(originalSMBJoinOp.getConf().isJoinSrc()); + currWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc()); + currWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases()); currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx()); - currJoinWork.getMapWork().setJoinTree(joinTree); + currJoinWork.getMapWork().setJoinSrc(originalSMBJoinOp.getConf().isJoinSrc()); + currJoinWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc()); + currJoinWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases()); // create conditional work list and task list List listWorks = new ArrayList(); @@ -296,7 +297,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) // create map join task for the given big table position MapRedTask newTask = convertSMBTaskToMapJoinTask( - currJoinWork, bigTablePosition, newSMBJoinOp, joinTree); + currJoinWork, bigTablePosition, newSMBJoinOp); MapWork mapWork = newTask.getWork().getMapWork(); Operator parentOp = originalSMBJoinOp.getParentOperators().get(bigTablePosition); @@ -334,7 +335,9 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) listTasks.add(currTask); // clear JoinTree and OP Parse Context currWork.getMapWork().setOpParseCtxMap(null); - currWork.getMapWork().setJoinTree(null); + currWork.getMapWork().setJoinSrc(false); + currWork.getMapWork().setBaseSrc(null); + currWork.getMapWork().setMapAliases(null); // create conditional task and insert conditional task into task tree ConditionalWork cndWork = new ConditionalWork(listWorks); @@ -353,7 +356,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) cndTsk.setResolverCtx(resolverCtx); // replace the current task with the new generated conditional task - replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext); + replaceTaskWithConditionalTask(currTask, cndTsk); return cndTsk; } @@ -426,7 +429,6 @@ private SMBMapJoinOperator getSMBMapJoinOp(MapredWork work) throws SemanticExcep private MapJoinOperator getMapJoinOperator(MapRedTask task, MapredWork work, SMBMapJoinOperator oldSMBJoinOp, - QBJoinTree joinTree, int mapJoinPos) throws SemanticException { SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(task.getWork()); @@ -437,7 +439,6 @@ private MapJoinOperator getMapJoinOperator(MapRedTask task, // generate the map join operator return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(), - opParseContextMap, newSMBJoinOp, - joinTree, mapJoinPos, true); + opParseContextMap, newSMBJoinOp, mapJoinPos, true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 8215c26..81a3256 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.metadata.HiveException; import 
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 8215c26..81a3256 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
@@ -75,9 +74,9 @@
   private HashMap<String, Operator<? extends OperatorDesc>> topOps;
   private HashMap<String, Operator<? extends OperatorDesc>> topSelOps;
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
-  private Map<JoinOperator, QBJoinTree> joinContext;
-  private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
-  private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
+  private Set<JoinOperator> joinOps;
+  private Set<MapJoinOperator> mapJoinOps;
+  private Set<SMBMapJoinOperator> smbMapJoinOps;
   private HashMap<TableScanOperator, Table> topToTable;
   private Map<FileSinkOperator, Table> fsopToTable;
   private List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
@@ -133,7 +132,7 @@ public ParseContext() {
    * @param opParseCtx
    *          operator parse context - contains a mapping from operator to
    *          operator parse state (row resolver etc.)
-   * @param joinContext
+   * @param joinOps
    *          context needed join processing (map join specifically)
    * @param topToTable
    *          the top tables being processed
@@ -165,8 +164,8 @@ public ParseContext(
       HashMap<String, Operator<? extends OperatorDesc>> topOps,
       HashMap<String, Operator<? extends OperatorDesc>> topSelOps,
       LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx,
-      Map<JoinOperator, QBJoinTree> joinContext,
-      Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
+      Set<JoinOperator> joinOps,
+      Set<SMBMapJoinOperator> smbMapJoinOps,
       HashMap<TableScanOperator, Table> topToTable,
       HashMap<TableScanOperator, Map<String, String>> topToProps,
       Map<FileSinkOperator, Table> fsopToTable,
@@ -188,8 +187,8 @@ public ParseContext(
     this.ast = ast;
     this.opToPartPruner = opToPartPruner;
     this.opToPartList = opToPartList;
-    this.joinContext = joinContext;
-    this.smbMapJoinContext = smbMapJoinContext;
+    this.joinOps = joinOps;
+    this.smbMapJoinOps = smbMapJoinOps;
     this.topToTable = topToTable;
     this.fsopToTable = fsopToTable;
     this.topToProps = topToProps;
@@ -476,18 +475,18 @@ public void setUCtx(UnionProcContext uCtx) {
   }

   /**
-   * @return the joinContext
+   * @return the joinOps
    */
-  public Map<JoinOperator, QBJoinTree> getJoinContext() {
-    return joinContext;
+  public Set<JoinOperator> getJoinOps() {
+    return joinOps;
   }

   /**
-   * @param joinContext
-   *          the joinContext to set
+   * @param joinOps
+   *          the joinOps to set
    */
-  public void setJoinContext(Map<JoinOperator, QBJoinTree> joinContext) {
-    this.joinContext = joinContext;
+  public void setJoinOps(Set<JoinOperator> joinOps) {
+    this.joinOps = joinOps;
   }

   /**
@@ -570,20 +569,20 @@ public LineageInfo getLineageInfo() {
     return lInfo;
   }

-  public Map<MapJoinOperator, QBJoinTree> getMapJoinContext() {
-    return mapJoinContext;
+  public Set<MapJoinOperator> getMapJoinOps() {
+    return mapJoinOps;
   }

-  public void setMapJoinContext(Map<MapJoinOperator, QBJoinTree> mapJoinContext) {
-    this.mapJoinContext = mapJoinContext;
+  public void setMapJoinOps(Set<MapJoinOperator> mapJoinOps) {
+    this.mapJoinOps = mapJoinOps;
   }

-  public Map<SMBMapJoinOperator, QBJoinTree> getSmbMapJoinContext() {
-    return smbMapJoinContext;
+  public Set<SMBMapJoinOperator> getSmbMapJoinOps() {
+    return smbMapJoinOps;
   }

-  public void setSmbMapJoinContext(Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext) {
-    this.smbMapJoinContext = smbMapJoinContext;
+  public void setSmbMapJoinOps(Set<SMBMapJoinOperator> smbMapJoinOps) {
+    this.smbMapJoinOps = smbMapJoinOps;
   }

   public GlobalLimitCtx getGlobalLimitCtx() {
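The old operator-to-QBJoinTree maps were only ever used two ways: get(op) as a null-checked membership test, or to fetch per-operator state that now lives on JoinDesc itself. Narrowing them to sets makes the first use explicit. A toy illustration (plain Object stands in for the operator types; containment here is by object identity, which matches how the old map was keyed by operator reference):

import java.util.HashSet;
import java.util.Set;

class MembershipSketch {
  public static void main(String[] args) {
    Object mapJoinOp = new Object();        // stand-in for a MapJoinOperator
    Set<Object> mapJoinOps = new HashSet<Object>();
    mapJoinOps.add(mapJoinOp);

    // before: pctx.getMapJoinContext().get(op) == null
    // after:  !pctx.getMapJoinOps().contains(op)
    System.out.println(mapJoinOps.contains(mapJoinOp));    // true
    System.out.println(mapJoinOps.contains(new Object())); // false
  }
}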
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c2d5c8c..1db5956 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -378,8 +378,6 @@ public void initParseCtx(ParseContext pctx) {
     opParseCtx = pctx.getOpParseCtx();
     loadTableWork = pctx.getLoadTableWork();
     loadFileWork = pctx.getLoadFileWork();
-    joinContext = pctx.getJoinContext();
-    smbMapJoinContext = pctx.getSmbMapJoinContext();
     ctx = pctx.getContext();
     destTableId = pctx.getDestTableId();
     idToTableNameMap = pctx.getIdToTableNameMap();
@@ -394,7 +392,10 @@ public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
-        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topSelOps, opParseCtx,
+        new HashSet<JoinOperator>(joinContext.keySet()),
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
@@ -7363,6 +7364,7 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
     JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
         joinSrcOp, srcOps, omitOpts, joinKeys);
+    joinOp.getConf().setQBJoinTreeProps(joinTree);
     joinContext.put(joinOp, joinTree);

     Operator op = joinOp;
@@ -9988,7 +9990,10 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce
     // 4. Generate Parse Context for Optimizer & Physical compiler
     ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child, opToPartPruner, opToPartList,
-        topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
+        topOps, topSelOps, opParseCtx,
+        new HashSet<JoinOperator>(joinContext.keySet()),
+        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+        topToTable, topToTableProps,
         fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
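genJoinOperator now copies the QBJoinTree's properties onto the operator's own descriptor the moment the operator is built, so later phases can read them without a side map; ParseContext then needs only the key set. A toy version of that handoff (simplified stand-ins, not the real QBJoinTree/JoinDesc types):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class HandoffSketch {
  static class JoinTree { String[] baseSrc = {"a", "b"}; } // compile-time state
  static class JoinConf { String[] baseSrc; }              // long-lived descriptor

  public static void main(String[] args) {
    Map<Object, JoinTree> joinContext = new HashMap<Object, JoinTree>();
    JoinTree tree = new JoinTree();
    JoinConf conf = new JoinConf();
    conf.baseSrc = tree.baseSrc;   // like joinOp.getConf().setQBJoinTreeProps(joinTree)
    Object joinOp = new Object();
    joinContext.put(joinOp, tree);

    // like new HashSet<JoinOperator>(joinContext.keySet()) in getParseContext()
    Set<Object> joinOps = new HashSet<Object>(joinContext.keySet());
    System.out.println(joinOps.contains(joinOp) + " / " + conf.baseSrc.length);
  }
}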
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
index da14ab4..b198db0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse;

 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -174,10 +173,9 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       // Get the key column names for each side of the join,
       // and check if the keys are all constants
       // or columns (not expressions). If yes, proceed.
-      QBJoinTree joinTree = pGraphContext.getJoinContext().get(op);
-      assert(parentOps.size() == joinTree.getBaseSrc().length);
+      assert(parentOps.size() == op.getConf().getBaseSrc().length);
       int pos = 0;
-      for (String src : joinTree.getBaseSrc()) {
+      for (String src : op.getConf().getBaseSrc()) {
         if (src != null) {
           assert(parentOps.get(pos) instanceof ReduceSinkOperator);
           ReduceSinkOperator reduceSinkOp = (ReduceSinkOperator) parentOps.get(pos);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 23fbbe1..814a9f2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -26,8 +26,6 @@
 import java.util.List;
 import java.util.Set;

-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +58,9 @@
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
 /**
  * TaskCompiler is a the base class for classes that compile
  * operator pipelines into tasks.
  */
@@ -386,8 +387,8 @@ public ParseContext getParseContext(ParseContext pCtx, List<Task<? extends Seri
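The TaskCompiler hunks above reorder the Guava Interner imports; the interner those imports support is what lets many plan nodes share one canonical instance of an equal descriptor instead of serializing duplicates. A quick demonstration of the Guava API itself (the path string is just example data):

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

class InternSketch {
  public static void main(String[] args) {
    Interner<String> interner = Interners.newStrongInterner();
    // Two distinct but equal objects collapse to one shared instance.
    String a = interner.intern(new String("hdfs://warehouse/t1"));
    String b = interner.intern(new String("hdfs://warehouse/t1"));
    System.out.println(a == b); // true
  }
}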
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
--- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ ... @@
+  private transient String leftAlias;
+  private transient String[] leftAliases;
+  private transient String[] rightAliases;
+  private transient String[] baseSrc;
+  private transient String id;
+  private transient boolean mapSideJoin;
+  private transient List<String> mapAliases; //map-side join aliases
+  private transient Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo;
+  private transient boolean joinSrc;
+  private transient List<String> streamAliases;
+
   public JoinDesc() {
   }

   public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
       List<String> outputColumnNames, final boolean noOuterJoin,
-      final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters, ExprNodeDesc[][] joinKeys) {
+      final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters,
+      ExprNodeDesc[][] joinKeys) {
     this.exprs = exprs;
     this.outputColumnNames = outputColumnNames;
     this.noOuterJoin = noOuterJoin;
@@ -505,8 +521,92 @@ public int getTagLength() {
   public boolean isFixedAsSorted() {
     return fixedAsSorted;
   }
-  
+
   public void setFixedAsSorted(boolean fixedAsSorted) {
     this.fixedAsSorted = fixedAsSorted;
   }
+
+  public String[] getLeftAliases() {
+    return leftAliases;
+  }
+
+  public String[] getBaseSrc() {
+    return baseSrc;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public List<String> getMapAliases() {
+    return mapAliases;
+  }
+
+  public Map<String, Operator<? extends OperatorDesc>> getAliasToOpInfo() {
+    return aliasToOpInfo;
+  }
+
+  public boolean isJoinSrc() {
+    return joinSrc;
+  }
+
+  public String getLeftAlias() {
+    return leftAlias;
+  }
+
+  public void setLeftAlias(String leftAlias) {
+    this.leftAlias = leftAlias;
+  }
+
+  public String[] getRightAliases() {
+    return rightAliases;
+  }
+
+  public List<String> getStreamAliases() {
+    return streamAliases;
+  }
+
+  public boolean isMapSideJoin() {
+    return mapSideJoin;
+  }
+
+  public void setQBJoinTreeProps(JoinDesc joinDesc) {
+    leftAlias = joinDesc.leftAlias;
+    leftAliases = joinDesc.leftAliases;
+    rightAliases = joinDesc.rightAliases;
+    baseSrc = joinDesc.baseSrc;
+    id = joinDesc.id;
+    mapSideJoin = joinDesc.mapSideJoin;
+    mapAliases = joinDesc.mapAliases;
+    aliasToOpInfo = joinDesc.aliasToOpInfo;
+    joinSrc = joinDesc.joinSrc;
+    streamAliases = joinDesc.streamAliases;
+  }
+
+  public void setQBJoinTreeProps(QBJoinTree joinDesc) {
+    leftAlias = joinDesc.getLeftAlias();
+    leftAliases = joinDesc.getLeftAliases();
+    rightAliases = joinDesc.getRightAliases();
+    baseSrc = joinDesc.getBaseSrc();
+    id = joinDesc.getId();
+    mapSideJoin = joinDesc.isMapSideJoin();
+    mapAliases = joinDesc.getMapAliases();
+    aliasToOpInfo = joinDesc.getAliasToOpInfo();
+    joinSrc = joinDesc.getJoinSrc() != null;
+    streamAliases = joinDesc.getStreamAliases();
+  }
+
+  public void cloneQBJoinTreeProps(JoinDesc joinDesc) {
+    leftAlias = joinDesc.leftAlias;
+    leftAliases = joinDesc.leftAliases == null ? null : joinDesc.leftAliases.clone();
+    rightAliases = joinDesc.rightAliases == null ? null : joinDesc.rightAliases.clone();
+    baseSrc = joinDesc.baseSrc == null ? null : joinDesc.baseSrc.clone();
+    id = joinDesc.id;
+    mapSideJoin = joinDesc.mapSideJoin;
+    mapAliases = joinDesc.mapAliases == null ? null : new ArrayList<String>(joinDesc.mapAliases);
+    aliasToOpInfo = new HashMap<String, Operator<? extends OperatorDesc>>(joinDesc.aliasToOpInfo);
+    joinSrc = joinDesc.joinSrc;
+    streamAliases = joinDesc.streamAliases == null ? null : new ArrayList<String>(joinDesc.streamAliases);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 9f8c091..a15e23f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -30,7 +30,6 @@
 import java.util.Map.Entry;
 import java.util.Set;

-import com.google.common.collect.Interner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -41,10 +40,11 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.mapred.JobConf;

+import com.google.common.collect.Interner;
+
 /**
  * MapWork represents all the information used to run a map task on the cluster.
  * It is first used when the query planner breaks the logical plan into tasks and
@@ -105,8 +105,10 @@
   public static final int SAMPLING_ON_START = 2; // sampling on task running

   // the following two are used for join processing
-  private QBJoinTree joinTree;
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap;
+  private boolean joinSrc;
+  private String[] baseSrc;
+  private List<String> mapAliases;

   private boolean mapperCannotSpanPartns;
@@ -419,14 +421,6 @@ public boolean isUseOneNullRowInputFormat() {
     return useOneNullRowInputFormat;
   }

-  public QBJoinTree getJoinTree() {
-    return joinTree;
-  }
-
-  public void setJoinTree(QBJoinTree joinTree) {
-    this.joinTree = joinTree;
-  }
-
   public void setMapperCannotSpanPartns(boolean mapperCannotSpanPartns) {
     this.mapperCannotSpanPartns = mapperCannotSpanPartns;
   }
@@ -579,4 +573,28 @@ public void setDoSplitsGrouping(boolean doSplitsGrouping) {
   public boolean getDoSplitsGrouping() {
     return this.doSplitsGrouping;
   }
+
+  public boolean isJoinSrc() {
+    return joinSrc;
+  }
+
+  public void setJoinSrc(boolean joinSrc) {
+    this.joinSrc = joinSrc;
+  }
+
+  public String[] getBaseSrc() {
+    return baseSrc;
+  }
+
+  public void setBaseSrc(String[] baseSrc) {
+    this.baseSrc = baseSrc;
+  }
+
+  public List<String> getMapAliases() {
+    return mapAliases;
+  }
+
+  public void setMapAliases(List<String> mapAliases) {
+    this.mapAliases = mapAliases;
+  }
 }
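cloneQBJoinTreeProps clones the alias arrays and re-wraps the lists but only shallow-copies aliasToOpInfo, so a cloned descriptor is isolated from later mutation of the originals while still pointing at the same operator instances. A toy check of that copy depth (toy fields, not Hive's JoinDesc):

import java.util.HashMap;
import java.util.Map;

class CloneDepthSketch {
  public static void main(String[] args) {
    String[] baseSrc = {"a", "b"};
    Map<String, Object> aliasToOpInfo = new HashMap<String, Object>();
    aliasToOpInfo.put("a", new Object());

    String[] baseSrcCopy = baseSrc.clone();                          // array is cloned
    Map<String, Object> opInfoCopy =
        new HashMap<String, Object>(aliasToOpInfo);                  // values stay shared

    baseSrc[0] = "changed";
    System.out.println(baseSrcCopy[0]);                              // "a": unaffected
    System.out.println(opInfoCopy.get("a") == aliasToOpInfo.get("a")); // true: same operator
  }
}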