diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9741f6c..5c8f21f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -107,6 +107,15 @@
300000
+
+
+ The interval of queue size (in thousands)
+ for printing the boom queue event type details.
+
+ yarn.dispatcher.print-events-info.interval-in-thousands
+ 5
+
+
The expiry interval for application master reporting.
yarn.am.liveness-monitor.expiry-interval-ms
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6bbcdcb..b2b1d71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2456,6 +2456,16 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;
+ /**
+ * The interval of queue size (in thousands)
+ * for printing the boom queue event type details.
+ */
+ public static final String
+ DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS =
+ YARN_PREFIX + "dispatcher.print-events-info.interval-in-thousands";
+ public static final int
+ DEFAULT_DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS = 5;
+
/**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 5019369..91ea308 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -24,7 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@@ -57,6 +59,10 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private volatile int lastEventQueueSizeLogged = 0;
private volatile boolean stopped = false;
+ //Configuration for control the details queue event printing.
+ private int detailsInterval;
+ private boolean printTrigger = false;
+
// Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality.
private volatile boolean drainEventsOnStop = false;
@@ -129,6 +135,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
if (event != null) {
dispatch(event);
+ if (printTrigger) {
+ //Log the latest dispatch event type
+ // may cause the too many events queued
+ LOG.info("Latest dispatch event type: " + event.getType());
+ printTrigger = false;
+ }
}
}
}
@@ -140,6 +152,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
exitOnDispatchException = false;
}
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception{
+ super.serviceInit(conf);
+ this.detailsInterval = getConfig().getInt(YarnConfiguration.
+ DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS,
+ YarnConfiguration.
+ DEFAULT_DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS
+ ) * 1000;
+ }
+
@Override
protected void serviceStart() throws Exception {
//start all the components
@@ -246,6 +268,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
class GenericEventHandler implements EventHandler {
+
+ private void printEventQueueDetails(BlockingQueue queue) {
+ Map counterMap = eventQueue.stream().
+ collect(Collectors.
+ groupingBy(e -> e.getType(), Collectors.counting())
+ );
+ for (Map.Entry entry : counterMap.entrySet()) {
+ long num = entry.getValue();
+ LOG.info("Event type: " + entry.getKey()
+ + ", Event record counter: " + num);
+ }
+ }
+
public void handle(Event event) {
if (blockNewEvents) {
return;
@@ -258,6 +293,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
+
+ if (qSize % detailsInterval == 0) {
+ printEventQueueDetails(eventQueue);
+ printTrigger = true;
+ }
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 2b9d745..7e8430f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.event;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
@@ -93,6 +96,20 @@ public class TestAsyncDispatcher {
DUMMY
}
+ private static class TestHandler implements EventHandler {
+ @Override
+ public void handle(Event event) {
+ try {
+ // As long as 10000 events queued
+ Thread.sleep(1500);
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ private enum TestEnum {
+ TestEventType
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
private void dispatchDummyEvents(Dispatcher disp, int count) {
for (int i = 0; i < count; i++) {
@@ -119,5 +136,45 @@ public class TestAsyncDispatcher {
disp.close();
assertEquals(0, queue.size());
}
+
+ //Test print dispatcher details when the blocking queue is heavy
+ @Test(timeout = 10000)
+ public void testPrintDispatcherEventDetails() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.
+ DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS, 5);
+ Logger log = mock(Logger.class);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+
+ Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
+ logger.setAccessible(true);
+ Field modifiers = Field.class.getDeclaredField("modifiers");
+ modifiers.setAccessible(true);
+ modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
+ Object oldLog = logger.get(null);
+
+ try {
+ logger.set(null, log);
+ dispatcher.register(TestEnum.class, new TestHandler());
+ dispatcher.start();
+
+ for (int i = 0; i < 10000; ++i) {
+ Event event = mock(Event.class);
+ when(event.getType()).thenReturn(TestEnum.TestEventType);
+ dispatcher.getEventHandler().handle(event);
+ }
+ verify(log, atLeastOnce()).info("Event type: TestEventType, " +
+ "Event record counter: 5000");
+ Thread.sleep(2000);
+ //Make sure more than one event to take
+ verify(log, atLeastOnce()).
+ info("Latest dispatch event type: TestEventType");
+ dispatcher.stop();
+ } finally {
+ //... restore logger object
+ logger.set(null, oldLog);
+ }
+ }
}