diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index fccea89..1f4cb19 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -95,7 +95,6 @@ import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -133,12 +132,9 @@ import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; -import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; @@ -184,8 +180,6 @@ import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.FieldSerializer; -import org.apache.commons.codec.binary.Base64; - /** * Utilities. * @@ -2191,6 +2185,8 @@ public void run() { } resultMap.put(pathStr, new ContentSummary(total, -1, -1)); } + // todo: should nullify summary for non-native tables, + // not to be selected as a mapjoin target FileSystem fs = p.getFileSystem(myConf); resultMap.put(pathStr, fs.getContentSummary(p)); } catch (Exception e) { @@ -2252,6 +2248,23 @@ public void run() { } } + // return sum of lengths except one alias. 
returns -1 if the size of any other alias is unknown + public static long sumOfExcept(Map aliasToSize, + Set aliases, String except) { + long total = 0; + for (String alias : aliases) { + if (alias.equals(except)) { + continue; + } + Long size = aliasToSize.get(alias); + if (size == null) { + return -1; + } + total += size; + } + return total; + } + public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx) throws Exception { ContentSummary cs = ctx.getCS(dirPath); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index efa9768..74ca355 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -441,7 +441,7 @@ public static boolean cannotConvert(String bigTableAlias, Configuration conf = context.getConf(); - // If sizes of atleast n-1 tables in a n-way join is known, and their sum is smaller than + // If sizes of at least n-1 tables in an n-way join are known, and their sum is smaller than // the threshold size, convert the join into map-join and don't create a conditional task boolean convertJoinMapJoin = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK); @@ -451,47 +451,32 @@ public static boolean cannotConvert(String bigTableAlias, long mapJoinSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); - boolean bigTableFound = false; - long largestBigTableCandidateSize = -1; - long sumTableSizes = 0; - for (String alias : aliasToWork.keySet()) { + Long bigTableSize = null; + Set aliases = aliasToWork.keySet(); + for (String alias : aliases) { int tablePosition = getPosition(currWork, joinOp, alias); - boolean bigTableCandidate = bigTableCandidates.contains(tablePosition); - Long size = aliasToSize.get(alias); - // The size is not
available at compile time if the input is a sub-query. - // If the size of atleast n-1 inputs for a n-way join are available at compile time, - // and the sum of them is less than the specified threshold, then convert the join - // into a map-join without the conditional task. - if ((size == null) || (size > mapJoinSize)) { - sumTableSizes += largestBigTableCandidateSize; - if (bigTableFound || (sumTableSizes > mapJoinSize) || !bigTableCandidate) { - convertJoinMapJoin = false; - break; - } - bigTableFound = true; + if (!bigTableCandidates.contains(tablePosition)) { + continue; + } + long sumOfOthers = Utilities.sumOfExcept(aliasToSize, aliases, alias); + if (sumOfOthers < 0 || sumOfOthers > mapJoinSize) { + continue; // some small alias is not known or too big + } + if (bigTableSize == null && bigTablePosition >= 0 && tablePosition < bigTablePosition) { + continue; // prefer right most alias + } + Long aliasSize = aliasToSize.get(alias); + if (bigTableSize == null || (aliasSize != null && aliasSize > bigTableSize)) { bigTablePosition = tablePosition; - largestBigTableCandidateSize = mapJoinSize + 1; - } else { - if (bigTableCandidate && size > largestBigTableCandidateSize) { - bigTablePosition = tablePosition; - sumTableSizes += largestBigTableCandidateSize; - largestBigTableCandidateSize = size; - } else { - sumTableSizes += size; - } - if (sumTableSizes > mapJoinSize) { - convertJoinMapJoin = false; - break; - } + bigTableSize = aliasSize; } } } - String bigTableAlias = null; currWork.setOpParseCtxMap(parseCtx.getOpParseCtx()); currWork.setJoinTree(joinTree); - if (convertJoinMapJoin) { + if (bigTablePosition >= 0) { // create map join task and set big table as bigTablePosition MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition).getFirst(); @@ -521,7 +506,7 @@ public static boolean cannotConvert(String bigTableAlias, // create map join task and set big table as i ObjectPair newTaskAlias = convertTaskToMapJoinTask(newWork, i); 
MapRedTask newTask = newTaskAlias.getFirst(); - bigTableAlias = newTaskAlias.getSecond(); + String bigTableAlias = newTaskAlias.getSecond(); if (cannotConvert(bigTableAlias, aliasToSize, aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java index f75e366..ebccb14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java @@ -19,22 +19,28 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Utilities; /** * ConditionalResolverSkewJoin. * */ public class ConditionalResolverCommonJoin implements ConditionalResolver, Serializable { + private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(ConditionalResolverCommonJoin.class); /** * ConditionalResolverSkewJoinCtx. @@ -71,7 +77,8 @@ public void setCommonJoinTask(Task commonJoinTask) { } public HashMap getAliasToKnownSize() { - return aliasToKnownSize == null ? new HashMap() : aliasToKnownSize; + return aliasToKnownSize == null ? 
+ aliasToKnownSize = new HashMap() : aliasToKnownSize; } public void setAliasToKnownSize(HashMap aliasToKnownSize) { @@ -101,6 +108,20 @@ public Path getHdfsTmpDir() { public void setHdfsTmpDir(Path hdfsTmpDir) { this.hdfsTmpDir = hdfsTmpDir; } + + @Override + public ConditionalResolverCommonJoinCtx clone() { + ConditionalResolverCommonJoinCtx ctx = new ConditionalResolverCommonJoinCtx(); + ctx.setAliasToTask(aliasToTask); + ctx.setCommonJoinTask(commonJoinTask); + ctx.setPathToAliases(pathToAliases); + ctx.setHdfsTmpDir(hdfsTmpDir); + ctx.setLocalTmpDir(localTmpDir); + // if any of the join participants is from another MR, it has an alias like '[pos:]$INTNAME' + // whose size should be calculated for each resolver. + ctx.setAliasToKnownSize(new HashMap(aliasToKnownSize)); + return ctx; + } } public ConditionalResolverCommonJoin() { @@ -108,15 +129,11 @@ public ConditionalResolverCommonJoin() { @Override public List> getTasks(HiveConf conf, Object objCtx) { - ConditionalResolverCommonJoinCtx ctx = (ConditionalResolverCommonJoinCtx) objCtx; + ConditionalResolverCommonJoinCtx ctx = ((ConditionalResolverCommonJoinCtx) objCtx).clone(); List> resTsks = new ArrayList>(); // get aliasToPath and pass it to the heuristic - HashMap> pathToAliases = ctx.getPathToAliases(); - HashMap aliasToKnownSize = ctx.getAliasToKnownSize(); - String bigTableAlias = this.resolveMapJoinTask(pathToAliases, ctx - .getAliasToTask(), aliasToKnownSize, ctx.getHdfsTmpDir(), ctx - .getLocalTmpDir(), conf); + String bigTableAlias = resolveDriverAlias(ctx, conf); if (bigTableAlias == null) { // run common join task @@ -135,99 +152,98 @@ public ConditionalResolverCommonJoin() { return resTsks; } - static class AliasFileSizePair implements Comparable { - String alias; - long size; - AliasFileSizePair(String alias, long size) { - super(); - this.alias = alias; - this.size = size; - } - @Override - public int compareTo(AliasFileSizePair o) { - if (o == null) { - return 1; - } - return (size < o.size) ?
-1 : ((size > o.size) ? 1 : 0); + private String resolveDriverAlias(ConditionalResolverCommonJoinCtx ctx, HiveConf conf) { + try { + resolveUnknownSizes(ctx, conf); + return resolveMapJoinTask(ctx, conf); + } catch (Exception e) { + LOG.info("Failed to resolve driver alias by exception.. Falling back to common join", e); } + return null; } - private String resolveMapJoinTask( - HashMap> pathToAliases, - HashMap> aliasToTask, - HashMap aliasToKnownSize, Path hdfsTmpDir, - Path localTmpDir, HiveConf conf) { + protected String resolveMapJoinTask( + ConditionalResolverCommonJoinCtx ctx, HiveConf conf) throws Exception { + + Set aliases = getParticipants(ctx); + + Map aliasToKnownSize = ctx.getAliasToKnownSize(); + Map> pathToAliases = ctx.getPathToAliases(); + Map> aliasToTask = ctx.getAliasToTask(); + + long threshold = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); + Long bigTableSize = null; + Long smallTablesSize = null; String bigTableFileAlias = null; - long smallTablesFileSizeSum = 0; + for (String alias : aliases) { + if (!aliasToTask.containsKey(alias)) { + continue; + } + long sumOfOthers = Utilities.sumOfExcept(aliasToKnownSize, aliases, alias); + if (sumOfOthers < 0 || sumOfOthers > threshold) { + continue; + } + // at most one alias is unknown. 
we can safely regard it as a big alias + Long aliasSize = aliasToKnownSize.get(alias); + if (bigTableSize == null || (aliasSize != null && aliasSize > bigTableSize)) { + bigTableFileAlias = alias; + bigTableSize = aliasSize; + smallTablesSize = sumOfOthers; + } + } + if (bigTableFileAlias != null) { + LOG.info("Driver alias is " + bigTableFileAlias + " with size " + bigTableSize + + " (total size of others : " + smallTablesSize + ", threshold : " + threshold + ")"); + return bigTableFileAlias; + } + LOG.info("Failed to resolve driver alias (threshold : " + threshold + + ", length mapping : " + aliasToKnownSize + ")"); + return null; + } - Map aliasToFileSizeMap = new HashMap(); - for (Map.Entry entry : aliasToKnownSize.entrySet()) { - String alias = entry.getKey(); - AliasFileSizePair pair = new AliasFileSizePair(alias, entry.getValue()); - aliasToFileSizeMap.put(alias, pair); + private Set getParticipants(ConditionalResolverCommonJoinCtx ctx) { + Set participants = new HashSet(); + for (List aliases : ctx.getPathToAliases().values()) { + participants.addAll(aliases); } + return participants; + } - try { - // need to compute the input size at runtime, and select the biggest as - // the big table. 
- for (Map.Entry> oneEntry : pathToAliases - .entrySet()) { - String p = oneEntry.getKey(); - // this path is intermediate data - if (p.startsWith(hdfsTmpDir.toString()) || p.startsWith(localTmpDir.toString())) { - ArrayList aliasArray = oneEntry.getValue(); - if (aliasArray.size() <= 0) { - continue; - } - Path path = new Path(p); - FileSystem fs = path.getFileSystem(conf); - long fileSize = fs.getContentSummary(path).getLength(); - for (String alias : aliasArray) { - AliasFileSizePair pair = aliasToFileSizeMap.get(alias); - if (pair == null) { - pair = new AliasFileSizePair(alias, 0); - aliasToFileSizeMap.put(alias, pair); - } - pair.size += fileSize; - } + protected void resolveUnknownSizes(ConditionalResolverCommonJoinCtx ctx, HiveConf conf) + throws Exception { + + Set aliases = getParticipants(ctx); + + Map aliasToKnownSize = ctx.getAliasToKnownSize(); + Map> pathToAliases = ctx.getPathToAliases(); + + Set unknownPaths = new HashSet(); + for (Map.Entry> entry : pathToAliases.entrySet()) { + for (String alias : entry.getValue()) { + if (aliases.contains(alias) && !aliasToKnownSize.containsKey(alias)) { + unknownPaths.add(entry.getKey()); + break; } } - // generate file size to alias mapping; but not set file size as key, - // because different file may have the same file size. 
- - List aliasFileSizeList = new ArrayList( - aliasToFileSizeMap.values()); - - Collections.sort(aliasFileSizeList); - // iterating through this list from the end to beginning, trying to find - // the big table for mapjoin - int idx = aliasFileSizeList.size() - 1; - boolean bigAliasFound = false; - while (idx >= 0) { - AliasFileSizePair pair = aliasFileSizeList.get(idx); - String alias = pair.alias; - long size = pair.size; - idx--; - if (!bigAliasFound && aliasToTask.get(alias) != null) { - // got the big table - bigAliasFound = true; - bigTableFileAlias = alias; - continue; + } + Path hdfsTmpDir = ctx.getHdfsTmpDir(); + Path localTmpDir = ctx.getLocalTmpDir(); + // need to compute the input size at runtime, and select the biggest as + // the big table. + for (String p : unknownPaths) { + // this path is intermediate data + if (p.startsWith(hdfsTmpDir.toString()) || p.startsWith(localTmpDir.toString())) { + Path path = new Path(p); + FileSystem fs = path.getFileSystem(conf); + long fileSize = fs.getContentSummary(path).getLength(); + for (String alias : pathToAliases.get(p)) { + Long length = aliasToKnownSize.get(alias); + if (length == null) { + aliasToKnownSize.put(alias, fileSize); + } } - smallTablesFileSizeSum += size; } - - // compare with threshold - long threshold = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); - if (smallTablesFileSizeSum <= threshold) { - return bigTableFileAlias; - } else { - return null; - } - } catch (Exception e) { - e.printStackTrace(); - return null; } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java index 67203c9..c9b0cf0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java +++ ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java @@ -18,18 +18,60 @@ package org.apache.hadoop.hive.ql.plan; -import 
junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Task; +import org.junit.Test; -import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.AliasFileSizePair; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; -public class TestConditionalResolverCommonJoin extends TestCase { +public class TestConditionalResolverCommonJoin { - public void testAliasFileSizePairCompareTo() { - AliasFileSizePair big = new AliasFileSizePair("big", 389560034778L); - AliasFileSizePair small = new AliasFileSizePair("small", 1647L); + @Test + public void testResolvingDriverAlias() throws Exception { + ConditionalResolverCommonJoin resolver = new ConditionalResolverCommonJoin(); - assertEquals(0, big.compareTo(big)); - assertEquals(1, big.compareTo(small)); - assertEquals(-1, small.compareTo(big)); - } -} + HashMap> pathToAliases = new HashMap>(); + pathToAliases.put("path1", new ArrayList(Arrays.asList("alias1", "alias2"))); + pathToAliases.put("path2", new ArrayList(Arrays.asList("alias3"))); + + HashMap aliasToKnownSize = new HashMap(); + aliasToKnownSize.put("alias1", 1024l); + aliasToKnownSize.put("alias2", 2048l); + aliasToKnownSize.put("alias3", 4096l); + + // joins alias1, alias2, alias3 (alias1 was not eligible for big pos) + HashMap> aliasToTask = + new HashMap>(); + aliasToTask.put("alias2", null); + aliasToTask.put("alias3", null); + + ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx ctx = + new ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx(); + ctx.setPathToAliases(pathToAliases); + ctx.setAliasToTask(aliasToTask); + ctx.setAliasToKnownSize(aliasToKnownSize); + + HiveConf conf = new HiveConf(); + conf.setLongVar(HiveConf.ConfVars.HIVESMALLTABLESFILESIZE, 4096); + + // alias3 only can be selected + String resolved = resolver.resolveMapJoinTask(ctx, conf); + Assert.assertEquals("alias3", 
resolved); + + conf.setLongVar(HiveConf.ConfVars.HIVESMALLTABLESFILESIZE, 65536); + + // alias1, alias2, alias3 all can be selected but overridden by the biggest one (alias3) + resolved = resolver.resolveMapJoinTask(ctx, conf); + Assert.assertEquals("alias3", resolved); + + conf.setLongVar(HiveConf.ConfVars.HIVESMALLTABLESFILESIZE, 2048); + + // not selected + resolved = resolver.resolveMapJoinTask(ctx, conf); + Assert.assertNull(resolved); + } +} \ No newline at end of file diff --git ql/src/test/results/clientpositive/auto_join25.q.out ql/src/test/results/clientpositive/auto_join25.q.out index 7427239..d6d0c90 100644 --- ql/src/test/results/clientpositive/auto_join25.q.out +++ ql/src/test/results/clientpositive/auto_join25.q.out @@ -69,7 +69,7 @@ Obtaining error information Task failed! Task ID: - Stage-14 + Stage-15 Logs: @@ -129,7 +129,7 @@ Obtaining error information Task failed! Task ID: - Stage-7 + Stage-8 Logs: diff --git ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out index 7d06739..effa6c8 100644 --- ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out +++ ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out @@ -79,7 +79,7 @@ Obtaining error information Task failed! Task ID: - Stage-7 + Stage-8 Logs: diff --git ql/src/test/results/clientpositive/mapjoin_hook.q.out ql/src/test/results/clientpositive/mapjoin_hook.q.out index d60d16e..8da5168 100644 --- ql/src/test/results/clientpositive/mapjoin_hook.q.out +++ ql/src/test/results/clientpositive/mapjoin_hook.q.out @@ -49,7 +49,7 @@ Obtaining error information Task failed! Task ID: - Stage-14 + Stage-15 Logs: