diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5ae5465..a25b498 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -859,9 +859,22 @@ public class YarnConfiguration extends Configuration {
 
   /** Specifies whether physical memory check is enabled. */
   public static final String NM_VMEM_CHECK_ENABLED = NM_PREFIX
       + "vmem-check-enabled";
   public static final boolean DEFAULT_NM_VMEM_CHECK_ENABLED = true;
 
+  /** Specifies whether thread count check is enabled. */
+  public static final String NM_THREAD_COUNT_CHECK_ENABLED = NM_PREFIX
+      + "thread-count-check-enabled";
+  public static final boolean DEFAULT_NM_THREAD_COUNT_CHECK_ENABLED = false;
+
+  /**
+   * Upper limit on the total number of threads of a container. Only read
+   * when the thread count check is enabled.
+   */
+  public static final String NM_CONTAINER_THREAD_UPPER_LIMIT = NM_PREFIX
+      + "container-thread-upper-limit";
+  public static final int DEFAULT_NM_CONTAINER_THREAD_UPPER_LIMIT = 3500;
+
   /** Conversion ratio for physical memory to virtual memory. */
   public static final String NM_VMEM_PMEM_RATIO =
     NM_PREFIX + "vmem-pmem-ratio";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
index df9d28a..5fc6064 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
@@ -26,11 +26,13 @@ import java.io.InputStreamReader;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -145,6 +147,17 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
   protected Map<String, ProcessInfo> processTree =
     new HashMap<String, ProcessInfo>();
 
+  /**
+   * Returns the ids of all processes in this process tree, i.e. the keys of
+   * {@link #processTree}.
+   *
+   * @return a copy of the pid set, so callers never iterate over or modify
+   *         the internal map
+   */
+  public Set<String> getAllPids() {
+    return new HashSet<String>(processTree.keySet());
+  }
+
   public ProcfsBasedProcessTree(String pid) {
     this(pid, PROCFS, new SystemClock());
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 20d2112..f6e2e85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -39,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 
@@ -70,9 +76,13 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
   private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
+  /** Thread count above which a container is killed. */
+  private int containerThreadUpperLimit;
 
   private boolean pmemCheckEnabled;
   private boolean vmemCheckEnabled;
+  /** Whether the per-container thread count limit is enforced. */
+  private boolean threadCountCheckEnabled;
 
   private long maxVCoresAllottedForContainers;
 
@@ -148,8 +158,28 @@ public class ContainersMonitorImpl extends AbstractService implements
         YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
     vmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
         YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
+    threadCountCheckEnabled = conf.getBoolean(
+        YarnConfiguration.NM_THREAD_COUNT_CHECK_ENABLED,
+        YarnConfiguration.DEFAULT_NM_THREAD_COUNT_CHECK_ENABLED);
+    if (threadCountCheckEnabled) {
+      containerThreadUpperLimit = conf.getInt(
+          YarnConfiguration.NM_CONTAINER_THREAD_UPPER_LIMIT,
+          YarnConfiguration.DEFAULT_NM_CONTAINER_THREAD_UPPER_LIMIT);
+      if (containerThreadUpperLimit <= 0) {
+        // No container can stay within a non-positive limit, so treat it
+        // as a misconfiguration and switch the check off.
+        LOG.warn("Invalid container thread count upper limit "
+            + containerThreadUpperLimit + ", disabling thread count check");
+        threadCountCheckEnabled = false;
+      }
+    }
     LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
     LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
+    LOG.info("Thread count check enabled: " + threadCountCheckEnabled);
+    if (threadCountCheckEnabled) {
+      LOG.info("Container thread count upper limit: "
+          + containerThreadUpperLimit);
+    }
 
     nodeCpuPercentageForYARN =
         NodeManagerHardwareUtils.getNodeCpuPercentage(conf);
@@ -532,6 +562,40 @@ public class ContainersMonitorImpl extends AbstractService implements
               vmemStillInUsage += currentVmemUsage;
               pmemStillInUsage += currentPmemUsage;
+
+              // Thread count limit check.
+              // NOTE(review): kept inside the enclosing block (presumably
+              // the "memory within limits" branch) so that a container
+              // already killed and removed above is not passed to
+              // it.remove() a second time -- confirm against the full
+              // method.
+              if (threadCountCheckEnabled
+                  && pTree instanceof ProcfsBasedProcessTree) {
+                Set<String> pids =
+                    ((ProcfsBasedProcessTree) pTree).getAllPids();
+                int threadCount = 0;
+                for (String pid : pids) {
+                  threadCount += getContainerThreadCount(pid);
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Thread count of the container [id="
+                      + containerId + ", pids=" + pids + "] is "
+                      + threadCount);
+                }
+                if (threadCount > containerThreadUpperLimit) {
+                  String message = "Container [pid=" + pId
+                      + ",containerID=" + containerId
+                      + "] is running beyond thread count limit, current is "
+                      + threadCount + ", limit is "
+                      + containerThreadUpperLimit;
+                  LOG.warn(message);
+                  // kill the container
+                  eventDispatcher.getEventHandler().handle(
+                      new ContainerKillEvent(containerId,
+                          containerExitStatus, message));
+                  it.remove();
+                }
+              }
             }
           } catch (Exception e) {
             // Log the exception and proceed to the next container.
             LOG.warn("Uncaught exception in ContainerMemoryManager "
@@ -549,6 +613,46 @@ public class ContainersMonitorImpl extends AbstractService implements
       }
     }
 
+    /**
+     * Reads the number of threads of one process from the "Threads:" line of
+     * {@code /proc/<pid>/status}.
+     *
+     * @param pid id of the process to inspect
+     * @return the thread count, or 0 if the status file does not exist,
+     *         cannot be read, or has no parsable "Threads:" line, so that a
+     *         failed lookup never lowers the container's total
+     */
+    private int getContainerThreadCount(String pid) {
+      File statusFile = new File("/proc/" + pid + "/status");
+      if (!statusFile.exists()) {
+        return 0;
+      }
+      BufferedReader reader = null;
+      try {
+        reader = new BufferedReader(new FileReader(statusFile));
+        String line;
+        while ((line = reader.readLine()) != null) {
+          if (line.startsWith("Threads:")) {
+            return Integer.parseInt(
+                line.substring("Threads:".length()).trim());
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to read thread count of process " + pid, e);
+      } catch (NumberFormatException e) {
+        LOG.warn("Malformed thread count for process " + pid, e);
+      } finally {
+        if (reader != null) {
+          try {
+            reader.close();
+          } catch (IOException ignored) {
+            // Nothing useful can be done if closing a read-only file fails.
+          }
+        }
+      }
+      return 0;
+    }
+
     private String formatErrorMessage(String memTypeExceeded,
         long currentVmemUsage, long vmemLimit,
         long currentPmemUsage, long pmemLimit,