diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1dee6fb..f9b0de0 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2684,6 +2684,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "hive.tez.exec.inplace.progress",
         true,
         "Updates tez job execution progress in-place in the terminal."),
+    TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX("hive.tez.task.scale.memory.reserve.fraction.max",
+        0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor. " +
+        "This ratio shouldn't be smaller than 0.3"),
     // The default is different on the client and server, so it's null here.
     LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
     LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", true,
diff --git common/src/java/org/apache/hive/common/util/HiveStringUtils.java common/src/java/org/apache/hive/common/util/HiveStringUtils.java
index 72c3fa9..7175b49 100644
--- common/src/java/org/apache/hive/common/util/HiveStringUtils.java
+++ common/src/java/org/apache/hive/common/util/HiveStringUtils.java
@@ -38,6 +38,7 @@
 import java.util.Locale;
 import java.util.Properties;
 import java.util.StringTokenizer;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Interner;
@@ -1015,4 +1016,80 @@ public static String getPartitionValWithInvalidCharacter(List partVals,
 
     return null;
   }
+
+  /**
+   * Matches one whitespace-delimited -Xmx token: digits with an optional g/G/m/M/k/K suffix.
+   * The trailing delimiter is a lookahead so that options separated by a single whitespace
+   * character are each found by successive {@code find()} calls.
+   */
+  private static final Pattern JAVA_OPTS_XMX_PATTERN =
+      Pattern.compile("(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?=\\s|$)");
+
+  /**
+   * Parse a Java opts string and find the rightmost -Xmx option value (there may be more
+   * than one).
+   *
+   * @param javaOpts Java opts string to parse, may be null
+   * @return the rightmost -Xmx value in bytes. If Xmx is not set, is not positive, or does
+   *         not fit in a long, return -1
+   */
+  public static long parseRightmostXmx(String javaOpts) {
+    if (javaOpts == null) {
+      return -1;
+    }
+
+    // Walk over every -Xmx token and keep the last one
+    String digits = null;
+    String unit = null;
+    Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts);
+    while (m.find()) {
+      digits = m.group(1);
+      unit = m.group(2);
+    }
+    if (digits == null) {
+      // -Xmx not specified
+      return -1;
+    }
+
+    long size;
+    try {
+      size = Long.parseLong(digits);
+    } catch (NumberFormatException e) {
+      // More digits than a long can hold
+      return -1;
+    }
+    if (size <= 0) {
+      return -1;
+    }
+
+    long multiplier = 1L; // no suffix: -Xmx specified in bytes
+    if (!unit.isEmpty()) {
+      switch (unit.charAt(0)) {
+        case 'k':
+        case 'K':
+          // -Xmx specified in KB
+          multiplier = 1024L;
+          break;
+        case 'm':
+        case 'M':
+          // -Xmx specified in MB
+          multiplier = 1024L * 1024;
+          break;
+        case 'g':
+        case 'G':
+          // -Xmx specified in GB
+          multiplier = 1024L * 1024 * 1024;
+          break;
+        default:
+          // Not reachable: the pattern only admits the suffixes handled above
+          break;
+      }
+    }
+
+    // Report overflow as "not usable" instead of returning a wrapped-around value
+    if (size > Long.MAX_VALUE / multiplier) {
+      return -1;
+    }
+    return size * multiplier;
+  }
 }
diff --git common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java
index 6bd7037..bf1cd74 100644
--- common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java
+++ common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java
@@ -61,4 +61,67 @@ public void splitAndUnEscapeTestCase(String testValue, String[] expectedResults)
     assertTrue(Arrays.toString(expectedResults) + " == " + Arrays.toString(testResults),
         Arrays.equals(expectedResults, testResults));
   }
