diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
index 513d6d7..ef7306b 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
@@ -395,7 +395,8 @@ public synchronized void publishMetricsNow() {
* Sample all the sources for a snapshot of metrics/tags
* @return the metrics buffer containing the snapshot
*/
- synchronized MetricsBuffer sampleMetrics() {
+ @VisibleForTesting
+ public synchronized MetricsBuffer sampleMetrics() {
collector.clear();
MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 33e8a1f..d2106cd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1008,7 +1008,15 @@ private static void addDeprecatedKeys() {
NM_PREFIX + "container-metrics.period-ms";
@Private
public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
-
+
+ /** The delay time ms to unregister container metrics after completion. */
+ @Private
+ public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
+ NM_PREFIX + "container-metrics.unregister-delay-ms";
+ @Private
+ public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
+ 10000;
+
/** Prefix for all node manager disk health checker configs. */
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
"yarn.nodemanager.disk-health-checker.";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bcd64c3..c8bca8e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1573,6 +1573,14 @@
+ The delay time ms to unregister container metrics after completion.
+
+ yarn.nodemanager.container-metrics.unregister-delay-ms
+ 10000
+
+
+
+
Class used to calculate current container resource utilization.
yarn.nodemanager.container-monitor.process-tree.class
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
index c364143..32a5052 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
@@ -32,7 +32,9 @@
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -100,6 +102,8 @@
private boolean flushOnPeriod = false; // true if period elapsed
private boolean finished = false; // true if container finished
private boolean unregister = false; // unregister
+ private long unregisterDelayMs;
+ private long unregisterTimestamp;
private Timer timer; // lazily initialized
/**
@@ -107,15 +111,53 @@
*/
protected final static Map
usageMetrics = new HashMap<>();
+ private static final long UNREGISTER_TIMER_INTERVAL_MS = 3000L;
+ private static Timer unregisterContainerMetricsTimer;
+ public static void init() {
+ unregisterContainerMetricsTimer =
+ new Timer("Container metrics unregistration", true);
+
+ TimerTask unregisterTimerTask = new TimerTask() {
+ @Override
+ public void run() {
+ List expiredCMs = new ArrayList();
+ long currentTimeStamp = System.currentTimeMillis();
+ synchronized(ContainerMetrics.class) {
+ for (ContainerMetrics cm : usageMetrics.values()) {
+ if (cm.unregisterTimestamp != -1L &&
+ cm.unregisterTimestamp < currentTimeStamp) {
+ expiredCMs.add(cm);
+ }
+ }
+ }
+ for (ContainerMetrics cm : expiredCMs) {
+ ContainerMetrics.unregisterContainerMetrics(cm);
+ }
+ }
+ };
+
+ unregisterContainerMetricsTimer.scheduleAtFixedRate(unregisterTimerTask,
+ UNREGISTER_TIMER_INTERVAL_MS, UNREGISTER_TIMER_INTERVAL_MS);
+ }
+
+ public static void shutDown() {
+ if (unregisterContainerMetricsTimer != null) {
+ unregisterContainerMetricsTimer.cancel();
+ unregisterContainerMetricsTimer = null;
+ }
+ }
ContainerMetrics(
- MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+ MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
+ long delayMs) {
this.recordInfo =
info(sourceName(containerId), RECORD_INFO.description());
this.registry = new MetricsRegistry(recordInfo);
this.metricsSystem = ms;
this.containerId = containerId;
this.flushPeriodMs = flushPeriodMs;
+ this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs;
+ this.unregisterTimestamp = -1L;
scheduleTimerTaskIfRequired();
this.pMemMBsStat = registry.newStat(
@@ -148,17 +190,18 @@ static String sourceName(ContainerId containerId) {
}
public static ContainerMetrics forContainer(
- ContainerId containerId, long flushPeriodMs) {
+ ContainerId containerId, long flushPeriodMs, long delayMs) {
return forContainer(
- DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
+ DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
}
synchronized static ContainerMetrics forContainer(
- MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+ MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
+ long delayMs) {
ContainerMetrics metrics = usageMetrics.get(containerId);
if (metrics == null) {
- metrics = new ContainerMetrics(
- ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
+ metrics = new ContainerMetrics(ms, containerId, flushPeriodMs,
+ delayMs).tag(RECORD_INFO, containerId);
// Register with the MetricsSystems
if (ms != null) {
@@ -172,12 +215,15 @@ synchronized static ContainerMetrics forContainer(
return metrics;
}
+ synchronized static void unregisterContainerMetrics(ContainerMetrics cm) {
+ cm.metricsSystem.unregisterSource(cm.recordInfo.name());
+ usageMetrics.remove(cm.containerId);
+ }
+
@Override
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
//Container goes through registered -> finished -> unregistered.
if (unregister) {
- metricsSystem.unregisterSource(recordInfo.name());
- usageMetrics.remove(containerId);
return;
}
@@ -199,6 +245,7 @@ public synchronized void finished() {
timer.cancel();
timer = null;
}
+ unregisterTimestamp = System.currentTimeMillis() + unregisterDelayMs;
}
public void recordMemoryUsage(int memoryMBs) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index b3839d2..232b8de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -55,6 +55,7 @@
private MonitoringThread monitoringThread;
private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs;
+ private long containerMetricsUnregisterDelayMs;
@VisibleForTesting
final Map trackingContainers =
@@ -126,6 +127,9 @@ protected void serviceInit(Configuration conf) throws Exception {
this.containerMetricsPeriodMs =
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
+ this.containerMetricsUnregisterDelayMs = conf.getLong(
+ YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
long configuredPMemForContainers =
NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
@@ -209,6 +213,9 @@ private boolean isEnabled() {
@Override
protected void serviceStart() throws Exception {
+ if (containerMetricsEnabled) {
+ ContainerMetrics.init();
+ }
if (containersMonitorEnabled) {
this.monitoringThread.start();
}
@@ -217,6 +224,9 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
+ if (containerMetricsEnabled) {
+ ContainerMetrics.shutDown();
+ }
if (containersMonitorEnabled) {
stopped = true;
this.monitoringThread.interrupt();
@@ -425,7 +435,8 @@ public void run() {
if (containerMetricsEnabled) {
ContainerMetrics usageMetrics = ContainerMetrics
- .forContainer(containerId, containerMetricsPeriodMs);
+ .forContainer(containerId, containerMetricsPeriodMs,
+ containerMetricsUnregisterDelayMs);
usageMetrics.recordProcessId(pId);
}
}
@@ -476,10 +487,12 @@ public void run() {
// Add usage to container metrics
if (containerMetricsEnabled) {
ContainerMetrics.forContainer(
- containerId, containerMetricsPeriodMs).recordMemoryUsage(
+ containerId, containerMetricsPeriodMs,
+ containerMetricsUnregisterDelayMs).recordMemoryUsage(
(int) (currentPmemUsage >> 20));
ContainerMetrics.forContainer(
- containerId, containerMetricsPeriodMs).recordCpuUsage
+ containerId, containerMetricsPeriodMs,
+ containerMetricsUnregisterDelayMs).recordCpuUsage
((int)cpuUsagePercentPerCore, milliVcoresUsed);
}
@@ -609,7 +622,8 @@ private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
ContainerId containerId = monitoringEvent.getContainerId();
ContainerMetrics usageMetrics = ContainerMetrics
- .forContainer(containerId, containerMetricsPeriodMs);
+ .forContainer(containerId, containerMetricsPeriodMs,
+ containerMetricsUnregisterDelayMs);
int vmemLimitMBs;
int pmemLimitMBs;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
index bdf9994..2fcd59b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
@@ -22,11 +22,15 @@
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.impl.MetricsRecords;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -44,7 +48,8 @@ public void testContainerMetricsFlow() throws InterruptedException {
MetricsCollectorImpl collector = new MetricsCollectorImpl();
ContainerId containerId = mock(ContainerId.class);
- ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+ ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
+ 100, 1);
metrics.recordMemoryUsage(1024);
metrics.getMetrics(collector, true);
@@ -82,7 +87,8 @@ public void testContainerMetricsLimit() throws InterruptedException {
MetricsCollectorImpl collector = new MetricsCollectorImpl();
ContainerId containerId = mock(ContainerId.class);
- ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+ ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
+ 100, 1);
int anyPmemLimit = 1024;
int anyVmemLimit = 2048;
@@ -117,4 +123,41 @@ public void testContainerMetricsLimit() throws InterruptedException {
collector.clear();
}
+
+ @Test(timeout = 20000)
+ public void testContainerMetricsFinished() throws InterruptedException {
+ ContainerMetrics.init();
+ MetricsSystemImpl system = new MetricsSystemImpl();
+ system.init("test");
+ MetricsCollectorImpl collector = new MetricsCollectorImpl();
+ ApplicationId appId = ApplicationId.newInstance(1234, 3);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 4);
+ ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+ ContainerMetrics metrics1 = ContainerMetrics.forContainer(system,
+ containerId1, 1, 0);
+ ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2);
+ ContainerMetrics metrics2 = ContainerMetrics.forContainer(system,
+ containerId2, 1, 0);
+ ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
+ ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
+ containerId3, 1, 0);
+ metrics1.finished();
+ metrics2.finished();
+ system.sampleMetrics();
+ system.sampleMetrics();
+ Thread.sleep(10000);
+ system.stop();
+ // verify metrics1 is unregistered
+ assertTrue(metrics1 != ContainerMetrics.forContainer(
+ system, containerId1, 1, 0));
+ // verify metrics2 is unregistered
+ assertTrue(metrics2 != ContainerMetrics.forContainer(
+ system, containerId2, 1, 0));
+ // verify metrics3 is still registered
+ assertTrue(metrics3 == ContainerMetrics.forContainer(
+ system, containerId3, 1, 0));
+ system.shutdown();
+ ContainerMetrics.shutDown();
+ }
}