diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index df542ed..0894c8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.SystemClock; /** * Dispatches {@link Event}s in a separate thread. Currently only single thread @@ -232,6 +234,39 @@ public EventHandler getEventHandler() { } class GenericEventHandler implements EventHandler { + + private void printEventQueueInfo(BlockingQueue queue) { + Iterator iterator = queue.iterator(); + Map counterMap = new HashMap<>(); + + // count 50w event, takes about 17ms, + long start = SystemClock.getInstance().getTime(); + long total = 0; + while (iterator.hasNext()) { + total += 1; + Event event = iterator.next(); + Enum type = event.getType(); + if (!counterMap.containsKey(type)) { + counterMap.put(type, new long[]{1}); + } else { + counterMap.get(type)[0]++; + } + } + + if (total == 0) { + return; + } + long end = SystemClock.getInstance().getTime(); + + LOG.info("count dispatcher event queue, time used: " + + (end - start) + "ms"); + for (Map.Entry entry : counterMap.entrySet()) { + long num = entry.getValue()[0]; + LOG.info("event type: " + entry.getKey() + ", num: " + num + + ", capacity: " + num/(float)total); + } + } + public void handle(Event event) { if (blockNewEvents) { return; @@ -244,6 +279,9 @@ public void handle(Event event) { && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); + if (qSize % 5000 == 0) { + printEventQueueInfo(eventQueue); + } } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) {