+
+  @Test
+  public void testParseRightmostXmx() throws Exception {
+    // Empty java opts
+    String javaOpts = "";
+    long heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", -1, heapSize);
+
+    // Non-empty java opts without -Xmx specified
+    javaOpts = "-Xms1024m";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", -1, heapSize);
+
+    // Non-empty java opts with -Xmx specified in GB
+    javaOpts = "-Xms1024m -Xmx2g";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", 2147483648L, heapSize);
+
+    // Non-empty java opts with -Xmx specified in MB
+    javaOpts = "-Xms1024m -Xmx1024m";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", 1073741824, heapSize);
+
+    // Non-empty java opts with -Xmx specified in KB
+    javaOpts = "-Xms1024m -Xmx524288k";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", 536870912, heapSize);
+
+    // Non-empty java opts with -Xmx specified in B
+    javaOpts = "-Xms1024m -Xmx1610612736";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", 1610612736, heapSize);
+
+    // Non-empty java opts with -Xmx specified twice
+    javaOpts = "-Xmx1024m -Xmx1536m";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", 1610612736, heapSize);
+
+    // Non-empty java opts with bad -Xmx specification
+    javaOpts = "pre-Xmx1024m";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", -1, heapSize);
+
+    // Non-empty java opts with bad -Xmx specification
+    javaOpts = "-Xmx1024m-post";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", -1, heapSize);
+
+    // Null java opts
+    javaOpts = null;
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", -1, heapSize);
+
+    // Java opts spread over several lines
+    javaOpts = "-server\n-Xms1024m\n-Xmx2g\n-XX:+UseG1GC";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", 2147483648L, heapSize);
+
+    // -Xmx value that does not fit in a long once converted to bytes
+    javaOpts = "-Xmx9223372036854775807g";
+    heapSize = HiveStringUtils.parseRightmostXmx(javaOpts);
+    assertEquals("Unexpected maximum heap size", -1, heapSize);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index ff292a3..1e4ee9c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -97,6 +97,8 @@
   protected HybridHashTableContainer firstSmallTable; // The first small table;
                                                       // Only this table has spilled big table rows
 
+  private transient double memoryReserveFraction; // Fraction of memory to be reserved from Tez
+
   /** Kryo ctor. */
   protected MapJoinOperator() {
     super();
@@ -752,4 +754,12 @@ protected boolean canSkipJoinProcessing(ExecMapperContext mapContext) {
     }
     return skipJoinProcessing;
   }
+
+  public double getMemoryReserveFraction() {
+    return memoryReserveFraction;
+  }
+
+  public void setMemoryReserveFrac(double frac) {
+    memoryReserveFraction = frac;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 9e114c0..739c634 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -89,6 +89,7 @@
 
   private static final String CLASS_NAME = TezTask.class.getName();
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
+  private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction";
 
   private TezCounters counters;
 
@@ -385,6 +386,11 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
       Vertex wx =
           utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
               work, work.getVertexType(w));
+      if (w.getMapJoinMemReserveFrac() != 0) { // indicating there's mapjoin in the work
+        // Reserve a customized (usually more) percentage of memory for Vertex that has Mapjoin
+        LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + w.getMapJoinMemReserveFrac());
+        wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, Double.toString(w.getMapJoinMemReserveFrac()));
+      }
       dag.addVertex(wx);
       utils.addCredentials(w, dag);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 2b93e01..894fc6a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -59,9 +59,12 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hive.common.util.HiveStringUtils.parseRightmostXmx;
