diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9741f6c..5c8f21f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -107,6 +107,15 @@ 300000 + + + The interval of queue size (in thousands) + for printing the boom queue event type details. + + yarn.dispatcher.print-events-info.interval-in-thousands + 5 + + The expiry interval for application master reporting. yarn.am.liveness-monitor.expiry-interval-ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6bbcdcb..b2b1d71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2456,6 +2456,16 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000; + /** + * The interval of queue size (in thousands) + * for printing the boom queue event type details. + */ + public static final String + DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS = + YARN_PREFIX + "dispatcher.print-events-info.interval-in-thousands"; + public static final int + DEFAULT_DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS = 5; + /** * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH * entries diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 5019369..91ea308 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -24,7 +24,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -57,6 +59,10 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private volatile int lastEventQueueSizeLogged = 0; private volatile boolean stopped = false; + //Configuration for control the details queue event printing. + private int detailsInterval; + private boolean printTrigger = false; + // Configuration flag for enabling/disabling draining dispatcher's events on // stop functionality. private volatile boolean drainEventsOnStop = false; @@ -129,6 +135,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } if (event != null) { dispatch(event); + if (printTrigger) { + //Log the latest dispatch event type + // may cause the too many events queued + LOG.info("Latest dispatch event type: " + event.getType()); + printTrigger = false; + } } } } @@ -140,6 +152,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { exitOnDispatchException = false; } + @Override + protected void serviceInit(Configuration conf) throws Exception{ + super.serviceInit(conf); + this.detailsInterval = getConfig().getInt(YarnConfiguration. + DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS, + YarnConfiguration. + DEFAULT_DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS + ) * 1000; + } + @Override protected void serviceStart() throws Exception { //start all the components @@ -246,6 +268,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } class GenericEventHandler implements EventHandler { + + private void printEventQueueDetails(BlockingQueue queue) { + Map counterMap = eventQueue.stream(). + collect(Collectors. + groupingBy(e -> e.getType(), Collectors.counting()) + ); + for (Map.Entry entry : counterMap.entrySet()) { + long num = entry.getValue(); + LOG.info("Event type: " + entry.getKey() + + ", Event record counter: " + num); + } + } + public void handle(Event event) { if (blockNewEvents) { return; @@ -258,6 +293,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); + + if (qSize % detailsInterval == 0) { + printEventQueueDetails(eventQueue); + printTrigger = true; + } } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 2b9d745..7e8430f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.event; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @@ -93,6 +96,20 @@ public class TestAsyncDispatcher { DUMMY } + private static class TestHandler implements EventHandler { + @Override + public void handle(Event event) { + try { + // As long as 10000 events queued + Thread.sleep(1500); + } catch (InterruptedException e) {} + } + } + + private enum TestEnum { + TestEventType + } + @SuppressWarnings({ "rawtypes", "unchecked" }) private void dispatchDummyEvents(Dispatcher disp, int count) { for (int i = 0; i < count; i++) { @@ -119,5 +136,45 @@ public class TestAsyncDispatcher { disp.close(); assertEquals(0, queue.size()); } + + //Test print dispatcher details when the blocking queue is heavy + @Test(timeout = 10000) + public void testPrintDispatcherEventDetails() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration. + DISPATCHER_PRINT_EVENTS_INFO_INTERVAL_IN_THOUSANDS, 5); + Logger log = mock(Logger.class); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + + Field logger = AsyncDispatcher.class.getDeclaredField("LOG"); + logger.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL); + Object oldLog = logger.get(null); + + try { + logger.set(null, log); + dispatcher.register(TestEnum.class, new TestHandler()); + dispatcher.start(); + + for (int i = 0; i < 10000; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType); + dispatcher.getEventHandler().handle(event); + } + verify(log, atLeastOnce()).info("Event type: TestEventType, " + + "Event record counter: 5000"); + Thread.sleep(2000); + //Make sure more than one event to take + verify(log, atLeastOnce()). + info("Latest dispatch event type: TestEventType"); + dispatcher.stop(); + } finally { + //... restore logger object + logger.set(null, oldLog); + } + } }