Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java  (revision 1451466)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java  (working copy)
@@ -88,12 +88,14 @@
     // Check whether the mapjoin is a bucketed mapjoin.
     // The above can be ascertained by checking the big table bucket -> small table buckets
     // mapping in the mapjoin descriptor.
+    // First check if this map-join operator is already a BucketMapJoin or not. If not, give up:
+    // we are only trying to convert a BucketMapJoin to a sort-merge BucketMapJoin.
     if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null ||
         mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) {
       return false;
     }
 
-    boolean tableSorted = true;
+    boolean tableEligibleForBucketedSortMergeJoin = true;
     QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
         .get(mapJoinOp);
     if (joinCxt == null) {
@@ -111,8 +113,8 @@
     List<Order> sortColumnsFirstTable = new ArrayList<Order>();
 
     for (int pos = 0; pos < srcs.length; pos++) {
-      tableSorted = tableSorted
-          && isTableSorted(smbJoinContext,
+      tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin
+          && isEligibleForBucketSortMergeJoin(smbJoinContext,
               pGraphContext,
               mapJoinOp.getConf().getKeys().get((byte) pos),
               joinCxt,
@@ -120,7 +122,7 @@
               pos,
               sortColumnsFirstTable);
     }
-    if (!tableSorted) {
+    if (!tableEligibleForBucketedSortMergeJoin) {
       // this is a mapjoin but not suited for a sort merge bucket map join. check outer joins
       MapJoinProcessor.checkMapJoin(mapJoinOp.getConf().getPosBigTable(),
           mapJoinOp.getConf().getConds());
@@ -233,7 +235,7 @@
    * @return
    * @throws SemanticException
    */
-  private boolean isTableSorted(
+  private boolean isEligibleForBucketSortMergeJoin(
       SortBucketJoinProcCtx smbJoinContext,
       ParseContext pctx,
       List<ExprNodeDesc> keys,
@@ -392,7 +394,7 @@
       SortBucketJoinProcCtx smbJoinContext,
       ParseContext pGraphContext) throws SemanticException {
 
-    boolean tableSorted = true;
+    boolean tableEligibleForBucketedSortMergeJoin = true;
 
     QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
     if (joinCtx == null) {
@@ -407,8 +409,8 @@
     List<Order> sortColumnsFirstTable = new ArrayList<Order>();
 
     for (int pos = 0; pos < srcs.length; pos++) {
-      tableSorted = tableSorted &&
-          isTableSorted(smbJoinContext,
+      tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
+          isEligibleForBucketSortMergeJoin(smbJoinContext,
              pGraphContext,
              smbJoinContext.getKeyExprMap().get((byte)pos),
              joinCtx,
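The renames above make the gate explicit: the operator must already be a BucketMapJoin (its alias bucket-file mapping is populated), and every join source must then pass the per-table eligibility check before the conversion to a sort-merge bucket map join proceeds. A minimal standalone sketch of that two-step gate follows; the types and names are hypothetical, not part of the patch:

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class SmbEligibilitySketch {
  // Hypothetical sketch: mirrors the guard and the conjunction over srcs[] above.
  static <T> boolean canConvertToSortMergeBucketMapJoin(
      Map<String, ?> aliasBucketFileNameMapping,   // populated only for a BucketMapJoin
      List<T> joinSources,
      Predicate<T> eligibleForBucketSortMergeJoin) {
    // Not a BucketMapJoin yet? Then there is nothing to upgrade; give up early.
    if (aliasBucketFileNameMapping == null || aliasBucketFileNameMapping.isEmpty()) {
      return false;
    }
    boolean eligible = true;
    for (T src : joinSources) {
      // one ineligible table disqualifies the whole join
      eligible = eligible && eligibleForBucketSortMergeJoin.test(src);
    }
    return eligible;
  }
}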
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java  (revision 1451466)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java  (working copy)
@@ -40,8 +40,8 @@
   // The set of join operators which can be converted to a bucketed map join
   private Set<JoinOperator> convertedJoinOps = new HashSet<JoinOperator>();
 
-  private Map<String, List<Integer>> aliasToPartitionBucketNumberMapping;
-  private Map<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping;
+  private Map<String, List<Integer>> tblAliasToNumberOfBucketsInEachPartition;
+  private Map<String, List<List<String>>> tblAliasToBucketedFilePathsInEachPartition;
   private Map<Partition, List<String>> bigTblPartsToBucketFileNames;
   private Map<Partition, Integer> bigTblPartsToBucketNumber;
   private List<String> joinAliases;
@@ -72,12 +72,12 @@
     this.convertedJoinOps = setOfConvertedJoins;
   }
 
-  public Map<String, List<Integer>> getAliasToPartitionBucketNumberMapping() {
-    return aliasToPartitionBucketNumberMapping;
+  public Map<String, List<Integer>> getTblAliasToNumberOfBucketsInEachPartition() {
+    return tblAliasToNumberOfBucketsInEachPartition;
   }
 
-  public Map<String, List<List<String>>> getAliasToPartitionBucketFileNamesMapping() {
-    return aliasToPartitionBucketFileNamesMapping;
+  public Map<String, List<List<String>>> getTblAliasToBucketedFilePathsInEachPartition() {
+    return tblAliasToBucketedFilePathsInEachPartition;
   }
 
   public Map<Partition, List<String>> getBigTblPartsToBucketFileNames() {
@@ -88,14 +88,14 @@
     return bigTblPartsToBucketNumber;
   }
 
-  public void setAliasToPartitionBucketNumberMapping(
-    Map<String, List<Integer>> aliasToPartitionBucketNumberMapping) {
-    this.aliasToPartitionBucketNumberMapping = aliasToPartitionBucketNumberMapping;
+  public void setTblAliasToNumberOfBucketsInEachPartition(
+      Map<String, List<Integer>> tblAliasToNumberOfBucketsInEachPartition) {
+    this.tblAliasToNumberOfBucketsInEachPartition = tblAliasToNumberOfBucketsInEachPartition;
   }
 
-  public void setAliasToPartitionBucketFileNamesMapping(
-    Map<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping) {
-    this.aliasToPartitionBucketFileNamesMapping = aliasToPartitionBucketFileNamesMapping;
+  public void setTblAliasToBucketedFilePathsInEachPartition(
+      Map<String, List<List<String>>> tblAliasToBucketedFilePathsInEachPartition) {
+    this.tblAliasToBucketedFilePathsInEachPartition = tblAliasToBucketedFilePathsInEachPartition;
   }
 
   public void setBigTblPartsToBucketFileNames(
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java  (revision 1451466)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java  (working copy)
@@ -62,7 +62,7 @@
  */
 abstract public class AbstractBucketJoinProc implements NodeProcessor {
   private static final Log LOG =
-    LogFactory.getLog(AbstractBucketJoinProc.class.getName());
+      LogFactory.getLog(AbstractBucketJoinProc.class.getName());
 
   protected ParseContext pGraphContext;
 
@@ -75,10 +75,10 @@
 
   @Override
   abstract public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-    Object... nodeOutputs) throws SemanticException;
+      Object... nodeOutputs) throws SemanticException;
 
-  private static List<String> getOnePartitionBucketFileNames(
-    URI location, ParseContext pGraphContext) throws SemanticException {
+  private static List<String> getBucketFilePathsOfPartition(
+      URI location, ParseContext pGraphContext) throws SemanticException {
     List<String> fileNames = new ArrayList<String>();
     try {
       FileSystem fs = FileSystem.get(location, pGraphContext.getConf());
@@ -94,32 +94,35 @@
     return fileNames;
   }
 
+  // This function checks whether all bucketing columns are also in join keys and in the same order
   private boolean checkBucketColumns(List<String> bucketColumns,
-    List<String> keys,
-    Integer[] orders) {
-    if (keys == null || bucketColumns == null || bucketColumns.isEmpty()) {
+      List<String> joinKeys,
+      Integer[] joinKeyOrders) {
+    if (joinKeys == null || bucketColumns == null || bucketColumns.isEmpty()) {
       return false;
     }
-    for (int i = 0; i < keys.size(); i++) {
-      int index = bucketColumns.indexOf(keys.get(i));
-      if (orders[i] != null && orders[i] != index) {
+    for (int i = 0; i < joinKeys.size(); i++) {
+      int index = bucketColumns.indexOf(joinKeys.get(i));
+      if (joinKeyOrders[i] != null && joinKeyOrders[i] != index) {
         return false;
       }
-      orders[i] = index;
+      joinKeyOrders[i] = index;
     }
 
     // Check if the join columns contains all bucket columns.
    // If a table is bucketized on column B, but the join key is A and B,
    // it is easy to see joining on different buckets yield empty results.
-    return keys.containsAll(bucketColumns);
+    return joinKeys.containsAll(bucketColumns);
   }
 
-  private boolean checkBucketNumberAgainstBigTable(
-    Map<String, List<Integer>> aliasToBucketNumber, int bucketNumberInPart) {
-    for (List<Integer> bucketNums : aliasToBucketNumber.values()) {
+  private boolean checkNumberOfBucketsAgainstBigTable(
+      Map<String, List<Integer>> tblAliasToNumberOfBucketsInEachPartition,
+      int numberOfBucketsInPartitionOfBigTable) {
+    for (List<Integer> bucketNums : tblAliasToNumberOfBucketsInEachPartition.values()) {
       for (int nxt : bucketNums) {
-        boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
-          : bucketNumberInPart % nxt == 0;
+        boolean ok = (nxt >= numberOfBucketsInPartitionOfBigTable) ? nxt
+            % numberOfBucketsInPartitionOfBigTable == 0
+            : numberOfBucketsInPartitionOfBigTable % nxt == 0;
         if (!ok) {
          return false;
         }
@@ -129,9 +132,9 @@
   }
 
   protected boolean canConvertMapJoinToBucketMapJoin(
-    MapJoinOperator mapJoinOp,
-    ParseContext pGraphContext,
-    BucketJoinProcCtx context) throws SemanticException {
+      MapJoinOperator mapJoinOp,
+      ParseContext pGraphContext,
+      BucketJoinProcCtx context) throws SemanticException {
 
     QBJoinTree joinCtx = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
     if (joinCtx == null) {
@@ -171,12 +174,12 @@
     Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
 
     return checkConvertBucketMapJoin(
-      pGraphContext,
-      context,
-      joinCtx,
-      keysMap,
-      baseBigAlias,
-      joinAliases);
+        pGraphContext,
+        context,
+        joinCtx,
+        keysMap,
+        baseBigAlias,
+        joinAliases);
   }
 
   /*
@@ -188,17 +191,17 @@
    * d. The number of buckets in the big table can be divided by no of buckets in small tables.
    */
   protected boolean checkConvertBucketMapJoin(
-    ParseContext pGraphContext,
-    BucketJoinProcCtx context,
-    QBJoinTree joinCtx,
-    Map<Byte, List<ExprNodeDesc>> keysMap,
-    String baseBigAlias,
-    List<String> joinAliases) throws SemanticException {
+      ParseContext pGraphContext,
+      BucketJoinProcCtx context,
+      QBJoinTree joinCtx,
+      Map<Byte, List<ExprNodeDesc>> keysMap,
+      String baseBigAlias,
+      List<String> joinAliases) throws SemanticException {
 
-    LinkedHashMap<String, List<Integer>> aliasToPartitionBucketNumberMapping =
-      new LinkedHashMap<String, List<Integer>>();
-    LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
-      new LinkedHashMap<String, List<List<String>>>();
+    LinkedHashMap<String, List<Integer>> tblAliasToNumberOfBucketsInEachPartition =
+        new LinkedHashMap<String, List<Integer>>();
+    LinkedHashMap<String, List<List<String>>> tblAliasToBucketedFilePathsInEachPartition =
+        new LinkedHashMap<String, List<List<String>>>();
 
     HashMap<String, Operator<? extends OperatorDesc>> topOps = pGraphContext.getTopOps();
     Map<TableScanOperator, Table> topToTable = pGraphContext.getTopToTable();
@@ -206,15 +209,16 @@
     // (partition to bucket file names) and (partition to bucket number) for
     // the big table;
     LinkedHashMap<Partition, List<String>> bigTblPartsToBucketFileNames =
-      new LinkedHashMap<Partition, List<String>>();
+        new LinkedHashMap<Partition, List<String>>();
     LinkedHashMap<Partition, Integer> bigTblPartsToBucketNumber =
-      new LinkedHashMap<Partition, Integer>();
+        new LinkedHashMap<Partition, Integer>();
 
-    Integer[] orders = null; // accessing order of join cols to bucket cols, should be same
+    Integer[] joinKeyOrder = null; // accessing order of join cols to bucket cols, should be same
     boolean bigTablePartitioned = true;
     for (int index = 0; index < joinAliases.size(); index++) {
       String alias = joinAliases.get(index);
       Operator<? extends OperatorDesc> topOp = joinCtx.getAliasToOpInfo().get(alias);
+      // The alias may not be present in case of a sub-query
       if (topOp == null) {
         return false;
       }
@@ -225,5 +229,7 @@
       int oldKeySize = keys.size();
       TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
       if (tso == null) {
+        // We cannot get to the root TableScan operator, likely because there is a join or group-by
+        // between topOp and the root TableScan operator. We don't handle that case and simply return.
         return false;
       }
@@ -256,8 +262,8 @@
         return false;
       }
 
-      if (orders == null) {
-        orders = new Integer[keys.size()];
+      if (joinKeyOrder == null) {
+        joinKeyOrder = new Integer[keys.size()];
       }
 
       Table tbl = topToTable.get(tso);
@@ -267,9 +273,9 @@
         prunedParts = pGraphContext.getOpToPartList().get(tso);
         if (prunedParts == null) {
           prunedParts =
-            PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso),
-              pGraphContext.getConf(), alias,
-              pGraphContext.getPrunedPartitions());
+              PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso),
+                  pGraphContext.getConf(), alias,
+                  pGraphContext.getPrunedPartitions());
           pGraphContext.getOpToPartList().put(tso, prunedParts);
         }
       } catch (HiveException e) {
@@ -282,27 +288,27 @@
 
       // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
       if (partitions.isEmpty()) {
         if (!alias.equals(baseBigAlias)) {
-          aliasToPartitionBucketNumberMapping.put(alias, Arrays.<Integer>asList());
-          aliasToPartitionBucketFileNamesMapping.put(alias, new ArrayList<List<String>>());
+          tblAliasToNumberOfBucketsInEachPartition.put(alias, Arrays.<Integer> asList());
+          tblAliasToBucketedFilePathsInEachPartition.put(alias, new ArrayList<List<String>>());
         }
       } else {
         List<Integer> buckets = new ArrayList<Integer>();
         List<List<String>> files = new ArrayList<List<String>>();
         for (Partition p : partitions) {
-          if (!checkBucketColumns(p.getBucketCols(), keys, orders)) {
+          if (!checkBucketColumns(p.getBucketCols(), keys, joinKeyOrder)) {
             return false;
           }
           List<String> fileNames =
-            getOnePartitionBucketFileNames(p.getDataLocation(), pGraphContext);
+              getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext);
           // The number of files for the table should be same as number of buckets.
           int bucketCount = p.getBucketCount();
           if (fileNames.size() != bucketCount) {
             String msg = "The number of buckets for table " +
-              tbl.getTableName() + " partition " + p.getName() + " is " +
-              p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
+                tbl.getTableName() + " partition " + p.getName() + " is " +
+                p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
             throw new SemanticException(
-              ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+                ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
           }
 
           if (alias.equals(baseBigAlias)) {
@@ -314,25 +320,25 @@
           }
         }
         if (!alias.equals(baseBigAlias)) {
-          aliasToPartitionBucketNumberMapping.put(alias, buckets);
-          aliasToPartitionBucketFileNamesMapping.put(alias, files);
+          tblAliasToNumberOfBucketsInEachPartition.put(alias, buckets);
+          tblAliasToBucketedFilePathsInEachPartition.put(alias, files);
         }
       }
     } else {
-      if (!checkBucketColumns(tbl.getBucketCols(), keys, orders)) {
+      if (!checkBucketColumns(tbl.getBucketCols(), keys, joinKeyOrder)) {
        return false;
      }
       List<String> fileNames =
-        getOnePartitionBucketFileNames(tbl.getDataLocation(), pGraphContext);
+          getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext);
       Integer num = new Integer(tbl.getNumBuckets());
 
       // The number of files for the table should be same as number of buckets.
       if (fileNames.size() != num) {
         String msg = "The number of buckets for table " +
-          tbl.getTableName() + " is " + tbl.getNumBuckets() +
-          ", whereas the number of files is " + fileNames.size();
+            tbl.getTableName() + " is " + tbl.getNumBuckets() +
+            ", whereas the number of files is " + fileNames.size();
         throw new SemanticException(
-          ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+            ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
       }
 
       if (alias.equals(baseBigAlias)) {
@@ -340,8 +346,8 @@
         bigTblPartsToBucketNumber.put(null, tbl.getNumBuckets());
         bigTablePartitioned = false;
       } else {
-        aliasToPartitionBucketNumberMapping.put(alias, Arrays.asList(num));
-        aliasToPartitionBucketFileNamesMapping.put(alias, Arrays.asList(fileNames));
+        tblAliasToNumberOfBucketsInEachPartition.put(alias, Arrays.asList(num));
+        tblAliasToBucketedFilePathsInEachPartition.put(alias, Arrays.asList(fileNames));
        }
      }
    }
@@ -349,14 +355,16 @@
 
     // All tables or partitions are bucketed, and their bucket number is
     // stored in 'bucketNumbers', we need to check if the number of buckets in
     // the big table can be divided by no of buckets in small tables.
-    for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
-      if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
+    for (Integer numBucketsInPartitionOfBigTable : bigTblPartsToBucketNumber.values()) {
+      if (!checkNumberOfBucketsAgainstBigTable(
+          tblAliasToNumberOfBucketsInEachPartition, numBucketsInPartitionOfBigTable)) {
         return false;
       }
     }
-    context.setAliasToPartitionBucketNumberMapping(aliasToPartitionBucketNumberMapping);
-    context.setAliasToPartitionBucketFileNamesMapping(aliasToPartitionBucketFileNamesMapping);
+    context.setTblAliasToNumberOfBucketsInEachPartition(tblAliasToNumberOfBucketsInEachPartition);
+    context.setTblAliasToBucketedFilePathsInEachPartition(
+        tblAliasToBucketedFilePathsInEachPartition);
     context.setBigTblPartsToBucketFileNames(bigTblPartsToBucketFileNames);
     context.setBigTblPartsToBucketNumber(bigTblPartsToBucketNumber);
     context.setJoinAliases(joinAliases);
@@ -372,24 +380,24 @@
    * enhanced to keep the big table bucket -> small table buckets mapping.
    */
   protected void convertMapJoinToBucketMapJoin(
-    MapJoinOperator mapJoinOp,
-    BucketJoinProcCtx context) throws SemanticException {
+      MapJoinOperator mapJoinOp,
+      BucketJoinProcCtx context) throws SemanticException {
     MapJoinDesc desc = mapJoinOp.getConf();
 
     Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
-      new LinkedHashMap<String, Map<String, List<String>>>();
+        new LinkedHashMap<String, Map<String, List<String>>>();
 
-    Map<String, List<Integer>> aliasToPartitionBucketNumberMapping =
-      context.getAliasToPartitionBucketNumberMapping();
+    Map<String, List<Integer>> tblAliasToNumberOfBucketsInEachPartition =
+        context.getTblAliasToNumberOfBucketsInEachPartition();
 
-    Map<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
-      context.getAliasToPartitionBucketFileNamesMapping();
+    Map<String, List<List<String>>> tblAliasToBucketedFilePathsInEachPartition =
+        context.getTblAliasToBucketedFilePathsInEachPartition();
 
     Map<Partition, List<String>> bigTblPartsToBucketFileNames =
-      context.getBigTblPartsToBucketFileNames();
+        context.getBigTblPartsToBucketFileNames();
 
     Map<Partition, Integer> bigTblPartsToBucketNumber =
-      context.getBigTblPartsToBucketNumber();
+        context.getBigTblPartsToBucketNumber();
 
     List<String> joinAliases = context.getJoinAliases();
     String baseBigAlias = context.getBaseBigAlias();
@@ -406,28 +414,33 @@
       if (alias.equals(baseBigAlias)) {
         continue;
       }
-      for (List<String> names : aliasToPartitionBucketFileNamesMapping.get(alias)) {
+      for (List<String> names : tblAliasToBucketedFilePathsInEachPartition.get(alias)) {
         Collections.sort(names);
       }
-      List<Integer> smallTblBucketNums = aliasToPartitionBucketNumberMapping.get(alias);
-      List<List<String>> smallTblFilesList = aliasToPartitionBucketFileNamesMapping.get(alias);
+      List<Integer> smallTblBucketNums = tblAliasToNumberOfBucketsInEachPartition.get(alias);
+      List<List<String>> smallTblFilesList = tblAliasToBucketedFilePathsInEachPartition.get(alias);
 
-      Map<String, List<String>> mapping = new LinkedHashMap<String, List<String>>();
-      aliasBucketFileNameMapping.put(alias, mapping);
+      Map<String, List<String>> mappingBigTableBucketFileNameToSmallTableBucketFileNames =
+          new LinkedHashMap<String, List<String>>();
+      aliasBucketFileNameMapping.put(alias,
+          mappingBigTableBucketFileNameToSmallTableBucketFileNames);
 
       // for each bucket file in big table, get the corresponding bucket file
       // name in the small table.
       // more than 1 partition in the big table, do the mapping for each partition
       Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames =
-        bigTblPartsToBucketFileNames.entrySet().iterator();
+          bigTblPartsToBucketFileNames.entrySet().iterator();
       Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
-        .entrySet().iterator();
+          .entrySet().iterator();
       while (bigTblPartToBucketNames.hasNext()) {
         assert bigTblPartToBucketNum.hasNext();
         int bigTblBucketNum = bigTblPartToBucketNum.next().getValue();
         List<String> bigTblBucketNameList = bigTblPartToBucketNames.next().getValue();
-        fillMapping(smallTblBucketNums, smallTblFilesList,
-          mapping, bigTblBucketNum, bigTblBucketNameList, desc.getBigTableBucketNumMapping());
+        fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(smallTblBucketNums,
+            smallTblFilesList,
+            mappingBigTableBucketFileNameToSmallTableBucketFileNames, bigTblBucketNum,
+            bigTblBucketNameList,
+            desc.getBigTableBucketNumMapping());
       }
     }
     desc.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
@@ -462,16 +475,16 @@
   }
 
   // called for each partition of big table and populates mapping for each file in the partition
-  private static void fillMapping(
-    List<Integer> smallTblBucketNums,
-    List<List<String>> smallTblFilesList,
-    Map<String, List<String>> mapping,
-    int bigTblBucketNum, List<String> bigTblBucketNameList,
-    Map<String, Integer> bucketFileNameMapping) {
+  private static void fillMappingBigTableBucketFileNameToSmallTableBucketFileNames(
+      List<Integer> smallTblBucketNums,
+      List<List<String>> smallTblFilesList,
+      Map<String, List<String>> bigTableBucketFileNameToSmallTableBucketFileNames,
+      int bigTblBucketNum, List<String> bigTblBucketNameList,
+      Map<String, Integer> bucketFileNameMapping) {
 
     for (int bindex = 0; bindex < bigTblBucketNameList.size(); bindex++) {
       ArrayList<String> resultFileNames = new ArrayList<String>();
-      for (int sindex = 0 ; sindex < smallTblBucketNums.size(); sindex++) {
+      for (int sindex = 0; sindex < smallTblBucketNums.size(); sindex++) {
         int smallTblBucketNum = smallTblBucketNums.get(sindex);
         List<String> smallTblFileNames = smallTblFilesList.get(sindex);
         if (bigTblBucketNum >= smallTblBucketNum) {
@@ -489,7 +502,7 @@
         }
       }
       String inputBigTBLBucket = bigTblBucketNameList.get(bindex);
-      mapping.put(inputBigTBLBucket, resultFileNames);
+      bigTableBucketFileNameToSmallTableBucketFileNames.put(inputBigTBLBucket, resultFileNames);
       bucketFileNameMapping.put(inputBigTBLBucket, bindex);
     }
   }
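The renamed helpers above encode the bucket-compatibility contract behind a bucket map join: each small table's bucket count must divide, or be divisible by, the big table's bucket count, and each big-table bucket file is then wired to exactly the small-table bucket files that can hold matching key hashes. A standalone sketch of both rules, simplified from checkNumberOfBucketsAgainstBigTable and fillMappingBigTableBucketFileNameToSmallTableBucketFileNames (names and structure here are illustrative, not the patch's code):

import java.util.ArrayList;
import java.util.List;

class BucketMappingSketch {
  // The per-small-table compatibility rule: one bucket count must divide the other.
  static boolean compatible(int smallTblBuckets, int bigTblBuckets) {
    return (smallTblBuckets >= bigTblBuckets)
        ? smallTblBuckets % bigTblBuckets == 0
        : bigTblBuckets % smallTblBuckets == 0;
  }

  // Which small-table bucket indices one big-table bucket must read: rows whose
  // key hashes to big bucket b sit in every small bucket s with s == b (mod min).
  static List<Integer> smallBucketsForBigBucket(int bigIdx, int bigCount, int smallCount) {
    List<Integer> result = new ArrayList<Integer>();
    if (bigCount >= smallCount) {
      result.add(bigIdx % smallCount);   // one small file serves several big buckets
    } else {
      for (int s = bigIdx; s < smallCount; s += bigCount) {
        result.add(s);                   // one big bucket reads several small files
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(compatible(4, 8));                  // true: 8 % 4 == 0
    System.out.println(compatible(3, 8));                  // false
    System.out.println(smallBucketsForBigBucket(3, 8, 4)); // [3]
    System.out.println(smallBucketsForBigBucket(0, 2, 4)); // [0, 2]
  }
}

For example, with 2 big-table buckets and 4 small-table buckets, a key that hashes to big bucket 0 can only sit in small buckets 0 or 2, so big bucket file 0 is mapped to those two small files.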
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java  (revision 1451466)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java  (working copy)
@@ -47,6 +47,7 @@
 
   private transient String bigTableAlias;
 
+  // table alias (small) --> input file name (big) --> target file names (small)
   private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping;
   private Map<String, Integer> bigTableBucketNumMapping;
   private Map<String, List<String>> bigTablePartSpecToFileMapping;
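The new comment in MapJoinDesc pins down the shape of aliasBucketFileNameMapping. A sketch of a populated instance for a two-bucket big table joined with a four-bucket small table aliased "b" (the paths are made up for illustration):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AliasBucketFileNameMappingSketch {
  public static void main(String[] args) {
    // small-table alias -> big-table bucket file -> matching small-table bucket files
    Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
        new LinkedHashMap<String, Map<String, List<String>>>();

    Map<String, List<String>> bigFileToSmallFiles = new LinkedHashMap<String, List<String>>();
    // big table has 2 buckets, small table "b" has 4: each big bucket reads 2 small files
    bigFileToSmallFiles.put("/warehouse/big/000000_0",
        Arrays.asList("/warehouse/b/000000_0", "/warehouse/b/000002_0"));
    bigFileToSmallFiles.put("/warehouse/big/000001_0",
        Arrays.asList("/warehouse/b/000001_0", "/warehouse/b/000003_0"));

    aliasBucketFileNameMapping.put("b", bigFileToSmallFiles);
    System.out.println(aliasBucketFileNameMapping);
  }
}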