diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5f0ad9a..57898aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2325,6 +2325,11 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000; + /** The interval thousands of queue size for printing the boom queue event type details. */ + public static final String DISPATCHER_PRINT_EVENTS__INFO_INTERVAL_IN_THOUSANDS = + YARN_PREFIX + "dispatcher.print-events-info.interval-in-thousands"; + public static final int DEFAULT_DISPATCHER_PRINT_EVENTS__INFO_INTERVAL_IN_THOUSANDS = 5; + /** * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH * entries diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 4a78a22..1a2dbd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,6 +53,10 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private volatile int lastEventQueueSizeLogged = 0; private volatile boolean stopped = false; + //Configuration for control the details queue event printing. + int detailsInterval = 5; + private boolean printTrigger = false; + // Configuration flag for enabling/disabling draining dispatcher's events on // stop functionality. private volatile boolean drainEventsOnStop = false; @@ -123,6 +128,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { return; } if (event != null) { + if (printTrigger) { + //Log the latest dispatch event type may cause the too many events queued + LOG.info("Latest dispatch event type: " + event.getType()); + printTrigger = false; + } dispatch(event); } } @@ -142,6 +152,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { eventHandlingThread = new Thread(createThread()); eventHandlingThread.setName(dispatcherThreadName); eventHandlingThread.start(); + detailsInterval = getConfig().getInt(YarnConfiguration.DISPATCHER_PRINT_EVENTS__INFO_INTERVAL_IN_THOUSANDS, + YarnConfiguration.DEFAULT_DISPATCHER_PRINT_EVENTS__INFO_INTERVAL_IN_THOUSANDS); + } public void setDrainEventsOnStop() { @@ -243,6 +256,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } class GenericEventHandler implements EventHandler { + private void printEventQueueDetails(BlockingQueue queue) { + Map counterMap = + queue.parallelStream(). + collect(Collectors.toConcurrentMap( + w -> w.getType(), w -> 1L, Long::sum)); + for (Map.Entry entry : counterMap.entrySet()) { + long num = entry.getValue(); + LOG.info("Event type: " + entry.getKey() + ", Event record counter: " + num); + } + } public void handle(Event event) { if (blockNewEvents) { return; @@ -255,6 +278,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); + if (qSize % (detailsInterval * 1000) == 0) { + printEventQueueDetails(eventQueue); + printTrigger = true; + } } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) {