diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 5019369104d..1f8202e47e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -83,6 +83,8 @@ */ private String dispatcherThreadName = "AsyncDispatcher event handler"; + private DispatcherMetrics metrics; + public AsyncDispatcher() { this(new LinkedBlockingQueue()); } @@ -128,7 +130,13 @@ public void run() { return; } if (event != null) { - dispatch(event); + if (metrics != null) { + long startTime = System.nanoTime(); + dispatch(event); + metrics.incrementEventType(event, (System.nanoTime() - startTime) / 1000); + } else { + dispatch(event); + } } } } @@ -325,4 +333,8 @@ protected boolean isDrained() { protected boolean isStopped() { return stopped; } + + public void setMetrics(DispatcherMetrics metrics) { + this.metrics = metrics; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/DispatcherMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/DispatcherMetrics.java new file mode 100644 index 00000000000..ecd43ef7491 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/DispatcherMetrics.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public abstract class DispatcherMetrics implements MetricsSource { + + final MetricsRegistry registry; + final MetricsSystem metricsSystem; + + protected DispatcherMetrics(MetricsSystem ms, MetricsInfo metricsInfo) { + registry = new MetricsRegistry(metricsInfo); + metricsSystem = ms; + } + + protected static StringBuilder sourceName(MetricsInfo metricsInfo) { + return new StringBuilder(metricsInfo.name()); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public abstract void incrementEventType(Event event, long processingTimeUs); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java index ccd8e2e22b8..a2007af8750 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java @@ -47,6 +47,7 @@ private final Thread eventProcessor; private volatile boolean stopped = false; private boolean shouldExitOnError = true; + private DispatcherMetrics metrics; private static final Logger LOG = LoggerFactory.getLogger(EventDispatcher.class); @@ -68,7 +69,13 @@ public void run() { } try { - handler.handle(event); + if (metrics != null) { + long startTime = System.nanoTime(); + handler.handle(event); + metrics.incrementEventType(event, (System.nanoTime() - startTime) / 1000); + } else { + handler.handle(event); + } } catch (Throwable t) { // An error occurred, but we are shutting down anyway. // If it was an InterruptedException, the very act of @@ -136,4 +143,8 @@ public void handle(T event) { public void disableExitOnError() { shouldExitOnError = false; } + + public void setMetrics(DispatcherMetrics metrics) { + this.metrics = metrics; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAsyncDispatcherMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAsyncDispatcherMetrics.java new file mode 100644 index 00000000000..7323c2cc73c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAsyncDispatcherMetrics.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.yarn.event.DispatcherMetrics; +import org.apache.hadoop.yarn.event.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.metrics2.lib.Interns.*; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class RMAsyncDispatcherMetrics extends DispatcherMetrics { + + static final Logger LOG = LoggerFactory.getLogger(RMAsyncDispatcherMetrics.class); + static final MetricsInfo RECORD_INFO = info("RMAsyncDispatcherMetrics", + "Metrics for RM async dispatcher"); + + static boolean initialized; + + @Metric("Node usable event count") MutableCounterLong nodeUsableCount; + @Metric("Node usable processing time") MutableCounterLong nodeUsableTimeUs; + @Metric("Node unusable event count") MutableCounterLong nodeUnusableCount; + @Metric("Node unusable processing time") MutableCounterLong nodeUnusableTimeUs; + + protected RMAsyncDispatcherMetrics(MetricsSystem ms) { + super(ms, RECORD_INFO); + } + + protected static StringBuilder sourceName() { + return new StringBuilder(RECORD_INFO.name()); + } + + public synchronized + static RMAsyncDispatcherMetrics registerMetrics() { + RMAsyncDispatcherMetrics metrics = null; + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (!initialized) { + // Register with the MetricsSystems + if (ms != null) { + metrics = new RMAsyncDispatcherMetrics(ms); + LOG.info("Registering RMAsyncDispatcherMetrics"); + ms.register( + sourceName().toString(), + "Metrics for RM async dispatcher", metrics); + initialized = true; + } + } + + return metrics; + } + + public void incrementEventType(Event event, long processingTimeUs) { + if (!(event.getType() instanceof NodesListManagerEventType)) { + return; + } + switch ((NodesListManagerEventType) (event.getType())) { + case NODE_USABLE: + nodeUsableCount.incr(); + nodeUsableTimeUs.incr(processingTimeUs); + break; + case NODE_UNUSABLE: + nodeUnusableCount.incr(); + nodeUnusableTimeUs.incr(processingTimeUs); + break; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c795bfaa797..4efe925f65a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DispatcherMetrics; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -442,11 +443,17 @@ protected void setRMStateStore(RMStateStore rmStore) { } protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher"); + EventDispatcher dispatcher = new EventDispatcher(this.scheduler, "SchedulerEventDispatcher"); + DispatcherMetrics metrics = SchedulerEventDispatcherMetrics.registerMetrics(); + dispatcher.setMetrics(metrics); + return dispatcher; } protected Dispatcher createDispatcher() { - return new AsyncDispatcher("RM Event dispatcher"); + AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher"); + DispatcherMetrics metrics = RMAsyncDispatcherMetrics.registerMetrics(); + dispatcher.setMetrics(metrics); + return dispatcher; } protected ResourceScheduler createScheduler() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SchedulerEventDispatcherMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SchedulerEventDispatcherMetrics.java new file mode 100644 index 00000000000..4b15acdb672 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SchedulerEventDispatcherMetrics.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.yarn.event.DispatcherMetrics; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.metrics2.lib.Interns.*; + + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class SchedulerEventDispatcherMetrics extends DispatcherMetrics { + + static final Logger LOG = LoggerFactory.getLogger(SchedulerEventDispatcherMetrics.class); + static final MetricsInfo RECORD_INFO = info("SchedulerEventDispatcherMetrics", + "Metrics for scheduler async dispatcher"); + + static boolean initialized; + + @Metric("Node added event count") MutableCounterLong nodeAddedCount; + @Metric("Node added processing time") MutableCounterLong nodeAddedTimeUs; + @Metric("Node removed event count") MutableCounterLong nodeRemovedCount; + @Metric("Node removed processing time") MutableCounterLong nodeRemovedTimeUs; + @Metric("Node update event count") MutableCounterLong nodeUpdateCount; + @Metric("Node update processing time") MutableCounterLong nodeUpdateTimeUs; + @Metric("Node resource update event count") MutableCounterLong nodeResourceUpdateCount; + @Metric("Node resource update processing time") MutableCounterLong nodeResourceUpdateTimeUs; + @Metric("Node labels update event count") MutableCounterLong nodeLabelsUpdateCount; + @Metric("Node labels update processing time") MutableCounterLong nodeLabelsUpdateTimeUs; + @Metric("App added event count") MutableCounterLong appAddedCount; + @Metric("App added processing time") MutableCounterLong appAddedTimeUs; + @Metric("App removed event count") MutableCounterLong appRemovedCount; + @Metric("App removed processing time") MutableCounterLong appRemovedTimeUs; + @Metric("App attempt added event count") MutableCounterLong appAttemptAddedCount; + @Metric("App attempt added processing time") MutableCounterLong appAttemptAddedTimeUs; + @Metric("App attempt removed event count") MutableCounterLong appAttemptRemovedCount; + @Metric("App attempt removed processing time") MutableCounterLong appAttemptRemovedTimeUs; + @Metric("Container expired event count") MutableCounterLong containerExpiredCount; + @Metric("Container expired processing time") MutableCounterLong containerExpiredTimeUs; + + protected SchedulerEventDispatcherMetrics(MetricsSystem ms) { + super(ms, RECORD_INFO); + } + + protected static StringBuilder sourceName() { + return new StringBuilder(RECORD_INFO.name()); + } + + public synchronized + static SchedulerEventDispatcherMetrics registerMetrics() { + SchedulerEventDispatcherMetrics metrics = null; + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (!initialized) { + // Register with the MetricsSystems + if (ms != null) { + metrics = new SchedulerEventDispatcherMetrics(ms); + LOG.info("Registering SchedulerEventDispatcherMetrics"); + ms.register( + sourceName().toString(), + "Metrics for scheduler async dispatcher", metrics); + initialized = true; + } + } + + return metrics; + } + + public void incrementEventType(Event event, long processingTimeUs) { + LOG.debug("Got scheduler event of type " + event.getType()); + switch ((SchedulerEventType) (event.getType())) { + case NODE_ADDED: + nodeAddedCount.incr(); + nodeAddedTimeUs.incr(processingTimeUs); + break; + case NODE_REMOVED: + nodeRemovedCount.incr(); + nodeRemovedTimeUs.incr(processingTimeUs); + break; + case NODE_UPDATE: + nodeUpdateCount.incr(); + nodeUpdateTimeUs.incr(processingTimeUs); + break; + case NODE_RESOURCE_UPDATE: + nodeResourceUpdateCount.incr(); + nodeResourceUpdateTimeUs.incr(processingTimeUs); + break; + case NODE_LABELS_UPDATE: + nodeLabelsUpdateCount.incr(); + nodeLabelsUpdateTimeUs.incr(processingTimeUs); + break; + case APP_ADDED: + appAddedCount.incr(); + appAddedTimeUs.incr(processingTimeUs); + break; + case APP_REMOVED: + appRemovedCount.incr(); + appRemovedTimeUs.incr(processingTimeUs); + break; + case APP_ATTEMPT_ADDED: + appAttemptAddedCount.incr(); + appAttemptAddedTimeUs.incr(processingTimeUs); + break; + case APP_ATTEMPT_REMOVED: + appAttemptRemovedCount.incr(); + appAttemptRemovedTimeUs.incr(processingTimeUs); + break; + case CONTAINER_EXPIRED: + containerExpiredCount.incr(); + containerExpiredTimeUs.incr(processingTimeUs); + break; + default: + break; + } + } +}