diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3273b47..2718f1e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -822,6 +822,16 @@ private static void addDeprecatedKeys() {
"container-monitor.procfs-tree.smaps-based-rss.enabled";
public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED =
false;
+
+ /** Enable/disable container metrics. */
+ public static final String NM_CONTAINER_METRICS_ENABLE =
+ NM_PREFIX + "container-metrics.enable";
+ public static final boolean DEFAULT_NM_CONTAINER_METRICS_ENABLE = true;
+
+ /** Container metrics flush period in ms. 0 or lower: flush on completion. */
+ public static final String NM_CONTAINER_METRICS_PERIOD_MS =
+ NM_PREFIX + "container-metrics.period-ms";
+ public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
/** Prefix for all node manager disk health checker configs. */
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7dea2c3..5b79fa7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -925,6 +925,20 @@
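   </property>
 
   <property>
+    <description>Enable/disable metrics for containers</description>
+    <name>yarn.nodemanager.container-metrics.enable</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>Container metrics' publishing period in milliseconds. When the
+    value is 0 or lower, metrics are published only when the container
+    finishes.</description>
+    <name>yarn.nodemanager.container-metrics.period-ms</name>
+    <value>-1</value>
+  </property>
+
+  <property>
     <description>Frequency of running node health script.</description>
     <name>yarn.nodemanager.health-checker.interval-ms</name>
     <value>600000</value>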
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
new file mode 100644
index 0000000..21fa72c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Per-container metrics source that records physical memory usage and
+ * publishes it either periodically or once when the container finishes.
+ *
+ * <p>A source moves through three states: registered, finished and
+ * unregistered. It unregisters itself on the first {@link #getMetrics} call
+ * that follows the call which published the final snapshot.
+ */
+@InterfaceAudience.Private
+@Metrics(context="container")
+public class ContainerMetrics implements MetricsSource {
+
+  @Metric
+  public MutableStat pMemMBsStat;
+
+  static final MetricsInfo RECORD_INFO =
+      info("ContainerUsage", "Resource usage by container");
+
+  final MetricsInfo recordInfo;
+  final MetricsRegistry registry;
+  final ContainerId containerId;
+  final MetricsSystem metricsSystem;
+
+  // Metrics publishing status
+  private final long flushPeriodMs;
+  private boolean flushOnPeriod = false; // true if period elapsed
+  private boolean finished = false; // true if container finished
+  private boolean unregister = false; // true once final snapshot is out
+  private Timer timer; // lazily initialized
+
+  /**
+   * Simple metrics cache to help prevent re-registrations.
+   *
+   * <p>Insertions happen under the class lock in {@code forContainer} while
+   * removals happen under an instance lock in {@code getMetrics}, so the map
+   * itself has to tolerate concurrent access.
+   */
+  protected final static Map<ContainerId, ContainerMetrics> usageMetrics =
+      new ConcurrentHashMap<ContainerId, ContainerMetrics>();
+
+  /**
+   * Creates the metrics source for one container.
+   *
+   * @param ms metrics system the source is registered with; may be null
+   * @param containerId container whose usage is recorded
+   * @param flushPeriodMs publishing period in milliseconds; 0 or lower
+   *     publishes only when the container finishes
+   */
+  ContainerMetrics(
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+    this.recordInfo =
+        info(sourceName(containerId), RECORD_INFO.description());
+    this.registry = new MetricsRegistry(recordInfo);
+    this.metricsSystem = ms;
+    this.containerId = containerId;
+    this.flushPeriodMs = flushPeriodMs;
+
+    this.pMemMBsStat = registry.newStat(
+        "pMem", "Physical memory stats", "Usage", "MBs", true);
+
+    // Arm the timer last so the task never observes a half-built instance.
+    scheduleTimerTaskIfRequired();
+  }
+
+  /** Tags this source's registry with the container id and returns it. */
+  ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
+    registry.tag(info, containerId.toString());
+    return this;
+  }
+
+  /** Builds the metrics source name used for the given container. */
+  static String sourceName(ContainerId containerId) {
+    return RECORD_INFO.name() + "_" + containerId.toString();
+  }
+
+  /**
+   * Returns the metrics for a container. If this call has to create them,
+   * they are published only when the container finishes.
+   */
+  public static ContainerMetrics forContainer(ContainerId containerId) {
+    return forContainer(containerId, -1L);
+  }
+
+  /**
+   * Returns the metrics for a container using the default metrics system.
+   * The flush period only takes effect if this call creates the metrics.
+   */
+  public static ContainerMetrics forContainer(
+      ContainerId containerId, long flushPeriodMs) {
+    return forContainer(
+        DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
+  }
+
+  /**
+   * Returns the cached metrics for a container, creating, tagging and
+   * registering them on first use.
+   */
+  synchronized static ContainerMetrics forContainer(
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+    ContainerMetrics metrics = usageMetrics.get(containerId);
+    if (metrics == null) {
+      metrics = new ContainerMetrics(
+          ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
+
+      // Register with the MetricsSystems
+      if (ms != null) {
+        metrics =
+            ms.register(sourceName(containerId),
+                "Metrics for container: " + containerId, metrics);
+      }
+      usageMetrics.put(containerId, metrics);
+    }
+
+    return metrics;
+  }
+
+  /**
+   * Publishes a snapshot when the container has finished or the flush period
+   * has elapsed. The first call after the final snapshot unregisters this
+   * source and drops it from the cache without publishing anything.
+   */
+  @Override
+  public synchronized void getMetrics(MetricsCollector collector, boolean all) {
+    //Container goes through registered -> finished -> unregistered.
+    if (unregister) {
+      // forContainer tolerates a null metrics system, so this path must too.
+      if (metricsSystem != null) {
+        metricsSystem.unregisterSource(recordInfo.name());
+      }
+      usageMetrics.remove(containerId);
+      return;
+    }
+
+    if (finished || flushOnPeriod) {
+      registry.snapshot(collector.addRecord(registry.info()), all);
+    }
+
+    if (finished) {
+      this.unregister = true;
+    } else if (flushOnPeriod) {
+      flushOnPeriod = false;
+      scheduleTimerTaskIfRequired();
+    }
+  }
+
+  /** Marks the container as finished and stops any pending periodic flush. */
+  public synchronized void finished() {
+    this.finished = true;
+    if (timer != null) {
+      timer.cancel();
+    }
+  }
+
+  /** Adds one physical memory sample, in MBs, to the usage statistics. */
+  public void recordMemoryUsage(int memoryMBs) {
+    this.pMemMBsStat.add(memoryMBs);
+  }
+
+  /**
+   * Schedules a one-shot task that requests a flush once the period elapses.
+   * Does nothing when the flush period is 0 or lower.
+   */
+  private synchronized void scheduleTimerTaskIfRequired() {
+    if (flushPeriodMs > 0) {
+      // Lazily initialize timer
+      if (timer == null) {
+        this.timer = new Timer("Metrics flush checker", true);
+      }
+      TimerTask timerTask = new TimerTask() {
+        @Override
+        public void run() {
+          synchronized (ContainerMetrics.this) {
+            if (!finished) {
+              flushOnPeriod = true;
+            }
+          }
+        }
+      };
+      timer.schedule(timerTask, flushPeriodMs);
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 02a63ac..55f95bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -18,13 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -38,10 +32,15 @@
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
public class ContainersMonitorImpl extends AbstractService implements
ContainersMonitor {
@@ -51,6 +50,8 @@
private long monitoringInterval;
private MonitoringThread monitoringThread;
+ private boolean containerMetricsEnabled;
+ private long containerMetricsPeriodMs;
final List containersToBeRemoved;
final Map containersToBeAdded;
@@ -106,6 +107,13 @@ protected void serviceInit(Configuration conf) throws Exception {
LOG.info(" Using ResourceCalculatorProcessTree : "
+ this.processTreeClass);
+ this.containerMetricsEnabled =
+ conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
+ this.containerMetricsPeriodMs =
+ conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
+
long configuredPMemForContainers = conf.getLong(
YarnConfiguration.NM_PMEM_MB,
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
@@ -352,6 +360,8 @@ public void run() {
// Remove finished containers
synchronized (containersToBeRemoved) {
for (ContainerId containerId : containersToBeRemoved) {
+ // TODO: Remove this code
+// ContainerMetrics.forContainer(containerId).stop();
trackingContainers.remove(containerId);
LOG.info("Stopping resource-monitoring for " + containerId);
}
@@ -408,7 +418,15 @@ public void run() {
LOG.info(String.format(
"Memory usage of ProcessTree %s for container-id %s: ",
pId, containerId.toString()) +
- formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
+ formatUsageString(
+ currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
+
+ // Add usage to container metrics
+ if (containerMetricsEnabled) {
+ ContainerMetrics.forContainer(
+ containerId, containerMetricsPeriodMs).recordMemoryUsage(
+ (int) (currentPmemUsage >> 20));
+ }
boolean isMemoryOverLimit = false;
String msg = "";
@@ -563,6 +581,9 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
case STOP_MONITORING_CONTAINER:
synchronized (this.containersToBeRemoved) {
this.containersToBeRemoved.add(containerId);
+ if (this.containerMetricsEnabled) {
+ ContainerMetrics.forContainer(containerId).finished();
+ }
}
break;
default:
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
new file mode 100644
index 0000000..1320627
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Verifies when {@link ContainerMetrics} publishes records: none before the
+ * flush period elapses, one after it, one more on completion, and none
+ * after that.
+ */
+public class TestContainerMetrics {
+
+  @Test
+  public void testContainerMetricsFlow() throws InterruptedException {
+    final String ERR = "Error in number of records";
+
+    // Create a dummy MetricsSystem whose register() hands back the source it
+    // was given, so the test never touches the process-wide default system.
+    MetricsSystem system = mock(MetricsSystem.class);
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        return invocation.getArguments()[2];
+      }
+    }).when(system).register(anyString(), anyString(), any());
+
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    ContainerId containerId = mock(ContainerId.class);
+    ContainerMetrics metrics =
+        ContainerMetrics.forContainer(system, containerId, 100);
+
+    // Period has not elapsed yet: nothing is published.
+    metrics.recordMemoryUsage(1024);
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 0, collector.getRecords().size());
+
+    // Period elapsed: exactly one record is published.
+    Thread.sleep(120);
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 1, collector.getRecords().size());
+
+    // Completion publishes one final record.
+    metrics.finished();
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 2, collector.getRecords().size());
+
+    // The source unregisters on this call and publishes nothing more.
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 2, collector.getRecords().size());
+  }
+}