Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (revision 1447814)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (working copy)
@@ -88,12 +88,14 @@
     // Check whether the mapjoin is a bucketed mapjoin.
     // The above can be ascertained by checking the big table bucket -> small table buckets
     // mapping in the mapjoin descriptor.
+    // First check whether this map-join operator is already a BucketMapJoin. If not, give up;
+    // we are only trying to convert a BucketMapJoin into a sort-BucketMapJoin.
     if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null
         || mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) {
       return false;
     }

-    boolean tableSorted = true;
+    boolean tableEligibleForBucketedSortMergeJoin = true;
     QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
         .get(mapJoinOp);
     if (joinCxt == null) {
@@ -111,8 +113,8 @@
     List<Order> sortColumnsFirstTable = new ArrayList<Order>();

     for (int pos = 0; pos < srcs.length; pos++) {
-      tableSorted = tableSorted
-          && isTableSorted(smbJoinContext,
+      tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin
+          && isEligibleForBucketSortMergeJoin(smbJoinContext,
               pGraphContext,
               mapJoinOp.getConf().getKeys().get((byte) pos),
               joinCxt,
@@ -120,7 +122,7 @@
               pos,
               sortColumnsFirstTable);
     }
-    if (!tableSorted) {
+    if (!tableEligibleForBucketedSortMergeJoin) {
       // this is a mapjoin but not suited for a sort merge bucket map join. check outer joins
       MapJoinProcessor.checkMapJoin(mapJoinOp.getConf().getPosBigTable(),
           mapJoinOp.getConf().getConds());
@@ -233,7 +235,7 @@
    * @return
    * @throws SemanticException
    */
-  private boolean isTableSorted(
+  private boolean isEligibleForBucketSortMergeJoin(
       SortBucketJoinProcCtx smbJoinContext,
       ParseContext pctx,
       List<ExprNodeDesc> keys,
@@ -392,7 +394,7 @@
       SortBucketJoinProcCtx smbJoinContext,
       ParseContext pGraphContext) throws SemanticException {

-    boolean tableSorted = true;
+    boolean tableEligibleForBucketedSortMergeJoin = true;

     QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
     if (joinCtx == null) {
@@ -407,8 +409,8 @@
     List<Order> sortColumnsFirstTable = new ArrayList<Order>();

     for (int pos = 0; pos < srcs.length; pos++) {
-      tableSorted = tableSorted &&
-          isTableSorted(smbJoinContext,
+      tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
+          isEligibleForBucketSortMergeJoin(smbJoinContext,
               pGraphContext,
               smbJoinContext.getKeyExprMap().get((byte)pos),
               joinCtx,
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (revision 1447814)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (working copy)
@@ -77,7 +77,7 @@
   abstract public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
      Object... nodeOutputs) throws SemanticException;

-  private static List<String> getOnePartitionBucketFileNames(
+  private static List<String> getBucketFilePathsOfPartition(
      URI location, ParseContext pGraphContext) throws SemanticException {
    List<String> fileNames = new ArrayList<String>();
    try {
@@ -94,32 +94,33 @@
     return fileNames;
   }

+  // This function checks whether all bucketing columns are also present in the join keys, in the same order.
   private boolean checkBucketColumns(List<String> bucketColumns,
-      List<String> keys,
-      Integer[] orders) {
-    if (keys == null || bucketColumns == null || bucketColumns.isEmpty()) {
+      List<String> joinKeys,
+      Integer[] joinKeyOrders) {
+    if (joinKeys == null || bucketColumns == null || bucketColumns.isEmpty()) {
       return false;
     }
-    for (int i = 0; i < keys.size(); i++) {
-      int index = bucketColumns.indexOf(keys.get(i));
-      if (orders[i] != null && orders[i] != index) {
+    for (int i = 0; i < joinKeys.size(); i++) {
+      int index = bucketColumns.indexOf(joinKeys.get(i));
+      if (joinKeyOrders[i] != null && joinKeyOrders[i] != index) {
         return false;
       }
-      orders[i] = index;
+      joinKeyOrders[i] = index;
     }

     // Check if the join columns contains all bucket columns.
     // If a table is bucketized on column B, but the join key is A and B,
     // it is easy to see joining on different buckets yield empty results.
-    return keys.containsAll(bucketColumns);
+    return joinKeys.containsAll(bucketColumns);
   }

-  private boolean checkBucketNumberAgainstBigTable(
-      Map<String, List<Integer>> aliasToBucketNumber, int bucketNumberInPart) {
-    for (List<Integer> bucketNums : aliasToBucketNumber.values()) {
+  private boolean checkNumberOfBucketsAgainstBigTable(
+      Map<String, List<Integer>> aliasToNumberOfBuckets, int numberOfBucketsInPartition) {
+    for (List<Integer> bucketNums : aliasToNumberOfBuckets.values()) {
       for (int nxt : bucketNums) {
-        boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
-            : bucketNumberInPart % nxt == 0;
+        boolean ok = (nxt >= numberOfBucketsInPartition) ? nxt % numberOfBucketsInPartition == 0
+            : numberOfBucketsInPartition % nxt == 0;
         if (!ok) {
           return false;
         }
@@ -195,9 +196,9 @@
       String baseBigAlias,
       List<String> joinAliases) throws SemanticException {

-    LinkedHashMap<String, List<Integer>> aliasToPartitionBucketNumberMapping =
+    LinkedHashMap<String, List<Integer>> tblAliasToNumberOfBucketsInEachPartition =
         new LinkedHashMap<String, List<Integer>>();
-    LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
+    LinkedHashMap<String, List<List<String>>> tblAliasToBucketedFilePathsInEachPartition =
         new LinkedHashMap<String, List<List<String>>>();

     HashMap<String, Operator<? extends OperatorDesc>> topOps = pGraphContext.getTopOps();
@@ -210,11 +211,12 @@
     LinkedHashMap<Partition, Integer> bigTblPartsToBucketNumber =
         new LinkedHashMap<Partition, Integer>();

-    Integer[] orders = null; // accessing order of join cols to bucket cols, should be same
+    Integer[] joinKeyOrder = null; // accessing order of join cols to bucket cols, should be same
     boolean bigTablePartitioned = true;
     for (int index = 0; index < joinAliases.size(); index++) {
       String alias = joinAliases.get(index);
       Operator<? extends OperatorDesc> topOp = joinCtx.getAliasToOpInfo().get(alias);
+      // The alias may not be present in case of a sub-query.
       if (topOp == null) {
         return false;
       }
@@ -225,6 +227,8 @@
       int oldKeySize = keys.size();
       TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
       if (tso == null) {
+        // We cannot get to the root TableScan operator, likely because there is a join or group-by
+        // between topOp and the root TableScan operator. We don't handle that case, so we simply return.
         return false;
       }

@@ -256,8 +260,8 @@
         return false;
       }

-      if (orders == null) {
-        orders = new Integer[keys.size()];
+      if (joinKeyOrder == null) {
+        joinKeyOrder = new Integer[keys.size()];
       }
       Table tbl = topToTable.get(tso);
@@ -282,18 +286,18 @@
       // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
       if (partitions.isEmpty()) {
         if (!alias.equals(baseBigAlias)) {
-          aliasToPartitionBucketNumberMapping.put(alias, Arrays.asList());
-          aliasToPartitionBucketFileNamesMapping.put(alias, new ArrayList<List<String>>());
+          tblAliasToNumberOfBucketsInEachPartition.put(alias, Arrays.asList());
+          tblAliasToBucketedFilePathsInEachPartition.put(alias, new ArrayList<List<String>>());
         }
       } else {
         List<Integer> buckets = new ArrayList<Integer>();
         List<List<String>> files = new ArrayList<List<String>>();
         for (Partition p : partitions) {
-          if (!checkBucketColumns(p.getBucketCols(), keys, orders)) {
+          if (!checkBucketColumns(p.getBucketCols(), keys, joinKeyOrder)) {
             return false;
           }
           List<String> fileNames =
-              getOnePartitionBucketFileNames(p.getDataLocation(), pGraphContext);
+              getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext);

           // The number of files for the table should be same as number of buckets.
           int bucketCount = p.getBucketCount();
@@ -314,16 +318,16 @@
           }
         }
         if (!alias.equals(baseBigAlias)) {
-          aliasToPartitionBucketNumberMapping.put(alias, buckets);
-          aliasToPartitionBucketFileNamesMapping.put(alias, files);
+          tblAliasToNumberOfBucketsInEachPartition.put(alias, buckets);
+          tblAliasToBucketedFilePathsInEachPartition.put(alias, files);
         }
       } else {
-        if (!checkBucketColumns(tbl.getBucketCols(), keys, orders)) {
+        if (!checkBucketColumns(tbl.getBucketCols(), keys, joinKeyOrder)) {
           return false;
         }
         List<String> fileNames =
-            getOnePartitionBucketFileNames(tbl.getDataLocation(), pGraphContext);
+            getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext);
         Integer num = new Integer(tbl.getNumBuckets());

         // The number of files for the table should be same as number of buckets.
@@ -340,8 +344,8 @@
           bigTblPartsToBucketNumber.put(null, tbl.getNumBuckets());
           bigTablePartitioned = false;
         } else {
-          aliasToPartitionBucketNumberMapping.put(alias, Arrays.asList(num));
-          aliasToPartitionBucketFileNamesMapping.put(alias, Arrays.asList(fileNames));
+          tblAliasToNumberOfBucketsInEachPartition.put(alias, Arrays.asList(num));
+          tblAliasToBucketedFilePathsInEachPartition.put(alias, Arrays.asList(fileNames));
         }
       }
     }
@@ -350,13 +354,13 @@
     // stored in 'bucketNumbers', we need to check if the number of buckets in
     // the big table can be divided by no of buckets in small tables.
     for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
-      if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
+      if (!checkNumberOfBucketsAgainstBigTable(tblAliasToNumberOfBucketsInEachPartition, bucketNumber)) {
         return false;
       }
     }

-    context.setAliasToPartitionBucketNumberMapping(aliasToPartitionBucketNumberMapping);
-    context.setAliasToPartitionBucketFileNamesMapping(aliasToPartitionBucketFileNamesMapping);
+    context.setAliasToPartitionBucketNumberMapping(tblAliasToNumberOfBucketsInEachPartition);
+    context.setAliasToPartitionBucketFileNamesMapping(tblAliasToBucketedFilePathsInEachPartition);
     context.setBigTblPartsToBucketFileNames(bigTblPartsToBucketFileNames);
     context.setBigTblPartsToBucketNumber(bigTblPartsToBucketNumber);
     context.setJoinAliases(joinAliases);
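
Note for reviewers: the two compatibility checks this patch renames (checkBucketColumns and checkNumberOfBucketsAgainstBigTable) can be restated as a small standalone sketch of the same logic. This is only an illustration, not Hive code; the class name BucketJoinChecksSketch and its method names are hypothetical.

import java.util.List;
import java.util.Map;

public class BucketJoinChecksSketch {

  // Mirrors checkBucketColumns: every join key must map to the same bucket-column
  // position across tables/partitions, and the join keys must cover all bucket columns.
  static boolean bucketColumnsCoveredByJoinKeys(List<String> bucketColumns,
                                                List<String> joinKeys,
                                                Integer[] joinKeyOrders) {
    if (joinKeys == null || bucketColumns == null || bucketColumns.isEmpty()) {
      return false;
    }
    for (int i = 0; i < joinKeys.size(); i++) {
      int index = bucketColumns.indexOf(joinKeys.get(i));
      if (joinKeyOrders[i] != null && joinKeyOrders[i] != index) {
        // The same join key mapped to a different bucket-column position earlier.
        return false;
      }
      joinKeyOrders[i] = index;
    }
    // Joining on a strict subset of the bucket columns cannot use bucketing.
    return joinKeys.containsAll(bucketColumns);
  }

  // Mirrors checkNumberOfBucketsAgainstBigTable: every small-table bucket count must
  // divide, or be divisible by, the big table's bucket count in the current partition.
  static boolean bucketCountsCompatible(Map<String, List<Integer>> aliasToNumberOfBuckets,
                                        int numberOfBucketsInBigTablePartition) {
    for (List<Integer> bucketCounts : aliasToNumberOfBuckets.values()) {
      for (int n : bucketCounts) {
        boolean ok = (n >= numberOfBucketsInBigTablePartition)
            ? n % numberOfBucketsInBigTablePartition == 0
            : numberOfBucketsInBigTablePartition % n == 0;
        if (!ok) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // 4 buckets in the big table vs. 2 and 8 in the small tables: compatible (divisible).
    System.out.println(bucketCountsCompatible(
        Map.of("s1", List.of(2), "s2", List.of(8)), 4)); // true
    // 4 vs. 3: not compatible.
    System.out.println(bucketCountsCompatible(
        Map.of("s1", List.of(3)), 4)); // false
  }
}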