diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3273b47..2718f1e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -822,6 +822,16 @@ private static void addDeprecatedKeys() { "container-monitor.procfs-tree.smaps-based-rss.enabled"; public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED = false; + + /** Enable/disable container metrics. */ + public static final String NM_CONTAINER_METRICS_ENABLE = + NM_PREFIX + "container-metrics.enable"; + public static final boolean DEFAULT_NM_CONTAINER_METRICS_ENABLE = true; + + /** Container metrics flush period. -1 for flush on completion. */ + public static final String NM_CONTAINER_METRICS_PERIOD_MS = + NM_PREFIX + "container-metrics.period-ms"; + public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1; /** Prefix for all node manager disk health checker configs. */ private static final String NM_DISK_HEALTH_CHECK_PREFIX = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7dea2c3..5b79fa7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -925,6 +925,20 @@ + Enable/disable metrics for containers + yarn.nodemanager.container-metrics.enable + true + + + + Container metrics' publishing period in milliseconds. When the + value is 0 or lower, metrics are published only when the container + finishes. + yarn.nodemanager.container-metrics.period-ms + -1 + + + Frequency of running node health script. yarn.nodemanager.health-checker.interval-ms 600000 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java new file mode 100644 index 0000000..21fa72c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableStat; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +@InterfaceAudience.Private +@Metrics(context="container") +public class ContainerMetrics implements MetricsSource { + + @Metric + public MutableStat pMemMBsStat; + + static final MetricsInfo RECORD_INFO = + info("ContainerUsage", "Resource usage by container"); + + final MetricsInfo recordInfo; + final MetricsRegistry registry; + final ContainerId containerId; + final MetricsSystem metricsSystem; + + // Metrics publishing status + private long flushPeriodMs; + private boolean flushOnPeriod = false; // true if period elapsed + private boolean finished = false; // true if container finished + private boolean unregister = false; // unregister + private Timer timer; // lazily initialized + + /** + * Simple metrics cache to help prevent re-registrations. + */ + protected final static Map usageMetrics = + new HashMap(); + + ContainerMetrics( + MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + this.recordInfo = + info(sourceName(containerId), RECORD_INFO.description()); + this.registry = new MetricsRegistry(recordInfo); + this.metricsSystem = ms; + this.containerId = containerId; + this.flushPeriodMs = flushPeriodMs; + scheduleTimerTaskIfRequired(); + + this.pMemMBsStat = registry.newStat( + "pMem", "Physical memory stats", "Usage", "MBs", true); + } + + ContainerMetrics tag(MetricsInfo info, ContainerId containerId) { + registry.tag(info, containerId.toString()); + return this; + } + + static String sourceName(ContainerId containerId) { + return RECORD_INFO.name() + "_" + containerId.toString(); + } + + public static ContainerMetrics forContainer(ContainerId containerId) { + return forContainer(containerId, -1L); + } + + public static ContainerMetrics forContainer( + ContainerId containerId, long flushPeriodMs) { + return forContainer( + DefaultMetricsSystem.instance(), containerId, flushPeriodMs); + } + + synchronized static ContainerMetrics forContainer( + MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { + ContainerMetrics metrics = usageMetrics.get(containerId); + if (metrics == null) { + metrics = new ContainerMetrics( + ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId); + + // Register with the MetricsSystems + if (ms != null) { + metrics = + ms.register(sourceName(containerId), + "Metrics for container: " + containerId, metrics); + } + usageMetrics.put(containerId, metrics); + } + + return metrics; + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + //Container goes through registered -> finished -> unregistered. + if (unregister) { + metricsSystem.unregisterSource(recordInfo.name()); + usageMetrics.remove(containerId); + return; + } + + if (finished || flushOnPeriod) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + if (finished) { + this.unregister = true; + } else if (flushOnPeriod) { + flushOnPeriod = false; + scheduleTimerTaskIfRequired(); + } + } + + public synchronized void finished() { + this.finished = true; + if (timer != null) { + timer.cancel(); + } + } + + public void recordMemoryUsage(int memoryMBs) { + this.pMemMBsStat.add(memoryMBs); + } + + private synchronized void scheduleTimerTaskIfRequired() { + if (flushPeriodMs > 0) { + // Lazily initialize timer + if (timer == null) { + this.timer = new Timer("Metrics flush checker", true); + } + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + synchronized (ContainerMetrics.this) { + if (!finished) { + flushOnPeriod = true; + } + } + } + }; + timer.schedule(timerTask, flushPeriodMs); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 02a63ac..55f95bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -18,13 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,10 +32,15 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; -import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; -import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; public class ContainersMonitorImpl extends AbstractService implements ContainersMonitor { @@ -51,6 +50,8 @@ private long monitoringInterval; private MonitoringThread monitoringThread; + private boolean containerMetricsEnabled; + private long containerMetricsPeriodMs; final List containersToBeRemoved; final Map containersToBeAdded; @@ -106,6 +107,13 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info(" Using ResourceCalculatorProcessTree : " + this.processTreeClass); + this.containerMetricsEnabled = + conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE, + YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE); + this.containerMetricsPeriodMs = + conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, + YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); + long configuredPMemForContainers = conf.getLong( YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l; @@ -352,6 +360,8 @@ public void run() { // Remove finished containers synchronized (containersToBeRemoved) { for (ContainerId containerId : containersToBeRemoved) { + // TODO: Remove this code +// ContainerMetrics.forContainer(containerId).stop(); trackingContainers.remove(containerId); LOG.info("Stopping resource-monitoring for " + containerId); } @@ -408,7 +418,15 @@ public void run() { LOG.info(String.format( "Memory usage of ProcessTree %s for container-id %s: ", pId, containerId.toString()) + - formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit)); + formatUsageString( + currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit)); + + // Add usage to container metrics + if (containerMetricsEnabled) { + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs).recordMemoryUsage( + (int) (currentPmemUsage >> 20)); + } boolean isMemoryOverLimit = false; String msg = ""; @@ -563,6 +581,9 @@ public void handle(ContainersMonitorEvent monitoringEvent) { case STOP_MONITORING_CONTAINER: synchronized (this.containersToBeRemoved) { this.containersToBeRemoved.add(containerId); + if (this.containerMetricsEnabled) { + ContainerMetrics.forContainer(containerId).finished(); + } } break; default: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java new file mode 100644 index 0000000..1320627 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class TestContainerMetrics { + + @Test + public void testContainerMetricsFlow() throws InterruptedException { + final String ERR = "Error in number of records"; + + // Create a dummy MetricsSystem + MetricsSystem system = mock(MetricsSystem.class); + doReturn(this).when(system).register(anyString(), anyString(), any()); + + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + ContainerId containerId = mock(ContainerId.class); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + + metrics.recordMemoryUsage(1024); + metrics.getMetrics(collector, true); + assertEquals(ERR, 0, collector.getRecords().size()); + + Thread.sleep(120); + metrics.getMetrics(collector, true); + assertEquals(ERR, 1, collector.getRecords().size()); + + metrics.finished(); + metrics.getMetrics(collector, true); + assertEquals(ERR, 2, collector.getRecords().size()); + + metrics.getMetrics(collector, true); + assertEquals(ERR, 2, collector.getRecords().size()); + } +}