diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 66203a5..7460191 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2688,6 +2688,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.tez.exec.inplace.progress", true, "Updates tez job execution progress in-place in the terminal."), + TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f, + "This is to override the tez setting with the same name"), + TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN("hive.tez.task.scale.memory.reserve-fraction.min", + 0.3f, "This is to override the tez setting tez.task.scale.memory.reserve-fraction"), + TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX("hive.tez.task.scale.memory.reserve.fraction.max", + 0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor"), + TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve.fraction", + 0.0f, "The customized fraction of JVM memory which Tez will reserve for the processor"), // The default is different on the client and server, so it's null here. 
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."), LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", true, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 60b5c40..aacb0ef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -38,6 +38,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang.StringUtils; @@ -454,7 +456,7 @@ public static Resource getContainerResource(Configuration conf) { * Falls back to Map-reduces map java opts if no tez specific options * are set */ - private String getContainerJavaOpts(Configuration conf) { + private static String getContainerJavaOpts(Configuration conf) { String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS); String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL); @@ -1276,4 +1278,94 @@ public static String getUserSpecifiedDagName(Configuration conf) { private DagUtils() { // don't instantiate } + + /** + * TODO This method is temporary. Ideally Hive should only need to pass to Tez the amount of memory + * it requires to do the map join, and Tez should take care of figuring out how much to allocate. + * Adjust the percentage of memory to be reserved for the processor from Tez + * based on the actual requested memory by the Map Join, i.e. 
HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD + * @return the adjusted percentage + */ + public static double adjustMemoryReserveFraction(long memoryRequested, HiveConf conf) { + // User specified fraction always takes precedence + if (conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION) != 0.0f) { + return conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION); + } + + float tezHeapFraction = conf.getFloatVar(ConfVars.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION); + float tezMinReserveFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN); + float tezMaxReserveFraction = conf.getFloatVar(ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX); + + Resource resource = getContainerResource(conf); + long containerSize = (long) resource.getMemory() * 1024 * 1024; + String javaOpts = getContainerJavaOpts(conf); + long xmx = parseRightmostXmx(javaOpts); + + if (xmx <= 0) { + xmx = (long) (tezHeapFraction * containerSize); + } + + long actualMemToBeAllocated = (long) (tezMinReserveFraction * xmx); + + if (actualMemToBeAllocated < memoryRequested) { + LOG.warn("The actual amount of memory to be allocated " + actualMemToBeAllocated + + " is less than the amount of requested memory for Map Join conversion " + memoryRequested); + float frac = (float) memoryRequested / xmx; + LOG.info("Fraction after calculation: " + frac); + if (frac <= tezMinReserveFraction) { + return tezMinReserveFraction; + } else if (frac > tezMinReserveFraction && frac < tezMaxReserveFraction) { + LOG.info("Will adjust Tez setting " + TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION + + " to " + frac + " to allocate more memory"); + return frac; + } else { // frac >= tezMaxReserveFraction + return tezMaxReserveFraction; + } + } + + return tezMinReserveFraction; // the default fraction + } + + /** + * Parse a Java opts string, try to find the rightmost -Xmx option value (since there may be more than one) + * @param javaOpts Java opts string to parse + * @return 
the rightmost -Xmx value in bytes. If Xmx is not set, return -1 + */ + static long parseRightmostXmx(String javaOpts) { + // Find the last matching -Xmx following word boundaries + // Format: -Xmx<size>[g|G|m|M|k|K] + Pattern JAVA_OPTS_XMX_PATTERN = Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*"); + Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts); + + if (m.matches()) { + long size = Long.parseLong(m.group(1)); + if (size <= 0) { + return -1; + } + + if (m.group(2).isEmpty()) { + // -Xmx specified in bytes + return size; + } + + char unit = m.group(2).charAt(0); + switch (unit) { + case 'k': + case 'K': + // -Xmx specified in KB + return size * 1024; + case 'm': + case 'M': + // -Xmx specified in MB + return size * 1024 * 1024; + case 'g': + case 'G': + // -Xmx specified in GB + return size * 1024 * 1024 * 1024; + } + } + + // -Xmx not specified + return -1; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index e4b69a5..25c4514 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -89,6 +89,7 @@ private static final String CLASS_NAME = TezTask.class.getName(); private final PerfLogger perfLogger = SessionState.getPerfLogger(); + private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction"; private TezCounters counters; @@ -390,6 +391,12 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal, work, work.getVertexType(w)); + if (w.getReservedMemoryMB() > 0) { + // If reservedMemoryMB is set, make memory allocation fraction adjustment as needed + double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf); + LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac); + wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, 
Double.toString(frac)); + } // Otherwise just leave it up to Tez to decide how much memory to allocate dag.addVertex(wx); utils.addCredentials(w, dag); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index cf7a875..66a8322 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -380,6 +380,15 @@ protected void generateTaskTree(List> rootTasks, Pa GraphWalker ogw = new GenTezWorkWalker(disp, procCtx); ogw.startWalking(topNodes, null); + // we need to specify the reserved memory for each work that contains Map Join + for (List baseWorkList : procCtx.mapJoinWorkMap.values()) { + for (BaseWork w : baseWorkList) { + // work should be the smallest unit for memory allocation + w.setReservedMemoryMB( + (int)(conf.getLongVar(ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD) / (1024 * 1024))); + } + } + // we need to clone some operator plans and remove union operators still for (BaseWork w: procCtx.workWithUnionOperators) { GenTezUtils.removeUnionOperators(procCtx, w); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 20f787b..13a0811 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -74,6 +74,8 @@ public BaseWork(String name) { protected boolean llapMode = false; protected boolean uberMode = false; + private int reservedMemoryMB = -1; // default to -1 means we leave it up to Tez to decide + public void setGatheringStats(boolean gatherStats) { this.gatheringStats = gatherStats; } @@ -223,6 +225,14 @@ public boolean getLlapMode() { return llapMode; } + public int getReservedMemoryMB() { + return reservedMemoryMB; + } + + public void setReservedMemoryMB(int memoryMB) { + 
reservedMemoryMB = memoryMB; + } + public abstract void configureJobConf(JobConf job); public void setTag(int tag) { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index e04ad7a..53672a9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -278,4 +278,52 @@ public void testGetExtraLocalResources() throws Exception { assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars)); } + + @Test + public void testParseRightmostXmx() throws Exception { + // Empty java opts + String javaOpts = ""; + long heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + + // Non-empty java opts without -Xmx specified + javaOpts = "-Xms1024m"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + + // Non-empty java opts with -Xmx specified in GB + javaOpts = "-Xms1024m -Xmx2g"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 2147483648L, heapSize); + + // Non-empty java opts with -Xmx specified in MB + javaOpts = "-Xms1024m -Xmx1024m"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 1073741824, heapSize); + + // Non-empty java opts with -Xmx specified in KB + javaOpts = "-Xms1024m -Xmx524288k"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 536870912, heapSize); + + // Non-empty java opts with -Xmx specified in B + javaOpts = "-Xms1024m -Xmx1610612736"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 1610612736, heapSize); + + // Non-empty java opts with -Xmx specified twice + javaOpts = "-Xmx1024m -Xmx1536m"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + 
assertEquals("Unexpected maximum heap size", 1610612736, heapSize); + + // Non-empty java opts with bad -Xmx specification + javaOpts = "pre-Xmx1024m"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + + // Non-empty java opts with bad -Xmx specification + javaOpts = "-Xmx1024m-post"; + heapSize = DagUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + } }