diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java index 513d6d7..ef7306b 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java @@ -395,7 +395,8 @@ public synchronized void publishMetricsNow() { * Sample all the sources for a snapshot of metrics/tags * @return the metrics buffer containing the snapshot */ - synchronized MetricsBuffer sampleMetrics() { + @VisibleForTesting + public synchronized MetricsBuffer sampleMetrics() { collector.clear(); MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 33e8a1f..d2106cd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1008,7 +1008,15 @@ private static void addDeprecatedKeys() { NM_PREFIX + "container-metrics.period-ms"; @Private public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1; - + + /** Delay in ms before unregistering container metrics after completion. */ + @Private + public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS = + NM_PREFIX + "container-metrics.unregister-delay-ms"; + @Private + public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS = + 10000; + /** Prefix for all node manager disk health checker configs. 
*/ private static final String NM_DISK_HEALTH_CHECK_PREFIX = "yarn.nodemanager.disk-health-checker."; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bcd64c3..c8bca8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1573,6 +1573,14 @@ + The delay, in milliseconds, before container metrics are unregistered after the container completes. + + yarn.nodemanager.container-metrics.unregister-delay-ms + 10000 + + + + Class used to calculate current container resource utilization. yarn.nodemanager.container-monitor.process-tree.class diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java index c364143..32a5052 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -32,7 +32,9 @@ import org.apache.hadoop.metrics2.lib.MutableStat; import org.apache.hadoop.yarn.api.records.ContainerId; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -100,6 +102,8 @@ private boolean flushOnPeriod = false; // true if period elapsed private boolean finished = false; // true if container finished 
private boolean unregister = false; // unregister + private long unregisterDelayMs; + private long unregisterTimestamp; private Timer timer; // lazily initialized /** @@ -107,15 +111,53 @@ */ protected final static Map usageMetrics = new HashMap<>(); + private static final long UNREGISTER_TIMER_INTERVAL_MS = 3000L; + private static Timer unregisterContainerMetricsTimer; + public static void init() { + unregisterContainerMetricsTimer = + new Timer("Container metrics unregistration", true); + + TimerTask unregisterTimerTask = new TimerTask() { + @Override + public void run() { + List expiredCMs = new ArrayList(); + long currentTimeStamp = System.currentTimeMillis(); + synchronized(ContainerMetrics.class) { + for (ContainerMetrics cm : usageMetrics.values()) { + if (cm.unregisterTimestamp != -1L && + cm.unregisterTimestamp < currentTimeStamp) { + expiredCMs.add(cm); + } + } + } + for (ContainerMetrics cm : expiredCMs) { + ContainerMetrics.unregisterContainerMetrics(cm); + } + } + }; + + unregisterContainerMetricsTimer.scheduleAtFixedRate(unregisterTimerTask, + UNREGISTER_TIMER_INTERVAL_MS, UNREGISTER_TIMER_INTERVAL_MS); + } + + public static void shutDown() { + if (unregisterContainerMetricsTimer != null) { + unregisterContainerMetricsTimer.cancel(); + unregisterContainerMetricsTimer = null; + } + } ContainerMetrics( - MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + MetricsSystem ms, ContainerId containerId, long flushPeriodMs, + long delayMs) { this.recordInfo = info(sourceName(containerId), RECORD_INFO.description()); this.registry = new MetricsRegistry(recordInfo); this.metricsSystem = ms; this.containerId = containerId; this.flushPeriodMs = flushPeriodMs; + this.unregisterDelayMs = delayMs < 0 ? 
0 : delayMs; + this.unregisterTimestamp = -1L; scheduleTimerTaskIfRequired(); this.pMemMBsStat = registry.newStat( @@ -148,17 +190,18 @@ static String sourceName(ContainerId containerId) { } public static ContainerMetrics forContainer( - ContainerId containerId, long flushPeriodMs) { + ContainerId containerId, long flushPeriodMs, long delayMs) { return forContainer( - DefaultMetricsSystem.instance(), containerId, flushPeriodMs); + DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs); } synchronized static ContainerMetrics forContainer( - MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + MetricsSystem ms, ContainerId containerId, long flushPeriodMs, + long delayMs) { ContainerMetrics metrics = usageMetrics.get(containerId); if (metrics == null) { - metrics = new ContainerMetrics( - ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId); + metrics = new ContainerMetrics(ms, containerId, flushPeriodMs, + delayMs).tag(RECORD_INFO, containerId); // Register with the MetricsSystems if (ms != null) { @@ -172,12 +215,15 @@ synchronized static ContainerMetrics forContainer( return metrics; } + synchronized static void unregisterContainerMetrics(ContainerMetrics cm) { + cm.metricsSystem.unregisterSource(cm.recordInfo.name()); + usageMetrics.remove(cm.containerId); + } + @Override public synchronized void getMetrics(MetricsCollector collector, boolean all) { //Container goes through registered -> finished -> unregistered. 
if (unregister) { - metricsSystem.unregisterSource(recordInfo.name()); - usageMetrics.remove(containerId); return; } @@ -199,6 +245,7 @@ public synchronized void finished() { timer.cancel(); timer = null; } + unregisterTimestamp = System.currentTimeMillis() + unregisterDelayMs; } public void recordMemoryUsage(int memoryMBs) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index b3839d2..232b8de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -55,6 +55,7 @@ private MonitoringThread monitoringThread; private boolean containerMetricsEnabled; private long containerMetricsPeriodMs; + private long containerMetricsUnregisterDelayMs; @VisibleForTesting final Map trackingContainers = @@ -126,6 +127,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.containerMetricsPeriodMs = conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); + this.containerMetricsUnregisterDelayMs = conf.getLong( + YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS, + YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS); long configuredPMemForContainers = NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L; @@ -209,6 +213,9 @@ private boolean isEnabled() { @Override protected void 
serviceStart() throws Exception { + if (containerMetricsEnabled) { + ContainerMetrics.init(); + } if (containersMonitorEnabled) { this.monitoringThread.start(); } @@ -217,6 +224,9 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { + if (containerMetricsEnabled) { + ContainerMetrics.shutDown(); + } if (containersMonitorEnabled) { stopped = true; this.monitoringThread.interrupt(); @@ -425,7 +435,8 @@ public void run() { if (containerMetricsEnabled) { ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs); + .forContainer(containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs); usageMetrics.recordProcessId(pId); } } @@ -476,10 +487,12 @@ public void run() { // Add usage to container metrics if (containerMetricsEnabled) { ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordMemoryUsage( + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordMemoryUsage( (int) (currentPmemUsage >> 20)); ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordCpuUsage + containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs).recordCpuUsage ((int)cpuUsagePercentPerCore, milliVcoresUsed); } @@ -609,7 +622,8 @@ private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) { ContainerId containerId = monitoringEvent.getContainerId(); ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs); + .forContainer(containerId, containerMetricsPeriodMs, + containerMetricsUnregisterDelayMs); int vmemLimitMBs; int pmemLimitMBs; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java index bdf9994..2fcd59b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -22,11 +22,15 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.impl.MetricsRecords; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -44,7 +48,8 @@ public void testContainerMetricsFlow() throws InterruptedException { MetricsCollectorImpl collector = new MetricsCollectorImpl(); ContainerId containerId = mock(ContainerId.class); - ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, + 100, 1); metrics.recordMemoryUsage(1024); metrics.getMetrics(collector, true); @@ -82,7 +87,8 @@ public void testContainerMetricsLimit() throws InterruptedException { MetricsCollectorImpl collector = new MetricsCollectorImpl(); ContainerId containerId = mock(ContainerId.class); - ContainerMetrics metrics = 
ContainerMetrics.forContainer(containerId, 100); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, + 100, 1); int anyPmemLimit = 1024; int anyVmemLimit = 2048; @@ -117,4 +123,41 @@ public void testContainerMetricsLimit() throws InterruptedException { collector.clear(); } + + @Test(timeout = 20000) + public void testContainerMetricsFinished() throws InterruptedException { + ContainerMetrics.init(); + MetricsSystemImpl system = new MetricsSystemImpl(); + system.init("test"); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerMetrics metrics1 = ContainerMetrics.forContainer(system, + containerId1, 1, 0); + ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2); + ContainerMetrics metrics2 = ContainerMetrics.forContainer(system, + containerId2, 1, 0); + ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); + ContainerMetrics metrics3 = ContainerMetrics.forContainer(system, + containerId3, 1, 0); + metrics1.finished(); + metrics2.finished(); + system.sampleMetrics(); + system.sampleMetrics(); + Thread.sleep(10000); + system.stop(); + // verify metrics1 is unregistered + assertTrue(metrics1 != ContainerMetrics.forContainer( + system, containerId1, 1, 0)); + // verify metrics2 is unregistered + assertTrue(metrics2 != ContainerMetrics.forContainer( + system, containerId2, 1, 0)); + // verify metrics3 is still registered + assertTrue(metrics3 == ContainerMetrics.forContainer( + system, containerId3, 1, 0)); + system.shutdown(); + ContainerMetrics.shutDown(); + } }