diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 197a20f..82f1313 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2121,6 +2121,8 @@ public void run() { } resultMap.put(pathStr, new ContentSummary(total, -1, -1)); } + // TODO: should nullify the summary for non-native tables, + // so that they are not selected as a mapjoin target FileSystem fs = p.getFileSystem(myConf); resultMap.put(pathStr, fs.getContentSummary(p)); } catch (Exception e) { @@ -2182,6 +2184,23 @@ public void run() { } } + // Returns the sum of sizes of all aliases except one, or -1 if any other alias size is unknown + public static long sumOfExcept(Map aliasToSize, + Set aliases, String except) { + long total = 0; + for (String alias : aliases) { + if (alias.equals(except)) { + continue; + } + Long size = aliasToSize.get(alias); + if (size == null) { + return -1; + } + total += size; + } + return total; + } + public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx) throws Exception { ContentSummary cs = ctx.getCS(dirPath); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 2efa7c2..e070863 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -425,7 +425,7 @@ public static boolean cannotConvert(String bigTableAlias, Configuration conf = context.getConf(); - // If sizes of atleast n-1 tables in a n-way join is known, and their sum is smaller than + // If sizes of at least n-1 tables in an n-way join are known, and their sum is smaller than // the threshold size, convert the join into map-join and don't create a conditional task boolean convertJoinMapJoin = 
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK); @@ -435,47 +435,32 @@ public static boolean cannotConvert(String bigTableAlias, long mapJoinSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); - boolean bigTableFound = false; - long largestBigTableCandidateSize = -1; - long sumTableSizes = 0; - for (String alias : aliasToWork.keySet()) { + Long bigTableSize = null; + Set aliases = aliasToWork.keySet(); + for (String alias : aliases) { int tablePosition = getPosition(currWork, joinOp, alias); - boolean bigTableCandidate = bigTableCandidates.contains(tablePosition); - Long size = aliasToSize.get(alias); - // The size is not available at compile time if the input is a sub-query. - // If the size of atleast n-1 inputs for a n-way join are available at compile time, - // and the sum of them is less than the specified threshold, then convert the join - // into a map-join without the conditional task. - if ((size == null) || (size > mapJoinSize)) { - sumTableSizes += largestBigTableCandidateSize; - if (bigTableFound || (sumTableSizes > mapJoinSize) || !bigTableCandidate) { - convertJoinMapJoin = false; - break; - } - bigTableFound = true; + if (!bigTableCandidates.contains(tablePosition)) { + continue; + } + long sumOfOthers = Utilities.sumOfExcept(aliasToSize, aliases, alias); + if (sumOfOthers < 0 || sumOfOthers > mapJoinSize) { + continue; // some small alias is not known or too big + } + if (bigTableSize == null && bigTablePosition >= 0 && tablePosition < bigTablePosition) { + continue; // prefer right most alias + } + Long aliasSize = aliasToSize.get(alias); + if (bigTableSize == null || (aliasSize != null && aliasSize > bigTableSize)) { bigTablePosition = tablePosition; - largestBigTableCandidateSize = mapJoinSize + 1; - } else { - if (bigTableCandidate && size > largestBigTableCandidateSize) { - bigTablePosition = tablePosition; - sumTableSizes += largestBigTableCandidateSize; - 
largestBigTableCandidateSize = size; - } else { - sumTableSizes += size; - } - if (sumTableSizes > mapJoinSize) { - convertJoinMapJoin = false; - break; - } + bigTableSize = aliasSize; } } } - String bigTableAlias = null; currWork.setOpParseCtxMap(parseCtx.getOpParseCtx()); currWork.setJoinTree(joinTree); - if (convertJoinMapJoin) { + if (bigTablePosition >= 0) { // create map join task and set big table as bigTablePosition MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition).getFirst(); @@ -505,7 +490,7 @@ public static boolean cannotConvert(String bigTableAlias, // create map join task and set big table as i ObjectPair newTaskAlias = convertTaskToMapJoinTask(newWork, i); MapRedTask newTask = newTaskAlias.getFirst(); - bigTableAlias = newTaskAlias.getSecond(); + String bigTableAlias = newTaskAlias.getSecond(); if (cannotConvert(bigTableAlias, aliasToSize, aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java index faf2f9b..1c02894 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java @@ -19,15 +19,17 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Utilities; /** * ConditionalResolverSkewJoin. 
@@ -135,96 +137,64 @@ public ConditionalResolverCommonJoin() { return resTsks; } - static class AliasFileSizePair implements Comparable { - String alias; - long size; - AliasFileSizePair(String alias, long size) { - super(); - this.alias = alias; - this.size = size; - } - @Override - public int compareTo(AliasFileSizePair o) { - if (o == null) { - return 1; - } - return (size < o.size) ? -1 : ((size > o.size) ? 1 : 0); - } - } - private String resolveMapJoinTask( HashMap> pathToAliases, HashMap> aliasToTask, HashMap aliasToKnownSize, String hdfsTmpDir, String localTmpDir, HiveConf conf) { - String bigTableFileAlias = null; - long smallTablesFileSizeSum = 0; - - Map aliasToFileSizeMap = new HashMap(); - for (Map.Entry entry : aliasToKnownSize.entrySet()) { - String alias = entry.getKey(); - AliasFileSizePair pair = new AliasFileSizePair(alias, entry.getValue()); - aliasToFileSizeMap.put(alias, pair); + Set aliases = new HashSet(); + for (Map.Entry> entry : pathToAliases.entrySet()) { + aliases.addAll(entry.getValue()); } + Set unknownPaths = new HashSet(); + for (Map.Entry> entry : pathToAliases.entrySet()) { + for (String alias : entry.getValue()) { + if (aliases.contains(alias) && !aliasToKnownSize.containsKey(alias)) { + unknownPaths.add(entry.getKey()); + break; + } + } + } try { // need to compute the input size at runtime, and select the biggest as // the big table. 
- for (Map.Entry> oneEntry : pathToAliases - .entrySet()) { - String p = oneEntry.getKey(); + for (String p : unknownPaths) { // this path is intermediate data if (p.startsWith(hdfsTmpDir) || p.startsWith(localTmpDir)) { - ArrayList aliasArray = oneEntry.getValue(); - if (aliasArray.size() <= 0) { - continue; - } Path path = new Path(p); FileSystem fs = path.getFileSystem(conf); long fileSize = fs.getContentSummary(path).getLength(); - for (String alias : aliasArray) { - AliasFileSizePair pair = aliasToFileSizeMap.get(alias); - if (pair == null) { - pair = new AliasFileSizePair(alias, 0); - aliasToFileSizeMap.put(alias, pair); + for (String alias : pathToAliases.get(p)) { + Long length = aliasToKnownSize.get(alias); + if (length == null) { + aliasToKnownSize.put(alias, fileSize); } - pair.size += fileSize; } } } - // generate file size to alias mapping; but not set file size as key, - // because different file may have the same file size. - - List aliasFileSizeList = new ArrayList( - aliasToFileSizeMap.values()); - - Collections.sort(aliasFileSizeList); - // iterating through this list from the end to beginning, trying to find - // the big table for mapjoin - int idx = aliasFileSizeList.size() - 1; - boolean bigAliasFound = false; - while (idx >= 0) { - AliasFileSizePair pair = aliasFileSizeList.get(idx); - String alias = pair.alias; - long size = pair.size; - idx--; - if (!bigAliasFound && aliasToTask.get(alias) != null) { - // got the big table - bigAliasFound = true; - bigTableFileAlias = alias; - continue; - } - smallTablesFileSizeSum += size; - } - // compare with threshold long threshold = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); - if (smallTablesFileSizeSum <= threshold) { - return bigTableFileAlias; - } else { - return null; + + Long bigTableSize = null; + String bigTableFileAlias = null; + for (String alias : aliases) { + if (!aliasToTask.containsKey(alias)) { + continue; + } + long sumOfOthers = 
Utilities.sumOfExcept(aliasToKnownSize, aliases, alias); + if (sumOfOthers < 0 || sumOfOthers > threshold) { + continue; + } + // at most one alias is unknown. we can safely regard it as a big alias + Long aliasSize = aliasToKnownSize.get(alias); + if (bigTableSize == null || (aliasSize != null && aliasSize > bigTableSize)) { + bigTableFileAlias = alias; + bigTableSize = aliasSize; + } } + return bigTableFileAlias; } catch (Exception e) { e.printStackTrace(); return null; diff --git ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java deleted file mode 100644 index 67203c9..0000000 --- ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.plan; - -import junit.framework.TestCase; - -import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.AliasFileSizePair; - -public class TestConditionalResolverCommonJoin extends TestCase { - - public void testAliasFileSizePairCompareTo() { - AliasFileSizePair big = new AliasFileSizePair("big", 389560034778L); - AliasFileSizePair small = new AliasFileSizePair("small", 1647L); - - assertEquals(0, big.compareTo(big)); - assertEquals(1, big.compareTo(small)); - assertEquals(-1, small.compareTo(big)); - } -} diff --git ql/src/test/results/clientpositive/auto_join25.q.out ql/src/test/results/clientpositive/auto_join25.q.out index 7427239..d6d0c90 100644 --- ql/src/test/results/clientpositive/auto_join25.q.out +++ ql/src/test/results/clientpositive/auto_join25.q.out @@ -69,7 +69,7 @@ Obtaining error information Task failed! Task ID: - Stage-14 + Stage-15 Logs: @@ -129,7 +129,7 @@ Obtaining error information Task failed! Task ID: - Stage-7 + Stage-8 Logs: diff --git ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out index 7d06739..effa6c8 100644 --- ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out +++ ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out @@ -79,7 +79,7 @@ Obtaining error information Task failed! Task ID: - Stage-7 + Stage-8 Logs: diff --git ql/src/test/results/clientpositive/mapjoin_hook.q.out ql/src/test/results/clientpositive/mapjoin_hook.q.out index d60d16e..8da5168 100644 --- ql/src/test/results/clientpositive/mapjoin_hook.q.out +++ ql/src/test/results/clientpositive/mapjoin_hook.q.out @@ -49,7 +49,7 @@ Obtaining error information Task failed! Task ID: - Stage-14 + Stage-15 Logs: