diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2103ae77c2f..720f992c15f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2913,6 +2913,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000; + /** Enable/Disable multi thread dispatcher for each eventType **/ + public static final String MULTI_THREAD_DISPATCHER_ENABLED = + RM_PREFIX + "multi.thread.dispatcher.enabled"; + + public static final boolean + DEFAULT_MULTI_THREAD_DISPATCHER_ENABLED = true; + /** * The threshold used to trigger the logging of event types and counts * in RM's main event dispatcher. Default value is 5000, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6d2a9fed08b..3bd856839f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -21,7 +21,9 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import com.sun.jersey.spi.container.servlet.ServletContainer; +import org.apache.hadoop.yarn.event.*; import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -59,10 +61,6 @@ import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -71,11 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeAttributesManagerImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -149,6 +142,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -459,6 +454,72 @@ protected void setRMStateStore(RMStateStore rmStore) { return dispatcher; } + /** + * Dispatches events using multiple threads, + * each eventType with single thread. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static class MultiThreadDispatcher extends CompositeService + implements Dispatcher { + + private ConcurrentMap, + AsyncDispatcher> eventTypeDispatcherMap = + new ConcurrentHashMap, AsyncDispatcher>(); + + public MultiThreadDispatcher() { + super(ResourceManager.MultiThreadDispatcher.class.getName()); + } + + @Override + public EventHandler getEventHandler() { + return new CompositeEventHandler(); + } + + @Override + public void register(Class eventType, + EventHandler handler) { + if (eventTypeDispatcherMap.get(eventType) == null) { + AsyncDispatcher asyncDispatcher = + createDispatcher(eventType); + eventTypeDispatcherMap.put(eventType, + asyncDispatcher); + addIfService(asyncDispatcher); + } + eventTypeDispatcherMap. + get(eventType).register(eventType, handler); + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher dispatcher : + eventTypeDispatcherMap.values()) { + dispatcher.setDrainEventsOnStop(); + } + } + + private class CompositeEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Trigger the eventType corresponding handler. + eventTypeDispatcherMap. + get(event.getType().getDeclaringClass()). + getEventHandler(). + handle(event); + } + } + + protected AsyncDispatcher createDispatcher(Class eventType) { + return new AsyncDispatcher("RM single thread dispatcher for eventType: " + + eventType.getName()); + } + } + + protected Dispatcher createMultiThreadDispatcher() { + MultiThreadDispatcher dispatcher = + new MultiThreadDispatcher(); + return dispatcher; + } + protected Dispatcher createDispatcher() { AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher"); GenericEventTypeMetrics genericEventTypeMetrics = @@ -1608,7 +1669,13 @@ public static void main(String argv[]) { * Register the handlers for alwaysOn services */ private Dispatcher setupDispatcher() { - Dispatcher dispatcher = createDispatcher(); + Dispatcher dispatcher; + if (conf.getBoolean(YarnConfiguration.MULTI_THREAD_DISPATCHER_ENABLED, + YarnConfiguration.DEFAULT_MULTI_THREAD_DISPATCHER_ENABLED)) { + dispatcher = createMultiThreadDispatcher(); + } else { + dispatcher = createDispatcher(); + } dispatcher.register(RMFatalEventType.class, new ResourceManager.RMFatalEventDispatcher()); return dispatcher;