diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 4a78a22..2650427 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -51,6 +52,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private final BlockingQueue eventQueue; private volatile int lastEventQueueSizeLogged = 0; private volatile boolean stopped = false; + //A map that records the counter of every event type + private final Map eventTypeRecord = new ConcurrentHashMap<>(); + //Record the latest event type are being dispatched which may cause the + //too many events + private Enum latestEventType; // Configuration flag for enabling/disabling draining dispatcher's events on // stop functionality. @@ -123,6 +129,8 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { return; } if (event != null) { + latestEventType = event.getType(); + eventTypeRecord.put(event.getType(), eventTypeRecord.get(event.getType()) - 1); dispatch(event); } } @@ -255,6 +263,15 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); + if(qSize % 5000 == 0) { + //Log the event details that may cause too many events queued + for (Map.Entry entry : eventTypeRecord.entrySet()) { + long num = entry.getValue(); + LOG.info("Event type: " + entry.getKey() + ", Event record counter: " + num); + } + //Log the latest dispatch event type may cause the too many events queued + LOG.info("Latest dispatch event type: " + latestEventType); + } } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { @@ -263,6 +280,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } try { eventQueue.put(event); + if (eventTypeRecord.putIfAbsent(event.getType(), 1L) != null) { + eventTypeRecord.put(event.getType(), eventTypeRecord.get(event.getType()) + 1); + } } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e);