diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 42f7d88..be04dd3 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2687,6 +2687,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.tez.exec.inplace.progress", true, "Updates tez job execution progress in-place in the terminal."), + TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f, + "This is to override the tez setting with the same name"), + TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve-fraction", 0.3f, + "This is to override the tez setting with the same name"), + TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX("hive.tez.task.scale.memory.reserve.fraction.max", + 0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor. " + + "This ratio shouldn't be smaller than 0.3"), // The default is different on the client and server, so it's null here. 
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."), LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", true, diff --git common/src/java/org/apache/hive/common/util/HiveStringUtils.java common/src/java/org/apache/hive/common/util/HiveStringUtils.java index 72c3fa9..7175b49 100644 --- common/src/java/org/apache/hive/common/util/HiveStringUtils.java +++ common/src/java/org/apache/hive/common/util/HiveStringUtils.java @@ -38,6 +38,7 @@ import java.util.Locale; import java.util.Properties; import java.util.StringTokenizer; +import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.common.collect.Interner; @@ -1015,4 +1016,47 @@ public static String getPartitionValWithInvalidCharacter(List partVals, return null; } + + /** + * Parse a Java opts string, try to find the rightmost -Xmx option value (since there may be more than one) + * @param javaOpts Java opts string to parse + * @return the rightmost -Xmx value in bytes. 
If Xmx is not set, return -1 + */ + public static long parseRightmostXmx(String javaOpts) { + // Find the last matching -Xmx following word boundaries + // Format: -Xmx[g|G|m|M|k|K] + Pattern JAVA_OPTS_XMX_PATTERN = Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*"); + Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts); + + if (m.matches()) { + long size = Long.parseLong(m.group(1)); + if (size <= 0) { + return -1; + } + + if (m.group(2).isEmpty()) { + // -Xmx specified in bytes + return size; + } + + char unit = m.group(2).charAt(0); + switch (unit) { + case 'k': + case 'K': + // -Xmx specified in KB + return size * 1024; + case 'm': + case 'M': + // -Xmx specified in MB + return size * 1024 * 1024; + case 'g': + case 'G': + // -Xmx specified in GB + return size * 1024 * 1024 * 1024; + } + } + + // -Xmx not specified + return -1; + } } diff --git common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java index 6bd7037..bf1cd74 100644 --- common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java +++ common/src/test/org/apache/hive/common/util/TestHiveStringUtils.java @@ -61,4 +61,52 @@ public void splitAndUnEscapeTestCase(String testValue, String[] expectedResults) assertTrue(Arrays.toString(expectedResults) + " == " + Arrays.toString(testResults), Arrays.equals(expectedResults, testResults)); } + + @Test + public void testParseRightmostXmx() throws Exception { + // Empty java opts + String javaOpts = ""; + long heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + + // Non-empty java opts without -Xmx specified + javaOpts = "-Xms1024m"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + + // Non-empty java opts with -Xmx specified in GB + javaOpts = "-Xms1024m -Xmx2g"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + 
assertEquals("Unexpected maximum heap size", 2147483648L, heapSize); + + // Non-empty java opts with -Xmx specified in MB + javaOpts = "-Xms1024m -Xmx1024m"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 1073741824, heapSize); + + // Non-empty java opts with -Xmx specified in KB + javaOpts = "-Xms1024m -Xmx524288k"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 536870912, heapSize); + + // Non-empty java opts with -Xmx specified in B + javaOpts = "-Xms1024m -Xmx1610612736"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 1610612736, heapSize); + + // Non-empty java opts with -Xmx specified twice + javaOpts = "-Xmx1024m -Xmx1536m"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", 1610612736, heapSize); + + // Non-empty java opts with bad -Xmx specification + javaOpts = "pre-Xmx1024m"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + + // Non-empty java opts with bad -Xmx specification + javaOpts = "-Xmx1024m-post"; + heapSize = HiveStringUtils.parseRightmostXmx(javaOpts); + assertEquals("Unexpected maximum heap size", -1, heapSize); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 690c718..73fb250 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -100,6 +100,8 @@ protected HybridHashTableContainer firstSmallTable; // The first small table; // Only this table has spilled big table rows + private transient double memoryReserveFraction; // Percentage of memory to be reserved from Tez + /** Kryo ctor. 
*/ protected MapJoinOperator() { super(); @@ -766,4 +768,12 @@ protected boolean canSkipJoinProcessing(ExecMapperContext mapContext) { } return skipJoinProcessing; } + + public double getMemoryReserveFraction() { + return memoryReserveFraction; + } + + public void setMemoryReserveFrac(double frac) { + memoryReserveFraction = frac; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index a1e4e6c..6651edc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -454,7 +454,7 @@ public static Resource getContainerResource(Configuration conf) { * Falls back to Map-reduces map java opts if no tez specific options * are set */ - private String getContainerJavaOpts(Configuration conf) { + public static String getContainerJavaOpts(Configuration conf) { String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS); String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index e4b69a5..0c6ec35 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -89,6 +89,7 @@ private static final String CLASS_NAME = TezTask.class.getName(); private final PerfLogger perfLogger = SessionState.getPerfLogger(); + private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction"; private TezCounters counters; @@ -390,6 +391,11 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal, work, work.getVertexType(w)); + if (w.getMapJoinMemReserveFrac() != 0.0d) { // indicating there's mapjoin in the work + // Reserve a customized (usually more) percentage of 
memory for Vertex that has Mapjoin + LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + w.getMapJoinMemReserveFrac()); + wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, Double.toString(w.getMapJoinMemReserveFrac())); + } dag.addVertex(wx); utils.addCredentials(w, dag); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName()); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 2b93e01..e6c5b21 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -59,9 +60,13 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hive.common.util.HiveStringUtils.parseRightmostXmx; + /** * ConvertJoinMapJoin is an optimization that replaces a common join * (aka shuffle join) with a map join (aka broadcast or fragment replicate @@ -89,6 +94,8 @@ JoinOperator joinOp = (JoinOperator) nd; long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + double memoryReserveFrac = adjustMemoryReserveFraction(maxSize, context.conf); + TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if 
(!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { // we are just converting to a common merge join operator. The shuffle @@ -149,6 +156,7 @@ // reduced by 1 mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); mapJoinOp.setStatistics(joinOp.getStatistics()); + mapJoinOp.setMemoryReserveFrac(memoryReserveFrac); // propagate this change till the next RS for (Operator childOp : mapJoinOp.getChildOperators()) { setAllChildrenTraits(childOp, mapJoinOp.getOpTraits()); @@ -885,4 +893,44 @@ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContex LOG.info("Fallback to common merge join operator"); convertJoinSMBJoin(joinOp, context, pos, 0, false); } + + /** + * TODO This method is temporary. Ideally Hive should only need to pass to Tez the amount of memory + * it requires to do the map join, and Tez should take care of figuring out how much to allocate. + * Adjust the percentage of memory to be reserved for the processor from Tez + * based on the actual requested memory by the Map Join, i.e. 
HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD + * @return the adjusted percentage + */ + private double adjustMemoryReserveFraction(long memoryRequested, HiveConf conf) { + float tezHeapFraction = conf.getFloatVar(HiveConf.ConfVars.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION); + float tezReserveFraction = conf.getFloatVar(HiveConf.ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION); + + Resource resource = DagUtils.getContainerResource(conf); + long containerSize = (long) resource.getMemory() * 1024 * 1024; + String javaOpts = DagUtils.getContainerJavaOpts(conf); + long xmx = parseRightmostXmx(javaOpts); + + if (xmx <= 0) { + xmx = (long) (tezHeapFraction * containerSize); + } + + long actualMemToBeAllocated = (long) (tezReserveFraction * xmx); + + if (actualMemToBeAllocated < memoryRequested) { + LOG.warn("The actual amount of memory to be allocated " + actualMemToBeAllocated + + " is less than the amount of requested memory for Map Join conversion " + memoryRequested); + float frac = (float) memoryRequested / xmx; + LOG.info("Fraction after calculation: " + frac); + float maxFrac = conf.getFloatVar(HiveConf.ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX); + if (frac > tezReserveFraction && frac < maxFrac) { + LOG.info("Will adjust Tez setting " + TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION + + " to " + frac + " to allocate more memory"); + return frac; // larger fraction + } else if (frac >= maxFrac) { + return maxFrac; + } + } + + return tezReserveFraction; // the default fraction + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 461ba37..57f7686 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -244,6 +244,7 @@ public Object process(Node nd, Stack stack, // mapjoin later if (!context.mapJoinWorkMap.containsKey(mj)) { List workItems = new LinkedList(); + 
work.setMapJoinMemReserveFrac(mj.getMemoryReserveFraction()); workItems.add(work); context.mapJoinWorkMap.put(mj, workItems); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 20f787b..6cd3b24 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -74,6 +74,8 @@ public BaseWork(String name) { protected boolean llapMode = false; protected boolean uberMode = false; + private double mapJoinMemReserveFrac; + public void setGatheringStats(boolean gatherStats) { this.gatheringStats = gatherStats; } @@ -223,6 +225,14 @@ public boolean getLlapMode() { return llapMode; } + public double getMapJoinMemReserveFrac() { + return mapJoinMemReserveFrac; + } + + public void setMapJoinMemReserveFrac(double frac) { + mapJoinMemReserveFrac = frac; + } + public abstract void configureJobConf(JobConf job); public void setTag(int tag) {