diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2103ae77c2f..6dc8a88b1ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2913,6 +2913,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000; + /** Enable/Disable multi thread dispatcher for each eventType **/ + public static final String MULTI_THREAD_DISPATCHER_ENABLED = + RM_PREFIX + "multi.thread.dispatcher.enabled"; + + public static final boolean + DEFAULT_MULTI_THREAD_DISPATCHER_ENABLED = false; + /** * The threshold used to trigger the logging of event types and counts * in RM's main event dispatcher. Default value is 5000, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 667515d00c1..b12e78ea638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -381,6 +381,11 @@ protected boolean isEventThreadWaiting() { return eventHandlingThread.getState() == Thread.State.WAITING; } + @VisibleForTesting + public BlockingQueue getEventQueue() { + return eventQueue; + } + protected boolean isDrained() { return drained; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6d2a9fed08b..8152ea909ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -149,6 +150,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -459,6 +462,86 @@ protected void setRMStateStore(RMStateStore rmStore) { return dispatcher; } + /** + * Dispatches events using multiple threads, + * each eventType with single thread. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static class MultiThreadDispatcher extends CompositeService + implements Dispatcher { + + private ConcurrentMap, + AsyncDispatcher> eventTypeDispatcherMap = + new ConcurrentHashMap, AsyncDispatcher>(); + + public MultiThreadDispatcher() { + super(ResourceManager.MultiThreadDispatcher.class.getName()); + } + + @Override + public EventHandler getEventHandler() { + return new CompositeEventHandler(); + } + + @Override + public void register(Class eventType, + EventHandler handler) { + if (eventTypeDispatcherMap.get(eventType) == null) { + AsyncDispatcher asyncDispatcher = + createDispatcher(eventType); + eventTypeDispatcherMap.put(eventType, + asyncDispatcher); + addIfService(asyncDispatcher); + } + eventTypeDispatcherMap. + get(eventType).register(eventType, handler); + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher dispatcher : + eventTypeDispatcherMap.values()) { + dispatcher.setDrainEventsOnStop(); + } + } + + private class CompositeEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Trigger the eventType corresponding handler. + eventTypeDispatcherMap. + get(event.getType().getDeclaringClass()). + getEventHandler(). + handle(event); + } + } + + protected AsyncDispatcher createDispatcher(Class eventType) { + return new AsyncDispatcher("RM single thread dispatcher for eventType: " + + eventType.getName()); + } + + @VisibleForTesting + public void disableExitOnDispatchException() { + for (AsyncDispatcher dispatcher : + eventTypeDispatcherMap.values()) { + dispatcher.disableExitOnDispatchException(); + } + } + + @VisibleForTesting + public AsyncDispatcher + getEventTypeAsyncDispatcher(Class eventType) { + return eventTypeDispatcherMap.get(eventType); + } + } + + protected Dispatcher createMultiThreadDispatcher() { + MultiThreadDispatcher dispatcher = + new MultiThreadDispatcher(); + return dispatcher; + } + protected Dispatcher createDispatcher() { AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher"); GenericEventTypeMetrics genericEventTypeMetrics = @@ -1608,7 +1691,13 @@ public static void main(String argv[]) { * Register the handlers for alwaysOn services */ private Dispatcher setupDispatcher() { - Dispatcher dispatcher = createDispatcher(); + Dispatcher dispatcher; + if (conf.getBoolean(YarnConfiguration.MULTI_THREAD_DISPATCHER_ENABLED, + YarnConfiguration.DEFAULT_MULTI_THREAD_DISPATCHER_ENABLED)) { + dispatcher = createMultiThreadDispatcher(); + } else { + dispatcher = createDispatcher(); + } dispatcher.register(RMFatalEventType.class, new ResourceManager.RMFatalEventDispatcher()); return dispatcher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index 1ea470cf094..514e3a52e91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -25,10 +25,12 @@ import static org.mockito.Mockito.verify; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -79,4 +81,52 @@ public void testSchedulerEventDispatcherForPreemptionEvents() { rmDispatcher.stop(); } } + + @Test(timeout=10000) + public void testMultiThreadAsyncDispatcher() { + ResourceManager.MultiThreadDispatcher rmDispatcher = + new ResourceManager.MultiThreadDispatcher(); + RMContextImpl rmContext = new RMContextImpl(rmDispatcher, null, null, + null, null, null, null, null, null); + NodesListManager nodesListManager = spy(new NodesListManager(rmContext)); + YarnConfiguration conf = new YarnConfiguration(); + rmDispatcher.register(NodesListManagerEventType.class, + nodesListManager); + rmDispatcher.init(conf); + rmDispatcher.start(); + + try { + RMNode rmNode1 = MockNodes.newNodeInfo(1, + Resource.newInstance(28000, 8), + 1, "testHost1", 1234); + RMNode rmNode2 = MockNodes.newNodeInfo(1, + Resource.newInstance(28000, 8), + 1, "testHost2", 1234); + + rmDispatcher.getEventHandler(). + handle(new NodesListManagerEvent(NodesListManagerEventType + .NODE_USABLE, rmNode1)); + + rmDispatcher.getEventHandler(). + handle(new NodesListManagerEvent(NodesListManagerEventType + .NODE_UNUSABLE, rmNode2)); + + // Wait for events to be processed by scheduler dispatcher. + Thread.sleep(1000); + + // Make sure corresponding Async dispatcher handled. + AsyncDispatcher nodeListManagerAsyncDispatcher = rmDispatcher. + getEventTypeAsyncDispatcher(NodesListManagerEventType.class); + Assert.assertNotNull(nodeListManagerAsyncDispatcher); + + // Make sure nodesListManager handle 2 times. + verify(nodesListManager, times(2)). + handle(any(NodesListManagerEvent.class)); + + } catch (InterruptedException e) { + Assert.fail(); + } finally { + rmDispatcher.stop(); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index b9c5500a7d2..92dbb6ea3f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -73,6 +73,9 @@ @Before public void setUp() throws Exception { YarnConfiguration conf = new YarnConfiguration(); + // Test multi thread dispatcher + conf.setBoolean(YarnConfiguration. + MULTI_THREAD_DISPATCHER_ENABLED, true); UserGroupInformation.setConfiguration(conf); DefaultMetricsSystem.setMiniClusterMode(true); resourceManager = new ResourceManager();