+
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
  * (aka shuffle join) with a map join (aka broadcast or fragment replicate
@@ -89,6 +92,8 @@
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
 
+    double memoryReserveFrac = adjustMemoryReserveFraction(maxSize, context.conf);
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       // we are just converting to a common merge join operator. The shuffle
@@ -149,6 +154,7 @@
     // reduced by 1
     mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
     mapJoinOp.setStatistics(joinOp.getStatistics());
+    mapJoinOp.setMemoryReserveFrac(memoryReserveFrac);
     // propagate this change till the next RS
     for (Operator childOp : mapJoinOp.getChildOperators()) {
       setAllChildrenTraits(childOp, mapJoinOp.getOpTraits());
@@ -885,4 +891,70 @@ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContex
     LOG.info("Fallback to common merge join operator");
     convertJoinSMBJoin(joinOp, context, pos, 0, false);
   }
+
+  /**
+   * Adjust the fraction of memory to be reserved for the processor from Tez
+   * based on the actual requested memory by the Map Join, i.e. HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD
+   *
+   * @param memoryRequested memory requested for the Map Join conversion, in bytes
+   * @param conf configuration holding the Hive and Tez memory settings
+   * @return the adjusted fraction, or the configured Tez reserve fraction when no adjustment
+   *         is needed or possible
+   */
+  private double adjustMemoryReserveFraction(long memoryRequested, HiveConf conf) {
+    double tezReserveFraction = conf.getDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION,
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_DEFAULT);
+
+    /* Find the rightmost -Xmx param among multiple Java options. Why rightmost?
+       $ java -Xmx1G -XX:+PrintFlagsFinal -Xmx2G 2>/dev/null | grep MaxHeapSize
+       uintx MaxHeapSize := 2147483648 {product}
+     */
+    String hiveTezOpts = conf.getVar(HiveConf.ConfVars.HIVETEZJAVAOPTS);
+    String tezClusterDefaultOpts = conf.get(TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS,
+        TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT);
+    String tezOpts = conf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS,
+        TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT);
+
+    // Merge the above three Java Opts strings into one
+    String javaOpts = hiveTezOpts + " " + tezClusterDefaultOpts + " " + tezOpts;
+    long xmx = parseRightmostXmx(javaOpts);
+
+    // memoryBase is what the requested memory is divided by to get the new fraction
+    long memoryBase;
+    long actualMemToBeAllocated;
+    if (xmx <= 0) {
+      long containerSize = (long) conf.getIntVar(HiveConf.ConfVars.HIVETEZCONTAINERSIZE) * 1024 * 1024;
+      if (containerSize <= 0) {
+        // Neither -Xmx nor a container size is available, so there is nothing to scale against.
+        // This used to be an assert, which fails the whole optimization when run with -ea.
+        LOG.info("Unable to determine the task memory, leaving "
+            + TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION + " at " + tezReserveFraction);
+        return tezReserveFraction;
+      }
+      double tezHeapFraction = conf.getDouble(TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION,
+          TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT);
+      memoryBase = containerSize;
+      actualMemToBeAllocated = (long) (tezHeapFraction * containerSize);
+    } else {
+      memoryBase = xmx;
+      actualMemToBeAllocated = (long) (tezReserveFraction * xmx);
+    }
+    // NOTE(review): the branch without -Xmx compares against heapFraction * container, while the
+    // -Xmx branch compares against reserveFraction * xmx -- confirm this asymmetry is intended.
+
+    if (actualMemToBeAllocated < memoryRequested) {
+      LOG.warn("The actual amount of memory to be allocated " + actualMemToBeAllocated +
+          " is less than the amount of requested memory for Map Join conversion " + memoryRequested);
+      double frac = (double) memoryRequested / memoryBase;
+      LOG.info("Fraction after calculation: " + frac);
+      float maxFraction = conf.getFloatVar(HiveConf.ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX);
+      if (frac > tezReserveFraction && frac <= maxFraction) {
+        LOG.info("Will adjust Tez setting " + TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION +
+            " to " + frac + " to allocate more memory");
+        return frac; // larger fraction
+      }
+    }
+
+    return tezReserveFraction; // the default fraction
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 461ba37..57f7686 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -244,6 +244,7 @@ public Object process(Node nd, Stack stack,
           // mapjoin later
           if (!context.mapJoinWorkMap.containsKey(mj)) {
             List workItems = new LinkedList();
+            work.setMapJoinMemReserveFrac(mj.getMemoryReserveFraction());
             workItems.add(work);
             context.mapJoinWorkMap.put(mj, workItems);
           } else {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 20f787b..6cd3b24 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -74,6 +74,8 @@ public BaseWork(String name) {
   protected boolean llapMode = false;
   protected boolean uberMode = false;
 
+  private double mapJoinMemReserveFrac;
+
   public void setGatheringStats(boolean gatherStats) {
     this.gatheringStats = gatherStats;
   }
@@ -223,6 +225,14 @@ public boolean getLlapMode() {
     return llapMode;
   }
 
+  public double getMapJoinMemReserveFrac() {
+    return mapJoinMemReserveFrac;
+  }
+
+  public void setMapJoinMemReserveFrac(double frac) {
+    mapJoinMemReserveFrac = frac;
+  }
+
   public abstract void configureJobConf(JobConf job);
 
   public void setTag(int tag) {