diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 833aeccd0bd..9c620259e63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2913,6 +2913,18 @@ public static boolean isAclEnabled(Configuration conf) {
 
   public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;
 
+  /**
+   * Whether the RM dispatches events through one dedicated
+   * {@code AsyncDispatcher} per registered event type instead of a
+   * single shared dispatcher.
+   */
+  public static final String MULTI_THREAD_DISPATCHER_ENABLED =
+      RM_PREFIX + "multi.thread.dispatcher.enabled";
+
+  /** The multi-thread dispatcher is disabled by default. */
+  public static final boolean
+      DEFAULT_MULTI_THREAD_DISPATCHER_ENABLED = false;
+
   /**
    * The threshold used to trigger the logging of event types and counts
    * in RM's main event dispatcher. Default value is 5000,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index ba6bb435ec2..e3158fc7bcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -397,6 +397,16 @@ protected boolean isEventThreadWaiting() {
     return eventHandlingThread.getState() == Thread.State.WAITING;
   }
 
+  /**
+   * Exposes the pending-event queue so that tests can inspect it.
+   *
+   * @return the queue backing this dispatcher itself, not a copy
+   */
+  @VisibleForTesting
+  public BlockingQueue<Event> getEventQueue() {
+    return eventQueue;
+  }
+
   protected boolean isDrained() {
     return drained;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 9b47431d76d..b1b1ee4fc50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.spi.container.servlet.ServletContainer;
 
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.yarn.event.Dispatcher;
 import
org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -152,6 +154,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -473,6 +480,117 @@ protected void setRMStateStore(RMStateStore rmStore) {
     return dispatcher;
   }
 
+  /**
+   * Dispatches events using multiple threads: every registered event type
+   * gets its own {@link AsyncDispatcher}, so events of different types are
+   * queued and handled independently of each other.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static class MultiThreadDispatcher extends CompositeService
+      implements Dispatcher {
+
+    private final ConcurrentMap<Class<? extends Enum>, AsyncDispatcher>
+        eventTypeDispatcherMap = new ConcurrentHashMap<>();
+
+    // The handler keeps no state of its own, so one instance serves all.
+    private final EventHandler<Event> compositeEventHandler =
+        new CompositeEventHandler();
+
+    // Remembered so that dispatchers created by a later register() call
+    // receive the same settings as the ones that already exist.
+    private volatile boolean drainEventsOnStop = false;
+    private volatile boolean exitOnDispatchException = true;
+
+    public MultiThreadDispatcher() {
+      super(ResourceManager.MultiThreadDispatcher.class.getName());
+    }
+
+    @Override
+    public EventHandler<Event> getEventHandler() {
+      return compositeEventHandler;
+    }
+
+    /**
+     * Registers {@code handler} with the dispatcher dedicated to
+     * {@code eventType}, creating that dispatcher on first use.
+     */
+    @Override
+    public void register(Class<? extends Enum> eventType,
+        EventHandler handler) {
+      AsyncDispatcher asyncDispatcher =
+          eventTypeDispatcherMap.get(eventType);
+      if (asyncDispatcher == null) {
+        AsyncDispatcher created = createDispatcher(eventType);
+        // putIfAbsent keeps a single dispatcher per event type even when
+        // two threads register the same type concurrently.
+        asyncDispatcher =
+            eventTypeDispatcherMap.putIfAbsent(eventType, created);
+        if (asyncDispatcher == null) {
+          asyncDispatcher = created;
+          if (drainEventsOnStop) {
+            created.setDrainEventsOnStop();
+          }
+          if (!exitOnDispatchException) {
+            created.disableExitOnDispatchException();
+          }
+          addIfService(created);
+        }
+      }
+      asyncDispatcher.register(eventType, handler);
+    }
+
+    public void setDrainEventsOnStop() {
+      drainEventsOnStop = true;
+      for (AsyncDispatcher dispatcher :
+          eventTypeDispatcherMap.values()) {
+        dispatcher.setDrainEventsOnStop();
+      }
+    }
+
+    private class CompositeEventHandler implements EventHandler<Event> {
+
+      @Override
+      public void handle(Event event) {
+        // Route the event to the dispatcher that owns its event type.
+        Class<? extends Enum> eventType =
+            event.getType().getDeclaringClass();
+        AsyncDispatcher asyncDispatcher =
+            eventTypeDispatcherMap.get(eventType);
+        if (asyncDispatcher == null) {
+          // Fail with a clear message instead of a bare NPE.
+          throw new YarnRuntimeException(
+              "No handler registered for event type " + eventType);
+        }
+        asyncDispatcher.getEventHandler().handle(event);
+      }
+    }
+
+    protected AsyncDispatcher createDispatcher(
+        Class<? extends Enum> eventType) {
+      return new AsyncDispatcher("RM single thread dispatcher for eventType: "
+          + eventType.getName());
+    }
+
+    @VisibleForTesting
+    public void disableExitOnDispatchException() {
+      exitOnDispatchException = false;
+      for (AsyncDispatcher dispatcher :
+          eventTypeDispatcherMap.values()) {
+        dispatcher.disableExitOnDispatchException();
+      }
+    }
+
+    @VisibleForTesting
+    public AsyncDispatcher getEventTypeAsyncDispatcher(
+        Class<? extends Enum> eventType) {
+      return eventTypeDispatcherMap.get(eventType);
+    }
+  }
+
+  protected Dispatcher createMultiThreadDispatcher() {
+    return new MultiThreadDispatcher();
+  }
+
   protected Dispatcher createDispatcher() {
     AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher");
     GenericEventTypeMetrics genericEventTypeMetrics =
@@ -1256,8 +1374,28 @@ public void handle(RMAppAttemptEvent event) {
 
     private final RMContext rmContext;
 
+    // Number of single-threaded executors node events are spread over.
+    private static final int NODE_EVENT_HANDLER_THREADS = 5;
+
+    // Each executor owns exactly one thread, so the events handed to one
+    // executor are processed one at a time, in submission order.
+    // NOTE(review): these executors are used whether or not
+    // MULTI_THREAD_DISPATCHER_ENABLED is set - confirm that is intended.
+    private final ExecutorService[] nodeEventExecutors;
+
     public NodeEventDispatcher(RMContext rmContext) {
       this.rmContext = rmContext;
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setNameFormat("multiHandlerThread #%d")
+          // Nothing shuts these executors down, so their threads must not
+          // keep the JVM alive.
+          .setDaemon(true)
+          .build();
+      nodeEventExecutors = new ExecutorService[NODE_EVENT_HANDLER_THREADS];
+      for (int i = 0; i < nodeEventExecutors.length; i++) {
+        nodeEventExecutors[i] =
+            Executors.newSingleThreadExecutor(threadFactory);
+      }
     }
 
     @Override
@@ -1265,12 +1403,19 @@ public void handle(RMNodeEvent event) {
       NodeId nodeId = event.getNodeId();
       RMNode node = this.rmContext.getRMNodes().get(nodeId);
       if (node != null) {
-        try {
-          ((EventHandler<RMNodeEvent>) node).handle(event);
-        } catch (Throwable t) {
-          LOG.error("Error in handling event type " + event.getType()
-              + " for node " + nodeId, t);
-        }
+        // Events of one node always go to the same single-threaded
+        // executor, so they are handled in arrival order; events of
+        // different nodes may still be handled in parallel.
+        int index = (nodeId.hashCode() & Integer.MAX_VALUE)
+            % nodeEventExecutors.length;
+        nodeEventExecutors[index].execute(() -> {
+          try {
+            ((EventHandler<RMNodeEvent>) node).handle(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " for node " + nodeId, t);
+          }
+        });
       }
     }
   }
@@ -1710,7 +1855,11 @@ public static void main(String argv[]) {
    * Register the handlers for alwaysOn services
    */
   private Dispatcher setupDispatcher() {
-    Dispatcher dispatcher = createDispatcher();
+    boolean multiThreaded = conf.getBoolean(
+        YarnConfiguration.MULTI_THREAD_DISPATCHER_ENABLED,
+        YarnConfiguration.DEFAULT_MULTI_THREAD_DISPATCHER_ENABLED);
+    Dispatcher dispatcher = multiThreaded
+        ? createMultiThreadDispatcher() : createDispatcher();
     dispatcher.register(RMFatalEventType.class,
         new ResourceManager.RMFatalEventDispatcher());
     return dispatcher;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index 1ea470cf094..514e3a52e91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -25,10 +25,13 @@
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.timeout;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -79,4 +82,48 @@ public void testSchedulerEventDispatcherForPreemptionEvents() {
       rmDispatcher.stop();
     }
   }
+
+  /**
+   * Verifies that events sent through a {@code MultiThreadDispatcher}
+   * reach the handler registered for their event type.
+   */
+  @Test(timeout=10000)
+  public void testMultiThreadAsyncDispatcher() {
+    ResourceManager.MultiThreadDispatcher rmDispatcher =
+        new ResourceManager.MultiThreadDispatcher();
+    RMContextImpl rmContext = new RMContextImpl(rmDispatcher, null, null,
+        null, null, null, null, null, null);
+    NodesListManager nodesListManager = spy(new NodesListManager(rmContext));
+    YarnConfiguration conf = new YarnConfiguration();
+    rmDispatcher.register(NodesListManagerEventType.class,
+        nodesListManager);
+    rmDispatcher.init(conf);
+    rmDispatcher.start();
+
+    try {
+      RMNode rmNode1 = MockNodes.newNodeInfo(1,
+          Resource.newInstance(28000, 8),
+          1, "testHost1", 1234);
+      RMNode rmNode2 = MockNodes.newNodeInfo(1,
+          Resource.newInstance(28000, 8),
+          1, "testHost2", 1234);
+
+      rmDispatcher.getEventHandler().handle(new NodesListManagerEvent(
+          NodesListManagerEventType.NODE_USABLE, rmNode1));
+      rmDispatcher.getEventHandler().handle(new NodesListManagerEvent(
+          NodesListManagerEventType.NODE_UNUSABLE, rmNode2));
+
+      // A dedicated dispatcher must exist for the registered event type.
+      AsyncDispatcher nodeListManagerAsyncDispatcher = rmDispatcher.
+          getEventTypeAsyncDispatcher(NodesListManagerEventType.class);
+      Assert.assertNotNull(nodeListManagerAsyncDispatcher);
+
+      // Events are handled asynchronously: wait until both have been
+      // delivered instead of sleeping for a fixed time.
+      verify(nodesListManager, timeout(5000).times(2)).
+          handle(any(NodesListManagerEvent.class));
+    } finally {
+      rmDispatcher.stop();
+    }
+  }
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
index b9c5500a7d2..92dbb6ea3f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
@@ -73,6 +73,9 @@
   @Before
   public void setUp() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
+    // Enable the multi-thread dispatcher in the test configuration.
+    conf.setBoolean(
+        YarnConfiguration.MULTI_THREAD_DISPATCHER_ENABLED, true);
     UserGroupInformation.setConfiguration(conf);
     DefaultMetricsSystem.setMiniClusterMode(true);
     resourceManager = new ResourceManager();