From 1978f989d824fb559c9a9d9032484cc06c79e0f1 Mon Sep 17 00:00:00 2001 From: hcarrot <1012314219@qq.com> Date: Mon, 11 Nov 2019 17:54:27 +0800 Subject: [PATCH] YARN-9927. RM multi-thread event processing mechanism --- .../hadoop/yarn/conf/YarnConfiguration.java | 4 + .../yarn/event/MultiAsyncDispatcher.java | 75 +++++++++++++++++++ .../hadoop/yarn/event/MultiDispatcher.java | 39 ++++++++++ .../server/resourcemanager/RMContext.java | 3 + .../server/resourcemanager/RMContextImpl.java | 11 +++ .../resourcemanager/RMServiceContext.java | 10 +++ .../resourcemanager/ResourceManager.java | 75 +++++++++++++++++++ 7 files changed, 217 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiAsyncDispatcher.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiDispatcher.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 83871a5bf52..51fbef3fcef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -710,6 +710,10 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10; + public static final String RM_MULTI_DISPATCHERS_POOL_SIZE = + RM_PREFIX + "multi-dispatchers.pool-size"; + public static final int DEFAULT_RM_MULTI_DISPATCHERS_POOL_SIZE = 10; + //RM delegation token related keys public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY = RM_PREFIX + "delegation.key.update-interval"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiAsyncDispatcher.java new file mode 100644 index 00000000000..cc0df6d3c80 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiAsyncDispatcher.java @@ -0,0 +1,75 @@ +package org.apache.hadoop.yarn.event; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; + +import java.util.ArrayList; +import java.util.List; + +public class MultiAsyncDispatcher extends AbstractService implements MultiDispatcher { + + private static final Logger LOG = + LoggerFactory.getLogger(MultiAsyncDispatcher.class); + + private List multiAsyncDispatchers; + + public MultiAsyncDispatcher(int dispatcherNum) { + this(dispatcherNum, "MultiAsyncDispatcher event handler-"); + } + + public MultiAsyncDispatcher(int dispatcherNum, String dispatcherName) { + super("MultiAsyncDispatcher"); + this.multiAsyncDispatchers = new ArrayList<>(dispatcherNum); + for (int i = 0; i < dispatcherNum; i++) { + AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName + "-" + i); + multiAsyncDispatchers.add(dispatcher); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + for (AsyncDispatcher asyncDispatcher : multiAsyncDispatchers) { + asyncDispatcher.serviceInit(conf); + } + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + //start all the components + super.serviceStart(); + for (AsyncDispatcher asyncDispatcher : multiAsyncDispatchers) { + asyncDispatcher.serviceStart(); + } + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher asyncDispatcher : multiAsyncDispatchers) { + asyncDispatcher.setDrainEventsOnStop(); + } + } + + @Override + protected void serviceStop() throws Exception { + // stop all the components + for (AsyncDispatcher asyncDispatcher : multiAsyncDispatchers) { + asyncDispatcher.serviceStop(); + } + super.serviceStop(); + } + + @Override + public EventHandler getEventHandler(Object key) { + int index = (key.hashCode() & Integer.MAX_VALUE) % multiAsyncDispatchers.size(); + return multiAsyncDispatchers.get(index).getEventHandler(); + } + + @Override + public void register(Class eventType, EventHandler handler) { + for (AsyncDispatcher asyncDispatcher : multiAsyncDispatchers) { + asyncDispatcher.register(eventType, handler); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiDispatcher.java new file mode 100644 index 00000000000..ed5b0689aa3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/MultiDispatcher.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Multi Event Dispatcher interface. It dispatches events to different dispatcher + * based on key and dispatches events to registered + * event handlers based on event types. + * + */ +@SuppressWarnings("rawtypes") +@Public +@Evolving + +public interface MultiDispatcher { + + EventHandler getEventHandler(Object key); + + void register(Class eventType, EventHandler handler); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 55420bd9270..52ba7804529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.MultiDispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; @@ -65,6 +66,8 @@ Dispatcher getDispatcher(); + MultiDispatcher getMultiDispatcher(); + boolean isHAEnabled(); HAServiceState getHAServiceState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 54e0281f7e8..58972bfb294 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -22,6 +22,7 @@ import java.net.URISyntaxException; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.yarn.event.MultiDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -202,10 +203,20 @@ public Dispatcher getDispatcher() { return serviceContext.getDispatcher(); } + @Override + public MultiDispatcher getMultiDispatcher() { + return serviceContext.getMultiAsyncDispatcher(); + } + void setDispatcher(Dispatcher dispatcher) { serviceContext.setDispatcher(dispatcher); } + + void setMultiDispatcher(MultiDispatcher multiAsyncDispatcher) { + serviceContext.setMultiAsyncDispatcher(multiAsyncDispatcher); + } + @Override public AdminService getRMAdminService() { return serviceContext.getRMAdminService(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java index 45c61667cf9..1c89594ff22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java @@ -25,6 +25,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.MultiDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; @@ -42,6 +43,7 @@ public class RMServiceContext { private Dispatcher rmDispatcher; + private MultiDispatcher multiAsyncDispatcher; private boolean isHAEnabled; private HAServiceState haServiceState = HAServiceProtocol.HAServiceState.INITIALIZING; @@ -80,6 +82,14 @@ void setDispatcher(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } + public MultiDispatcher getMultiAsyncDispatcher() { + return this.multiAsyncDispatcher; + } + + void setMultiAsyncDispatcher(MultiDispatcher multiDdispatcher) { + this.multiAsyncDispatcher = multiDdispatcher; + } + public EmbeddedElector getLeaderElectorService() { return this.elector; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d939ac1525e..4f65c02024c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.MultiDispatcher; +import org.apache.hadoop.yarn.event.MultiAsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; @@ -183,6 +185,9 @@ @VisibleForTesting protected RMContextImpl rmContext; private Dispatcher rmDispatcher; + + private MultiDispatcher multiAsyncDispatcher; + @VisibleForTesting protected AdminService adminService; @@ -253,6 +258,11 @@ Dispatcher getRmDispatcher() { return rmDispatcher; } + @VisibleForTesting + MultiDispatcher getMultiAsyncDispatcher() { + return multiAsyncDispatcher; + } + @VisibleForTesting protected ResourceProfilesManager createResourceProfileManager() { ResourceProfilesManager resourceProfilesManager = @@ -307,6 +317,14 @@ protected void serviceInit(Configuration conf) throws Exception { addIfService(rmDispatcher); rmContext.setDispatcher(rmDispatcher); + // init multidispatchers + int dispatcherNum = conf.getInt(YarnConfiguration.RM_MULTI_DISPATCHERS_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_MULTI_DISPATCHERS_POOL_SIZE); + multiAsyncDispatcher = setupMultiDispatcher(dispatcherNum); + // Register event handler for RmNodes + addIfService(multiAsyncDispatcher); + rmContext.setMultiDispatcher(multiAsyncDispatcher); + // The order of services below should not be changed as services will be // started in same order // As elector service needs admin service to be initialized and started, @@ -450,6 +468,10 @@ protected Dispatcher createDispatcher() { return new AsyncDispatcher("RM Event dispatcher"); } + protected MultiDispatcher createMultiDispatcher(int dispatcherNum) { + return new MultiAsyncDispatcher(dispatcherNum, "RM Event multi-dispatcher"); + } + protected ResourceScheduler createScheduler() { String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER, YarnConfiguration.DEFAULT_RM_SCHEDULER); @@ -770,6 +792,8 @@ protected void serviceInit(Configuration configuration) throws Exception { // Register event handler for RmNodes rmDispatcher.register( + RMNodeEventType.class, new NodeEventMultiDispatcher(rmContext)); + multiAsyncDispatcher.register( RMNodeEventType.class, new NodeEventDispatcher(rmContext)); nmLivelinessMonitor = createNMLivelinessMonitor(); @@ -1145,6 +1169,36 @@ public void handle(RMNodeEvent event) { } } + @Private + public static final class NodeEventMultiDispatcher implements + EventHandler { + + private final RMContext rmContext; + + public NodeEventMultiDispatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(RMNodeEvent event) { + NodeId nodeId = event.getNodeId(); + RMNode node = this.rmContext.getRMNodes().get(nodeId); + MultiDispatcher multiAsyncDispatcher = this.rmContext.getMultiDispatcher(); + if (nodeId != null && node != null) { + try { + if (event.getType() == RMNodeEventType.STATUS_UPDATE) { + multiAsyncDispatcher.getEventHandler(nodeId).handle(event); + } else { + ((EventHandler) node).handle(event); + } + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for node " + nodeId, t); + } + } + } + } + /** * Return a HttpServer.Builder that the journalnode / namenode / secondary * namenode can use to initialize their HTTP / HTTPS server. @@ -1583,6 +1637,13 @@ private Dispatcher setupDispatcher() { return dispatcher; } + /** + * Register the handlers for alwaysOn services + */ + private MultiDispatcher setupMultiDispatcher(int dispatcherNum) { + return createMultiDispatcher(dispatcherNum); + } + private void resetRMContext() { RMContextImpl rmContextImpl = new RMContextImpl(); // transfer service context to new RM service Context @@ -1601,6 +1662,20 @@ private void resetRMContext() { rmContextImpl.setDispatcher(dispatcher); rmContext = rmContextImpl; + + // reset multiDispatcher + int dispatcherNum = conf.getInt(YarnConfiguration.RM_MULTI_DISPATCHERS_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_MULTI_DISPATCHERS_POOL_SIZE); + MultiDispatcher multiDispatcher = setupMultiDispatcher(dispatcherNum); + ((Service) multiDispatcher).init(this.conf); + ((Service) multiDispatcher).start(); + removeService((Service) multiAsyncDispatcher); + // Need to stop previous multiAsyncDispatcher before assigning new multiAsyncDispatcher + // otherwise causes "multiAsyncDispatcher event handler" thread leak + ((Service) multiAsyncDispatcher).stop(); + multiAsyncDispatcher = multiDispatcher; + addIfService(multiAsyncDispatcher); + rmContextImpl.setMultiDispatcher(multiDispatcher); } private void setSchedulerRecoveryStartAndWaitTime(RMState state, -- 2.20.1 (Apple Git-117)