diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 02a63ac..70ef702 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -18,13 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,10 +32,15 @@
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 public class ContainersMonitorImpl extends AbstractService implements
     ContainersMonitor {
@@ -352,6 +351,7 @@ public void run() {
         // Remove finished containers
         synchronized (containersToBeRemoved) {
           for (ContainerId containerId : containersToBeRemoved) {
+            UsageMetrics.forContainer(containerId).stop();
             trackingContainers.remove(containerId);
             LOG.info("Stopping resource-monitoring for " + containerId);
           }
@@ -408,7 +408,11 @@ public void run() {
             LOG.info(String.format(
                 "Memory usage of ProcessTree %s for container-id %s: ",
                 pId, containerId.toString()) +
-                formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
+                formatUsageString(
+                    currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
+
+            UsageMetrics.forContainer(containerId).recordMemoryUsage(
+                (int) (currentPmemUsage >> 20));
 
             boolean isMemoryOverLimit = false;
             String msg = "";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/UsageMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/UsageMetrics.java
new file mode 100644
index 0000000..8734f5c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/UsageMetrics.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Metrics source that publishes the resource usage of a single container.
+ *
+ * <p>Instances are obtained through {@link #forContainer(ContainerId)}, which
+ * caches one source per container. Physical-memory samples accumulate in
+ * {@link #pMemMBsStat}; nothing is emitted until {@link #stop()} has been
+ * called, after which the next {@link #getMetrics} call publishes one final
+ * snapshot and the call after that unregisters the source.
+ */
+@InterfaceAudience.Private
+@Metrics(context="container-utilization")
+public class UsageMetrics implements MetricsSource {
+  /** Physical memory samples, in megabytes. */
+  @Metric
+  public MutableStat pMemMBsStat;
+
+  static final MetricsInfo RECORD_INFO =
+      info("ContainerUsage", "Resource usage by container");
+
+  final MetricsInfo recordInfo;
+  final MetricsRegistry registry;
+  final ContainerId containerId;
+  final MetricsSystem metricsSystem;
+
+  // Set by stop(): the next getMetrics() call emits the final snapshot.
+  private boolean flush = false;
+  // Set once the final snapshot is out: the next getMetrics() unregisters.
+  private boolean unregister = false;
+
+  /**
+   * Simple metrics cache to help prevent re-registrations.
+   *
+   * <p>Wrapped in a synchronized map because entries are added under the
+   * class lock (forContainer) but removed under an instance lock
+   * (getMetrics); a bare HashMap would be mutated under two different locks.
+   */
+  protected final static Map<ContainerId, UsageMetrics> usageMetrics =
+      Collections.synchronizedMap(new HashMap<ContainerId, UsageMetrics>());
+
+  UsageMetrics(MetricsSystem ms, ContainerId containerId) {
+    this.recordInfo =
+        info(sourceName(containerId), RECORD_INFO.description());
+    this.registry = new MetricsRegistry(recordInfo);
+    this.metricsSystem = ms;
+    this.containerId = containerId;
+    this.pMemMBsStat = registry.newStat(
+        "pMem", "Physical memory stats", "Usage", "MBs", true);
+  }
+
+  UsageMetrics tag(MetricsInfo info, ContainerId containerId) {
+    registry.tag(info, containerId.toString());
+    return this;
+  }
+
+  static String sourceName(ContainerId containerId) {
+    return RECORD_INFO.name() + "_" + containerId.toString();
+  }
+
+  /**
+   * Returns the cached metrics source for a container, creating it and
+   * registering it with the default metrics system on first use.
+   *
+   * @param containerId container whose usage is tracked
+   * @return the metrics source for {@code containerId}
+   */
+  public synchronized static UsageMetrics forContainer(ContainerId containerId) {
+    return forContainer(DefaultMetricsSystem.instance(), containerId);
+  }
+
+  /**
+   * Same as {@link #forContainer(ContainerId)} but against an explicit
+   * metrics system. Holds the class lock so that the lookup and the
+   * registration cannot interleave with another caller of this overload.
+   *
+   * @param ms metrics system to register with; registration is skipped
+   *           when {@code null}
+   * @param containerId container whose usage is tracked
+   * @return the metrics source for {@code containerId}
+   */
+  synchronized static UsageMetrics forContainer(
+      MetricsSystem ms, ContainerId containerId) {
+    UsageMetrics metrics = usageMetrics.get(containerId);
+    if (metrics == null) {
+      metrics =
+          new UsageMetrics(ms, containerId).tag(RECORD_INFO, containerId);
+
+      // Register with the MetricsSystems
+      if (ms != null) {
+        metrics =
+            ms.register(sourceName(containerId),
+                "Metrics for container: " + containerId, metrics);
+      }
+      usageMetrics.put(containerId, metrics);
+    }
+
+    return metrics;
+  }
+
+  /**
+   * Publishes the final snapshot after {@link #stop()}, then unregisters
+   * this source on the following call. Calls made before {@code stop()}
+   * emit nothing.
+   *
+   * NOTE(review): unregistering from inside this callback changes the
+   * metrics system's set of sources while it may still be sampling them;
+   * confirm this is safe for the MetricsSystem implementation in use.
+   */
+  @Override
+  public synchronized void getMetrics(MetricsCollector collector, boolean all) {
+    if (unregister) {
+      // A null metrics system is allowed by forContainer().
+      if (metricsSystem != null) {
+        metricsSystem.unregisterSource(recordInfo.name());
+      }
+      usageMetrics.remove(containerId);
+      return;
+    }
+
+    if (flush) {
+      this.unregister = true;
+      registry.snapshot(collector.addRecord(registry.info()), all);
+      this.flush = false;
+    }
+  }
+
+  /**
+   * Marks the container as finished so that the next
+   * {@link #getMetrics} call emits the final snapshot.
+   */
+  public synchronized void stop() {
+    this.flush = true;
+  }
+
+  /**
+   * Helper method to clear cache.
+   */
+  @InterfaceAudience.Private
+  public synchronized static void clearQueueMetrics() {
+    usageMetrics.clear();
+  }
+
+  public MetricsSystem getMetricsSystem() {
+    return metricsSystem;
+  }
+
+  /**
+   * Records one physical-memory sample.
+   *
+   * @param memoryMBs physical memory in use, in megabytes
+   */
+  public void recordMemoryUsage(int memoryMBs) {
+    this.pMemMBsStat.add(memoryMBs);
+  }
+}