Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 9ba63045392256d0b274ec770bb88aed51f9a985) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision b85252197e8f219053021d1808b6c5824ee9e35a) @@ -1132,6 +1132,12 @@ public static final int DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20; + /** + * If RM should send the DECOMISSIONING state and SHUTDOWN state to AMs + */ + public static final String RM_NODESTATE_COMPATIBLE_V2 = RM_PREFIX + "nodestate-compatible-v2"; + public static final boolean DEFAULT_RM_NODESTATE_COMPATIBLE_V2 = false; + //////////////////////////////// // yarn rest api Configs //////////////////////////////// @@ -3370,7 +3376,7 @@ public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; // 5 minutes - public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 10; public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = FEDERATION_PREFIX + "flush-cache-for-rm-addr"; @@ -3387,7 +3393,7 @@ // 5 minutes public static final int DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = - 5 * 60; + 10; public static final String FEDERATION_MACHINE_LIST = FEDERATION_PREFIX + "machine-list"; @@ -3421,7 +3427,7 @@ public static final String FEDERATION_POLICY_MANAGER_PARAMS = FEDERATION_PREFIX + "policy-manager-params"; - public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = ""; + public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "1,1"; public static final String FEDERATION_STATESTORE_ZK_PREFIX = FEDERATION_PREFIX + "zk-state-store."; @@ -3547,6 +3553,99 @@ public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; + public static final String FEDERATION_GPG_PREFIX = + FEDERATION_PREFIX + "gpg."; + + // The number of threads to use for the GPG scheduled executor service + public static final String GPG_SCHEDULED_EXECUTOR_THREADS = + FEDERATION_GPG_PREFIX + "scheduled.executor.threads"; + public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10; + + // The interval at which the subcluster cleaner runs, -1 means disabled + public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms"; + public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = -1; + + // The expiration time for a subcluster heartbeat, default is 30 minutes + public static final String GPG_SUBCLUSTER_EXPIRATION_MS = + FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; + public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + + public static final String FEDERATION_GPG_POLICY_PREFIX = + FEDERATION_GPG_PREFIX + "policy.generator."; + + /** The interval at which the policy generator runs, default is one hour. */ + public static final String GPG_POLICY_GENERATOR_INTERVAL_MS = + FEDERATION_GPG_POLICY_PREFIX + "interval-ms"; + public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS = -1; + + public static final String GPG_BIND_HOST = + FEDERATION_GPG_POLICY_PREFIX + "bind-host"; + + public static final String GPG_ADDRESS = + FEDERATION_GPG_POLICY_PREFIX + "address"; + + public static final String GPG_ADMIN_ADDRESS = + FEDERATION_GPG_POLICY_PREFIX + "admin-address"; + public static final String GPG_ADMIN_PORT = + FEDERATION_GPG_POLICY_PREFIX + "admin-port"; + public static final int DEFAULT_GPG_ADMIN_PORT = 9039; + + public static final String DEFAULT_GPG_ADMIN_ADDRESS = + "0.0.0.0:" + DEFAULT_GPG_ADMIN_PORT; + + public static final String GPG_ADMIN_CLIENT_THREAD_COUNT = + FEDERATION_GPG_POLICY_PREFIX + "admin.client.thread-count"; + public static final int DEFAULT_GPG_ADMIN_CLIENT_THREAD_COUNT = 1; + /** + * The configured policy generator class, runs NoOpGlobalPolicy by + * default. + */ + public static final String GPG_GLOBAL_POLICY_CLASS = + FEDERATION_GPG_POLICY_PREFIX + "class"; + public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS = + "org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator." + + "NoOpGlobalPolicy"; + + /** + * Whether or not the policy generator is running in read only (won't modify + * policies), default is false. + */ + public static final String GPG_POLICY_GENERATOR_READONLY = + FEDERATION_GPG_POLICY_PREFIX + "readonly"; + public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY = + false; + + /** + * Which sub-clusters the policy generator should blacklist. + */ + public static final String GPG_POLICY_GENERATOR_BLACKLIST = + FEDERATION_GPG_POLICY_PREFIX + "blacklist"; + + /** Max time to wait to establish a connection to RM */ + public static final String GPG_CONNECT_MAX_WAIT_MS = + FEDERATION_GPG_POLICY_PREFIX + "connect.max-wait.ms"; + public static final long DEFAULT_GPG_CONNECT_MAX_WAIT_MS = + 15 * 60 * 1000; + + /** Time interval between each attempt to connect to RM */ + public static final String GPG_CONNECT_RETRY_INTERVAL_MS = + FEDERATION_GPG_POLICY_PREFIX + "connect.retry-interval.ms"; + public static final long DEFAULT_GPG_CONNECT_RETRY_INTERVAL_MS + = 30 * 1000; + + /** 任务路由默认集群,云窗任务默认集群为wq-yarn-cluster-2,对应参数为ROUTER_HS_SUBCLUSTER, + * 其他任务默认集群为wq-yarn-cluster-1,对应参数为ROUTER_HOME_SUBCLUSTER*/ + public static final String ROUTER_HOME_SUBCLUSTER = + FEDERATION_PREFIX + "home_subcluster"; + public static final String DEFAULT_ROUTER_HOME_SUBCLUSTER = + "wq-yarn-cluster-1"; + public static final String ROUTER_HS_SUBCLUSTER = + FEDERATION_PREFIX + "home_subcluster"; + public static final String DEFAULT_ROUTER_HS_SUBCLUSTER = + "wq-yarn-cluster-2"; + + //////////////////////////////// // Other Configs //////////////////////////////// Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java (revision 9ba63045392256d0b274ec770bb88aed51f9a985) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java (revision b85252197e8f219053021d1808b6c5824ee9e35a) @@ -30,25 +30,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.CollectorInfo; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerUpdateType; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeUpdateType; -import org.apache.hadoop.yarn.api.records.PreemptionContainer; -import org.apache.hadoop.yarn.api.records.PreemptionContract; -import org.apache.hadoop.yarn.api.records.PreemptionMessage; -import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; -import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; @@ -362,12 +344,18 @@ } private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { + boolean compatibleWithV2 = getRmContext().getYarnConfiguration().getBoolean( + YarnConfiguration.RM_NODESTATE_COMPATIBLE_V2, YarnConfiguration.DEFAULT_RM_NODESTATE_COMPATIBLE_V2); Map updatedNodes = new HashMap<>(); if(app.pullRMNodeUpdates(updatedNodes) > 0) { List updatedNodeReports = new ArrayList<>(); for(Map.Entry rmNodeEntry : updatedNodes.entrySet()) { RMNode rmNode = rmNodeEntry.getKey(); + if(compatibleWithV2 && (rmNode.getState() == NodeState.DECOMMISSIONING || + rmNode.getState() == NodeState.SHUTDOWN)) { + continue; + } SchedulerNodeReport schedulerNodeReport = getScheduler().getNodeReport(rmNode.getNodeID()); Resource used = BuilderUtils.newResource(0, 0);