diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5f0ad9a..1671865 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2325,6 +2325,16 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000; + /** The setting that controls whether to record the latest dispatch event is enabled or not. */ + public static final String DISPATCHER_LATEST_EVENT_ENABLED = + YARN_PREFIX + "dispatcher.latest.event.enabled"; + public static final boolean DEFAULT_DISPATCHER_LATEST_EVENT_ENABLED = false; + + /** The interval of queue size for printing the boom queue event type details. */ + public static final String DISPATCHER_EVENT_DETAILS_INTERVAL = + YARN_PREFIX + "dispatcher.event.details.interval"; + public static final int DEFAULT_DISPATCHER_EVENT_DETAILS_INTERVAL = 5000; + /** * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH * entries diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 4a78a22..7f7943f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -123,6 +124,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { return; } if (event != null) { + boolean getLatest = getConfig().getBoolean(YarnConfiguration.DISPATCHER_LATEST_EVENT_ENABLED, + YarnConfiguration.DEFAULT_DISPATCHER_LATEST_EVENT_ENABLED); + if (getLatest) { + //Log the latest dispatch event type may cause the too many events queued + LOG.info("Latest dispatch event type: " + event.getType()); + } dispatch(event); } } @@ -243,6 +250,23 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } class GenericEventHandler implements EventHandler { + private void printEventQueueDetails(BlockingQueue queue) { + Iterator iterator = queue.iterator(); + Map counterMap = new HashMap<>(); + while (iterator.hasNext()) { + Event event = iterator.next(); + Enum type = event.getType(); + if (!counterMap.containsKey(type)) { + counterMap.put(type, new long[]{1}); + } else { + counterMap.get(type)[0]++; + } + } + for (Map.Entry entry : counterMap.entrySet()) { + long num = entry.getValue()[0]; + LOG.info("Event type: " + entry.getKey() + ", Event record counter: " + num); + } + } public void handle(Event event) { if (blockNewEvents) { return; @@ -251,10 +275,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); + int detailsInterval = getConfig().getInt(YarnConfiguration.DISPATCHER_EVENT_DETAILS_INTERVAL, + YarnConfiguration.DEFAULT_DISPATCHER_EVENT_DETAILS_INTERVAL); if (qSize != 0 && qSize % 1000 == 0 && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); + + if (qSize % detailsInterval == 0) { + printEventQueueDetails(eventQueue); + } } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) {