diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 3b9c30ff114..b19caffb51f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; @@ -147,6 +148,23 @@ public static AllocateResponse newInstance(int responseId, .collectorInfo(collectorInfo).build(); } + @Private + @Unstable + public static AllocateResponse newInstance(int responseId, + List completedContainers, + List allocatedContainers, List updatedNodes, + Resource availResources, AMCommand command, int numClusterNodes, + PreemptionMessage preempt, List nmTokens, Token amRMToken, + List updatedContainers, CollectorInfo collectorInfo, + EnhancedHeadroom enhancedHeadroom) { + AllocateResponse response = + newInstance(responseId, completedContainers, allocatedContainers, + updatedNodes, availResources, command, numClusterNodes, preempt, + nmTokens, amRMToken, updatedContainers, collectorInfo); + response.setEnhancedHeadroom(enhancedHeadroom); + return response; + } + /** * If the ResourceManager needs the * ApplicationMaster to take some action then it will send an @@ -439,6 +457,14 @@ public static AllocateResponseBuilder newBuilder() { return new AllocateResponseBuilder(); } + @Public + @Unstable + public abstract EnhancedHeadroom getEnhancedHeadroom(); + + @Private + @Unstable + public abstract void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom); + /** * Class to construct instances of {@link AllocateResponse} with specific * options. @@ -666,6 +692,18 @@ public AllocateResponseBuilder containersFromPreviousAttempt( return this; } + @Public + @Unstable + public EnhancedHeadroom getEnhancedHeadroom() { + return allocateResponse.getEnhancedHeadroom(); + } + + @Private + @Unstable + public void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom){ + allocateResponse.setEnhancedHeadroom(enhancedHeadroom); + } + /** * Return generated {@link AllocateResponse} object. * @return {@link AllocateResponse} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java new file mode 100644 index 00000000000..bd27ad3e07a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.yarn.util.Records; + +/** + * Enhanced headroom in AllocateResponse. + */ +public abstract class EnhancedHeadroom { + + public static EnhancedHeadroom newInstance(int totalPendingCount, + int totalActiveCores) { + EnhancedHeadroom enhancedHeadroom = + Records.newRecord(EnhancedHeadroom.class); + enhancedHeadroom.setTotalPendingCount(totalPendingCount); + enhancedHeadroom.setTotalActiveCores(totalActiveCores); + return enhancedHeadroom; + } + + public abstract void setTotalPendingCount(int totalPendingCount); + + public abstract int getTotalPendingCount(); + + public abstract void setTotalActiveCores(int totalActiveCores); + + public abstract int getTotalActiveCores(); + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("<pendingCount:").append(getTotalPendingCount()); + sb.append(", totalActiveCores:").append(getTotalActiveCores()); + sb.append(">"); + return sb.toString(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 63031df21fa..eec3ccdc86c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3715,7 +3715,39 @@ public static boolean isAclEnabled(Configuration conf) { public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT = FEDERATION_PREFIX + "amrmproxy.subcluster.timeout.ms"; public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT = - 60000; // one minute + 180000; // 3 minutes + + // Pending container limit + public static final String LOAD_BASED_SC_SELECTOR_THRESHOLD = + YarnConfiguration.NM_PREFIX + + "yarnpp.least-load-policy-selector.pending-container.threshold"; + public static final int DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD = 10000; + + // Whether to consider total number of active cores in the subcluster for load + public static final String LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE = + "yarn.nodemanager.yarnpp.least-load-policy-selector.use-active-core"; + public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE = + false; + + public static final String LOAD_BASED_SC_SELECTOR_MULTIPLIER = + "yarn.nodemanager.yarnpp.least-load-policy-selector.multiplier"; + public static final int DEFAULT_LOAD_BASED_SC_SELECTOR_MULTIPLIER = 50000; + + public static final String LOAD_BASED_SC_SELECTOR_ENABLED = + YarnConfiguration.NM_PREFIX + "yarnpp.least-load-policy-selector.enabled"; + public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED = false; + + public static final String LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR = + "yarn.nodemanager.yarnpp.least-load-policy-selector.fail-on-error"; + public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR = + true; + + public static final String FEDERATION_BLACKLIST_SUBCLUSTERS = + FEDERATION_PREFIX + "blacklist-subclusters"; + + public static final String FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY = + FEDERATION_PREFIX + "amrmproxy.allocation.history.max.entry"; + public
static final int DEFAULT_FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY = 100; public static final String DEFAULT_FEDERATION_POLICY_KEY = "*"; public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8a0273d7a79..abc89a6782d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -105,6 +105,10 @@ message UpdatedContainerProto { required ContainerUpdateTypeProto update_type = 1; required ContainerProto container = 2; } +message EnhancedHeadroomProto { + optional int32 total_pending_count = 1; + optional int32 total_active_cores = 2; +} message AllocateResponseProto { optional AMCommandProto a_m_command = 1; @@ -123,6 +127,7 @@ message AllocateResponseProto { repeated UpdatedContainerProto updated_containers = 16; repeated ContainerProto containers_from_previous_attempts = 17; repeated RejectedSchedulingRequestProto rejected_scheduling_requests = 18; + optional EnhancedHeadroomProto enhanced_headroom = 19; } enum SchedulerResourceTypes { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index a5705d275fe..f29fd1bfea8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; @@ -89,6 +91,7 @@ private Token amrmToken = null; private Priority appPriority = null; private CollectorInfo collectorInfo = null; + private EnhancedHeadroom enhancedHeadroom = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -190,6 +193,9 @@ private synchronized void mergeLocalToBuilder() { getContainerProtoIterable(this.containersFromPreviousAttempts); builder.addAllContainersFromPreviousAttempts(iterable); } + if (this.enhancedHeadroom != null) { + builder.setEnhancedHeadroom(convertToProtoFormat(this.enhancedHeadroom)); + } } private synchronized void mergeLocalToProto() { @@ -422,6 +428,27 @@ public synchronized void setAMRMToken(Token amRMToken) { 
this.amrmToken = amRMToken; } + @Override + public EnhancedHeadroom getEnhancedHeadroom() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + if (enhancedHeadroom != null) { + return enhancedHeadroom; + } + if (!p.hasEnhancedHeadroom()) { + return null; + } + this.enhancedHeadroom = convertFromProtoFormat(p.getEnhancedHeadroom()); + return enhancedHeadroom; + } + + @Override + public void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom) { + maybeInitBuilder(); + if (enhancedHeadroom == null) { + builder.clearEnhancedHeadroom(); + } + this.enhancedHeadroom = enhancedHeadroom; + } @Override public synchronized CollectorInfo getCollectorInfo() { @@ -933,4 +960,13 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } -} + + private EnhancedHeadroomPBImpl convertFromProtoFormat( + YarnServiceProtos.EnhancedHeadroomProto p) { + return new EnhancedHeadroomPBImpl(p); + } + + private YarnServiceProtos.EnhancedHeadroomProto convertToProtoFormat(EnhancedHeadroom t) { + return ((EnhancedHeadroomPBImpl) t).getProto(); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/EnhancedHeadroomPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/EnhancedHeadroomPBImpl.java new file mode 100644 index 00000000000..08114080974 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/EnhancedHeadroomPBImpl.java @@ -0,0 +1,103 @@ +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +public class EnhancedHeadroomPBImpl extends EnhancedHeadroom { + + private EnhancedHeadroomProto proto = + EnhancedHeadroomProto.getDefaultInstance(); + private EnhancedHeadroomProto.Builder builder = null; + private boolean viaProto = false; + + public EnhancedHeadroomPBImpl() { + builder = EnhancedHeadroomProto.newBuilder(); + } + + public EnhancedHeadroomPBImpl(EnhancedHeadroomProto proto) { + this.proto = proto; + viaProto = true; + } + + public EnhancedHeadroomProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + // No local content yet + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = EnhancedHeadroomProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public void setTotalPendingCount(int totalPendingCount) { + maybeInitBuilder(); + if (totalPendingCount == 0) { + builder.clearTotalPendingCount(); + return; + } + builder.setTotalPendingCount(totalPendingCount); + } + + @Override + public int getTotalPendingCount() { + EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasTotalPendingCount()) ? p.getTotalPendingCount() : 0; + } + + @Override + public void setTotalActiveCores(int totalActiveCores) { + maybeInitBuilder(); + if (totalActiveCores == 0) { + builder.clearTotalActiveCores(); + return; + } + builder.setTotalActiveCores(totalActiveCores); + } + + @Override + public int getTotalActiveCores() { + EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasTotalActiveCores()) ? p.getTotalActiveCores() : 0; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index ac43b122f4c..0e4ec510e9a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -28,6 +28,7 @@ import java.util.TreeSet; import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,14 +134,18 @@ private AMRMClientRelayerMetrics metrics; + private ContainerAllocationHistory allocationHistory; + + private final MonotonicClock clock = new MonotonicClock(); + public AMRMClientRelayer(ApplicationMasterProtocol rmClient, - ApplicationId appId, String rmId) { + Configuration conf, ApplicationId appId, String rmId) { this.resetResponseId = -1; this.metrics = AMRMClientRelayerMetrics.getInstance(); - this.rmId = ""; this.rmClient = rmClient; this.appId = appId; this.rmId = rmId; + this.allocationHistory = new ContainerAllocationHistory(conf); } 
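As a usage note for the constructor above: a minimal caller-side sketch of wiring up a relayer for one sub-cluster. The helper class, method and values here are hypothetical illustrations (the RM proxy, application id and sub-cluster id are assumed to come from the caller); only the constructor signature and the config key are taken from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;

public final class RelayerWiringSketch {
  private RelayerWiringSketch() {
  }

  // Builds a relayer for one sub-cluster with the new
  // (rmClient, conf, appId, rmId) constructor; the Configuration also sizes
  // the relayer's ContainerAllocationHistory.
  public static AMRMClientRelayer createRelayer(
      ApplicationMasterProtocol rmProxy, ApplicationId appId,
      String subClusterId) {
    Configuration conf = new YarnConfiguration();
    // yarn.federation.amrmproxy.allocation.history.max.entry (default 100)
    conf.setInt(YarnConfiguration.FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY, 200);
    return new AMRMClientRelayer(rmProxy, conf, appId, subClusterId);
  }
}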
public void setAMRegistrationRequest( @@ -309,12 +315,13 @@ private void addNewAllocateRequest(AllocateRequest allocateRequest) public AllocateResponse allocate(AllocateRequest allocateRequest) throws YarnException, IOException { AllocateResponse allocateResponse = null; - long startTime = System.currentTimeMillis(); + long startTime = this.clock.getTime(); synchronized (this) { if(this.shutdown){ throw new YarnException("Allocate called after AMRMClientRelayer for " + "RM " + rmId + " shutdown."); } + addNewAllocateRequest(allocateRequest); ArrayList askList = new ArrayList<>(ask.size()); @@ -383,7 +390,7 @@ public AllocateResponse allocate(AllocateRequest allocateRequest) } catch (Throwable t) { // Unexpected exception - rethrow and increment heart beat failure metric this.metrics.addHeartbeatFailure(this.rmId, - System.currentTimeMillis() - startTime); + this.clock.getTime() - startTime); // If RM is complaining about responseId out of sync, force reset next // time @@ -434,7 +441,7 @@ public AllocateResponse allocate(AllocateRequest allocateRequest) private void updateMetrics(AllocateResponse allocateResponse, long startTime) { this.metrics.addHeartbeatSuccess(this.rmId, - System.currentTimeMillis() - startTime); + this.clock.getTime() - startTime); // Process the allocate response from RM if (allocateResponse.getAllocatedContainers() != null) { for (Container container : allocateResponse @@ -444,22 +451,26 @@ private void updateMetrics(AllocateResponse allocateResponse, if (this.knownContainers.add(container.getId())) { this.metrics.addFulfilledQPS(this.rmId, AMRMClientRelayerMetrics .getRequestType(container.getExecutionType()), 1); + + long currentTime = this.clock.getTime(); + long fulfillLatency = -1; + if (container.getAllocationRequestId() != 0) { Integer count = this.pendingCountForMetrics .get(container.getAllocationRequestId()); + if (count != null && count > 0) { this.pendingCountForMetrics .put(container.getAllocationRequestId(), --count); this.metrics.decrClientPending(this.rmId, AMRMClientRelayerMetrics .getRequestType(container.getExecutionType()), 1); - this.metrics.addFulfillLatency(this.rmId, - AMRMClientRelayerMetrics - .getRequestType(container.getExecutionType()), - System.currentTimeMillis() - this.askTimeStamp - .get(container.getAllocationRequestId())); + + fulfillLatency = currentTime - this.askTimeStamp.get(container.getAllocationRequestId()); + this.metrics.addFulfillLatency(this.rmId, AMRMClientRelayerMetrics.getRequestType(container.getExecutionType()), fulfillLatency); } } + addAllocationHistoryEntry(container, currentTime, fulfillLatency); } } } @@ -558,7 +569,7 @@ private void addNewAsks(List asks) throws YarnException { for (ResourceRequestSetKey key : nonZeroNewKeys) { if(remotePendingAsks.containsKey(key)){ this.askTimeStamp.put(key.getAllocationRequestId(), - System.currentTimeMillis()); + this.clock.getTime()); int count = this.remotePendingAsks.get(key).getNumContainers(); this.pendingCountForMetrics.put(key.getAllocationRequestId(), count); this.metrics.incrClientPending(this.rmId, @@ -576,10 +587,42 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { this.ask.add(remoteRequest); } + public ContainerAllocationHistory getAllocationHistory() { + return this.allocationHistory; + } + + private void addAllocationHistoryEntry(Container container, + long fulfillTimeStamp, long fulfillLatency) { + ResourceRequestSetKey key = ResourceRequestSetKey + .extractMatchingKey(container, this.remotePendingAsks.keySet(), true); + if (key == 
null) { + LOG.info("allocation history ignoring {}, no matching request key found", + container); + return; + } + this.allocationHistory.addAllocationEntry(container, key, + this.remotePendingAsks.get(key), fulfillTimeStamp, fulfillLatency); + } + + public void gatherReadOnlyPendingAsksInfo( + Map pendingAsks, + Map pendingTime) { + pendingAsks.clear(); + pendingTime.clear(); + synchronized (this) { + pendingAsks.putAll(this.remotePendingAsks); + for (ResourceRequestSetKey key : pendingAsks.keySet()) { + Long startTime = this.askTimeStamp.get(key.getAllocationRequestId()); + if (startTime != null) { + pendingTime.put(key, this.clock.getTime() - startTime); + } + } + } + } + @VisibleForTesting protected Map getRemotePendingAsks() { return this.remotePendingAsks; } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java new file mode 100644 index 00000000000..16c3fb2cdbe --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java @@ -0,0 +1,66 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server; + +import java.util.AbstractMap; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; +import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Records the allocation history from YarnRM and provide aggregated insights. 
+ */ +public class ContainerAllocationHistory { + private static final Logger LOG = + LoggerFactory.getLogger(AMRMClientRelayer.class); + + private int maxEntryCount; + + // Allocate timing history + private Queue<Entry<Long, Long>> relaxableG = new LinkedList<>(); + + public ContainerAllocationHistory(Configuration conf) { + this.maxEntryCount = + conf.getInt(YarnConfiguration.FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY, + YarnConfiguration.DEFAULT_FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY); + } + + public synchronized void addAllocationEntry(Container container, + ResourceRequestSetKey requestKey, ResourceRequestSet requestSet, + long fulfillTimeStamp, long fulfillLatency) { + if (!requestSet.isANYRelaxable()) { + LOG.info("allocation history ignoring {}, relax locality is false", + container); + return; + } + this.relaxableG.add(new AbstractMap.SimpleEntry<>( + fulfillTimeStamp, fulfillLatency)); + if (this.relaxableG.size() > this.maxEntryCount) { + this.relaxableG.remove(); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java index 643bfa6da01..9ddbbdd7fca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; @@ -66,4 +67,12 @@ public void reinitialize( return answer; } + @Override + public void addAMRMClientRelayer(SubClusterId subClusterId, + AMRMClientRelayer relayer) throws YarnException { + } + + @Override + public void shutdown() { + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java new file mode 100644 index 00000000000..8cf6c1ab174 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.metrics.LocalityMulticastAMRMProxyPolicyMetrics; +import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; +import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used by {@link LocalityMulticastAMRMProxyPolicy} to re-balance the pending + * asks among all sub-clusters. + */ +public class ContainerAsksBalancer implements Configurable { + public static final Logger LOG = + LoggerFactory.getLogger(ContainerAsksBalancer.class); + + public static final String DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL = + YarnConfiguration.FEDERATION_PREFIX + + "downgrade-to-other-subcluster-interval-ms"; + private static final long DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL = + 10000; + + private Configuration conf; + private LocalityMulticastAMRMProxyPolicyMetrics metrics; + // Holds the pending requests and allocation history of all sub-clusters + private Map clientRelayers; + + private long subclusterAskTimeOut; + private Map> lastRelaxCandidateCount; + + public ContainerAsksBalancer() { + this.clientRelayers = new ConcurrentHashMap<>(); + this.lastRelaxCandidateCount = new ConcurrentHashMap<>(); + this.metrics = LocalityMulticastAMRMProxyPolicyMetrics.getInstance(); + } + + @Override + public void setConf(Configuration config) { + this.conf = config; + this.subclusterAskTimeOut = + this.conf.getLong(DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL, + DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL); + } + + @Override + public Configuration getConf() { + return this.conf; + } + + public void shutdown() { + for (Entry> scEntry : + this.lastRelaxCandidateCount.entrySet()) { + for (Entry typeEntry : scEntry.getValue() + .entrySet()) { + this.metrics.decrSCRelaxablePendingCountMetric( + scEntry.getKey().getId(), typeEntry.getKey(), + typeEntry.getValue()); + } + } + this.lastRelaxCandidateCount.clear(); + this.clientRelayers.clear(); + } + + /** + * Pass in the AMRMClientRelayer for a new sub-cluster. 
+ * + * @param subClusterId sub-cluster id + * @param relayer the AMRMClientRelayer for this sub-cluster + * @throws YarnException if fails + */ + public void addAMRMClientRelayer(SubClusterId subClusterId, + AMRMClientRelayer relayer) throws YarnException { + if (this.clientRelayers.containsKey(subClusterId)) { + LOG.warn("AMRMClientRelayer already exists for " + subClusterId); + } + this.clientRelayers.put(subClusterId, relayer); + + Map map = new ConcurrentHashMap<>(); + for (ExecutionType type : ExecutionType.values()) { + map.put(type, 0L); + } + this.lastRelaxCandidateCount.put(subClusterId, map); + } + + /** + * Notify an allocate response from a sub-cluster. + * + * @param subClusterId sub-cluster id + * @param response allocate response + * @throws YarnException if fails + */ + public void notifyOfResponse(SubClusterId subClusterId, + AllocateResponse response) throws YarnException { + } + + /** + * Modify the output from split-merge (AMRMProxyPolicy). Adding and removing + * asks to balance the pending asks in all sub-clusters. + * + * @param askMap the new asks about to be sent out to all sub-clusters + * @param activeAndEnabledSC the list of sub-cluster we can use for new + */ + public void adjustAsks(Map> askMap, + Set activeAndEnabledSC) { + + Map pendingAsks = + new HashMap<>(); + Map pendingTime = new HashMap<>(); + for (Entry relayerEntry : + this.clientRelayers.entrySet()) { + SubClusterId scId = relayerEntry.getKey(); + + pendingAsks.clear(); + pendingTime.clear(); + relayerEntry.getValue().gatherReadOnlyPendingAsksInfo(pendingAsks, + pendingTime); + + Map currentCandidateCount = new HashMap<>(); + for (Entry pendingTimeEntry : pendingTime + .entrySet()) { + if (pendingTimeEntry.getValue() < this.subclusterAskTimeOut) { + continue; + } + ResourceRequestSetKey askKey = pendingTimeEntry.getKey(); + ResourceRequestSet askSet = pendingAsks.get(askKey); + if (!askSet.isANYRelaxable()) { + continue; + } + long value = askSet.getNumContainers(); + if (currentCandidateCount.containsKey(askKey.getExeType())) { + value += currentCandidateCount.get(askKey.getExeType()); + } + currentCandidateCount.put(askKey.getExeType(), value); + + // For now don't do any actual modifications to askMap, only update + // metrics + } + + // Update the pending metrics for the sub-cluster + updateRelaxCandidateMetrics(scId, currentCandidateCount); + } + } + + protected void updateRelaxCandidateMetrics(SubClusterId scId, + Map currentCandidateCount) { + Map lastValueMap = + this.lastRelaxCandidateCount.get(scId); + for (ExecutionType type : ExecutionType.values()) { + long newValue = 0; + if (currentCandidateCount.containsKey(type)) { + newValue = currentCandidateCount.get(type); + } + long lastValue = lastValueMap.get(type); + lastValueMap.put(type, newValue); + LOG.debug("updating SCRelaxable " + type + " asks in " + scId + " from " + + lastValue + " to " + newValue); + this.metrics.incrSCRelaxablePendingCountMetric(scId.getId(), type, + newValue - lastValue); + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java index 3d39d7280d4..21f1e0eb59a 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -64,4 +65,20 @@ void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) throws YarnException; + /** + * Give to the policy the {@link AMRMClientRelayer} for a subcluster, which + * holds the information about pending/outstanding requests for the + * subcluster. + * + * @param subClusterId the subcluster id + * @param relayer the AMRMClientRelayer instance + * @throws YarnException if fails + */ + void addAMRMClientRelayer(SubClusterId subClusterId, + AMRMClientRelayer relayer) throws YarnException; + + /** + * Shutdown and cleanup. + */ + void shutdown(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java index acb7e0af183..437d061f66e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; @@ -72,4 +73,13 @@ public void reinitialize( new ArrayList<>(resourceRequests); return Collections.singletonMap(homeSubcluster, resourceRequestsCopy); } + + @Override + public void addAMRMClientRelayer(SubClusterId subClusterId, + AMRMClientRelayer relayer) throws YarnException { + } + + @Override + public void shutdown() { + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index 9c9f76241b1..101da5f74be 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -31,11 +32,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; @@ -53,6 +59,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; + /** * An implementation of the {@link FederationAMRMProxyPolicy} interface that * carefully multicasts the requests with the following behavior: @@ -125,16 +133,27 @@ public static final Logger LOG = LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class); + private static Random rand = new Random(); private Map weights; private SubClusterResolver resolver; + private Configuration conf; + private ContainerAsksBalancer askBalancer; + private Map headroom; + private Map enhancedHeadroom; private float hrAlpha; private FederationStateStoreFacade federationFacade; private SubClusterId homeSubcluster; + public static final String PRINT_RR_MAX = + YarnConfiguration.NM_PREFIX + + "amrmproxy.address.splitmerge.printmaxrrcount"; + public static final int DEFAULT_PRINT_RR_MAX = 1000; + private boolean failOnError = DEFAULT_LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR; + @Override public void reinitialize( FederationPolicyInitializationContext policyContext) @@ -180,23 +199,51 @@ public void reinitialize( weights = newWeightsConverted; resolver = policyContext.getFederationSubclusterResolver(); + // Data structures that only need to initialize once if (headroom == null) { headroom = new ConcurrentHashMap<>(); + enhancedHeadroom = new ConcurrentHashMap<>(); + askBalancer = new ContainerAsksBalancer(); } hrAlpha = policy.getHeadroomAlpha(); - this.federationFacade = - policyContext.getFederationStateStoreFacade(); + this.federationFacade = policyContext.getFederationStateStoreFacade(); this.homeSubcluster = policyContext.getHomeSubcluster(); + + this.conf = this.federationFacade.getConf(); + this.askBalancer.setConf(conf); + this.failOnError = this.conf + .getBoolean(LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR, + DEFAULT_LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR); } @Override public void notifyOfResponse(SubClusterId 
subClusterId, AllocateResponse response) throws YarnException { if (response.getAvailableResources() != null) { - headroom.put(subClusterId, response.getAvailableResources()); - LOG.info("Subcluster {} updated with {} memory headroom", subClusterId, - response.getAvailableResources().getMemorySize()); + this.headroom.put(subClusterId, response.getAvailableResources()); + } + if (response.getEnhancedHeadroom() != null) { + this.enhancedHeadroom.put(subClusterId, response.getEnhancedHeadroom()); + } + LOG.info( + "Subcluster {} updated with AvailableResource {}, EnhancedHeadRoom {}", + subClusterId, response.getAvailableResources(), + response.getEnhancedHeadroom()); + + this.askBalancer.notifyOfResponse(subClusterId, response); + } + + @Override + public void addAMRMClientRelayer(SubClusterId subClusterId, + AMRMClientRelayer relayer) throws YarnException { + this.askBalancer.addAMRMClientRelayer(subClusterId, relayer); + } + + @Override + public void shutdown() { + if (this.askBalancer != null) { + this.askBalancer.shutdown(); } } @@ -209,7 +256,8 @@ public void notifyOfResponse(SubClusterId subClusterId, // active subclusters. Create a new instance per call because this method // can be called concurrently. AllocationBookkeeper bookkeeper = new AllocationBookkeeper(); - bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters); + bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters, + federationFacade.getConf()); List nonLocalizedRequests = new ArrayList(); @@ -232,6 +280,19 @@ public void notifyOfResponse(SubClusterId subClusterId, // Handle "node" requests try { targetId = resolver.getSubClusterForNode(rr.getResourceName()); + + // If needed, re-reroute node requests base on SC load + // Read from config every time so that it is SCDable + if (conf.getBoolean(LOAD_BASED_SC_SELECTOR_ENABLED, + DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED)) { + int maxPendingThreshold = conf.getInt(LOAD_BASED_SC_SELECTOR_THRESHOLD, + DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD); + + targetId = routeNodeRequestIfNeeded(targetId, maxPendingThreshold, + bookkeeper.getActiveAndEnabledSC()); + } + + LOG.debug("Node request {}", rr.getResourceName()); } catch (YarnException e) { // this might happen as we can't differentiate node from rack names // we log altogether later @@ -277,7 +338,11 @@ public void notifyOfResponse(SubClusterId subClusterId, // handle all non-localized requests (ANY) splitAnyRequests(nonLocalizedRequests, bookkeeper); - return bookkeeper.getAnswer(); + // Take the split result, feed into the askBalancer + Map> answer = bookkeeper.getAnswer(); + this.askBalancer.adjustAsks(answer, bookkeeper.getActiveAndEnabledSC()); + + return answer; } /** @@ -404,7 +469,7 @@ private void splitIndividualAny(ResourceRequest originalResourceRequest, if (totalWeight == 0) { StringBuilder sb = new StringBuilder(); for (Float weight : weightsList) { - sb.append(weight + ", "); + sb.append(weight).append(", "); } throw new FederationPolicyException( "No positive value found in weight array " + sb.toString()); @@ -483,6 +548,93 @@ private float getHeadroomWeighting(SubClusterId targetId, return headroomWeighting; } + /** + * When certain subcluster is too loaded, reroute Node requests going there. 
+ */ + protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId, + int maxThreshold, Set activeAndEnabledSCs) { + // If targetId is not in the active and enabled SC list, reroute the traffic + if (activeAndEnabledSCs.contains(targetId)) { + int targetPendingCount = getSubClusterLoad(targetId); + if (targetPendingCount == -1 || targetPendingCount < maxThreshold) { + return targetId; + } + } + SubClusterId scId = pickSubClusterIdForMaxLoadSC(targetId, maxThreshold, + activeAndEnabledSCs); + return scId; + } + + private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId, + int maxThreshold, Set activeAndEnabledSCs) { + ArrayList weights = new ArrayList<>(); + ArrayList scIds = new ArrayList<>(); + int targetLoad = getSubClusterLoad(targetId); + if (targetLoad == -1) { + // Probably a SC that's not active and enabled. Forcing a reroute + targetLoad = Integer.MAX_VALUE; + } + + for (SubClusterId sc : activeAndEnabledSCs) { + int scLoad = getSubClusterLoad(sc); + if (scLoad > targetLoad) { + // Never mind if it is not the most loaded SC + return targetId; + } + + /* + * Prepare the weights for a random draw among all known SCs. + * + * For SC with pending bigger than maxThreshold / 2, use maxThreshold / + * pending as weight. We multiplied by maxThreshold so that the weights + * won't be too small in value. + * + * For SC with pending less than maxThreshold / 2, we cap the weight at 2 + * = (maxThreshold / (maxThreshold / 2)) so that SC with small pending + * will not get a huge weight and thus get swamped. + */ + if (scLoad <= maxThreshold / 2) { + weights.add(2f); + } else { + weights.add((float) maxThreshold / scLoad); + } + scIds.add(sc); + } + if (weights.size() == 0) { + return targetId; + } + return scIds.get(FederationPolicyUtils.getWeightedRandom(weights)); + } + + private int getSubClusterLoad(SubClusterId subClusterId) { + EnhancedHeadroom headroomData = this.enhancedHeadroom.get(subClusterId); + if (headroomData == null) { + return -1; + } + + // Use new data from enhanced headroom + if (conf.getBoolean( + LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE, + DEFAULT_LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE)) { + if (headroomData.getTotalActiveCores() <= 0) { + return Integer.MAX_VALUE; + } + // Multiply by a constant factor, to ensure the numerator > + // denominator. + int multiplier = conf.getInt(LOAD_BASED_SC_SELECTOR_MULTIPLIER, + DEFAULT_LOAD_BASED_SC_SELECTOR_MULTIPLIER); + double value = ((double) headroomData.getTotalPendingCount() * multiplier) + / headroomData.getTotalActiveCores(); + if (value > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) value; + } + } else { + return headroomData.getTotalPendingCount(); + } + } + /** * This helper class is used to book-keep the requests made to each * subcluster, and maintain useful statistics to split ANY requests. 
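To make the reroute weighting above concrete, the following is a small self-contained sketch of the same load-score and draw-weight arithmetic. The class and method names are hypothetical and the constants illustrative; the policy itself reads the real values from YarnConfiguration and the per-sub-cluster EnhancedHeadroom.

public final class LoadWeightSketch {
  private LoadWeightSketch() {
  }

  // Load score per sub-cluster: the raw pending count, or
  // pending * multiplier / activeCores when active-core scaling is enabled
  // (mirrors getSubClusterLoad above).
  static int loadScore(int totalPendingCount, int totalActiveCores,
      boolean useActiveCores, int multiplier) {
    if (!useActiveCores) {
      return totalPendingCount;
    }
    if (totalActiveCores <= 0) {
      return Integer.MAX_VALUE;
    }
    double value = (double) totalPendingCount * multiplier / totalActiveCores;
    return value > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) value;
  }

  // Draw weight used when the most loaded sub-cluster is rerouted:
  // maxThreshold / pending, capped at 2 for lightly loaded sub-clusters
  // (mirrors pickSubClusterIdForMaxLoadSC above).
  static float drawWeight(int load, int maxThreshold) {
    return load <= maxThreshold / 2 ? 2f : (float) maxThreshold / load;
  }

  public static void main(String[] args) {
    int threshold = 1000;
    // Pending counts 4000, 2000, 1000 and 0 give weights 0.25, 0.5, 1 and 2,
    // i.e. the least loaded sub-cluster is eight times more likely to be
    // drawn than the most loaded one.
    for (int pending : new int[] {4000, 2000, 1000, 0}) {
      System.out.println(pending + " -> "
          + drawWeight(loadScore(pending, 0, false, 50000), threshold));
    }
  }
}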
@@ -511,7 +663,7 @@ private float getHeadroomWeighting(SubClusterId targetId, private void reinitialize( Map activeSubclusters, - Set timedOutSubClusters) throws YarnException { + Set timedOutSubClusters, Configuration conf) throws YarnException { if (activeSubclusters == null) { throw new YarnRuntimeException("null activeSubclusters received"); } @@ -536,15 +688,34 @@ private void reinitialize( } } + // Subcluster blacklisting from configuration + String blacklistedSubClustersFromConfig = + conf.get(YarnConfiguration.FEDERATION_BLACKLIST_SUBCLUSTERS); + if (blacklistedSubClustersFromConfig != null) { + Collection tempList = + StringUtils.getStringCollection(blacklistedSubClustersFromConfig); + for (String item : tempList) { + activeAndEnabledSC.remove(SubClusterId.newInstance(item.trim())); + } + } + if (activeAndEnabledSC.size() < 1) { - throw new NoActiveSubclustersException( + String errorMsg = "None of the subclusters enabled in this policy (weight>0) are " - + "currently active we cannot forward the ResourceRequest(s)"); + + "currently active we cannot forward the ResourceRequest(s)"; + if (failOnError) { + throw new NoActiveSubclustersException(errorMsg); + } else { + LOG.error( + errorMsg + ", continuing by enabling all active subclusters."); + activeAndEnabledSC.addAll(activeSubclusters.keySet()); + for (SubClusterId sc : activeSubclusters.keySet()) { + policyWeights.put(sc, 1.0f); + } + } } Set tmpSCSet = new HashSet<>(activeAndEnabledSC); - tmpSCSet.removeAll(timedOutSubClusters); - if (tmpSCSet.size() < 1) { LOG.warn("All active and enabled subclusters have expired last " + "heartbeat time. Ignore the expiry check for this request"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java index a21234e4a55..4f59458dcf1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; @@ -54,4 +55,12 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) + "rejects all routing requests by construction."); } + @Override + public void addAMRMClientRelayer(SubClusterId subClusterId, + AMRMClientRelayer relayer) throws YarnException { + } + + @Override + public void shutdown() { + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java index fec967d86bd..1d97f74ab94 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterId.java @@ -97,4 +97,11 @@ public String toString() { return sb.toString(); } + public String getShortId() { + String[] parts = getId().trim().split("-"); + if (parts.length > 1) { + return "sc" + parts[parts.length - 1]; + } + return getId(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java index cf24bbf361f..34ce615997f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java @@ -36,6 +36,8 @@ private ResourceRequestSetKey key; private int numContainers; + // Whether the ANY RR is relaxable + private boolean relaxable; // ResourceName -> RR private Map asks; @@ -49,6 +51,7 @@ public ResourceRequestSet(ResourceRequestSetKey key) throws YarnException { this.key = key; // leave it zero for now, as if it is a cancel this.numContainers = 0; + this.relaxable = true; this.asks = new HashMap<>(); } @@ -60,6 +63,7 @@ public ResourceRequestSet(ResourceRequestSetKey key) throws YarnException { public ResourceRequestSet(ResourceRequestSet other) { this.key = other.key; this.numContainers = other.numContainers; + this.relaxable = other.relaxable; this.asks = new HashMap<>(); // The assumption is that the RR objects should not be modified without // making a copy @@ -83,9 +87,11 @@ public void addAndOverrideRR(ResourceRequest ask) throws YarnException { this.asks.put(ask.getResourceName(), ask); if (this.key.getExeType().equals(ExecutionType.GUARANTEED)) { - // For G requestSet, update the numContainers only for ANY RR + // For G requestSet, update the numContainers and relax locality only for + // ANY RR if (ask.getResourceName().equals(ResourceRequest.ANY)) { this.numContainers = ask.getNumContainers(); + this.relaxable = ask.getRelaxLocality(); } } else { // The assumption we made about O asks is that all RR in a requestSet has @@ -182,6 +188,15 @@ public void setNumContainers(int newValue) throws YarnException { } } + /** + * Whether the request set is relaxable at ANY level. 
+ * + * @return whether the request set is relaxable at ANY level + */ + public boolean isANYRelaxable() { + return this.relaxable; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java index 38162619c85..13d2d634643 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java @@ -18,11 +18,16 @@ package org.apache.hadoop.yarn.server.scheduler; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The scheduler key for a group of {@link ResourceRequest}. @@ -31,6 +36,8 @@ * SchedulerRequestKey, then we can directly use that. */ public class ResourceRequestSetKey extends SchedulerRequestKey { + private static final Logger LOG = + LoggerFactory.getLogger(ResourceRequestSetKey.class); // More ResourceRequest key fields on top of SchedulerRequestKey private final Resource resource; @@ -130,4 +137,43 @@ public String toString() { + (this.execType.equals(ExecutionType.GUARANTEED) ? " G" : " O" + " r:" + this.resource + "]"); } + + /** + * Extract the corresponding ResourceRequestSetKey for an allocated container + * from a given set. Return null if not found. 
+ * + * @param container the allocated container + * @param keys the set of keys to look from + * @return + */ + public static ResourceRequestSetKey extractMatchingKey(Container container, + Set keys, boolean verbose) { + + ResourceRequestSetKey key = new ResourceRequestSetKey( + container.getAllocationRequestId(), container.getPriority(), + container.getResource(), container.getExecutionType()); + if (keys.contains(key)) { + return key; + } + if (container.getAllocationRequestId() > 0) { + // If no exact match, look for the one with the same (non-zero) + // allocationRequestId + for (ResourceRequestSetKey candidate : keys) { + if (candidate.getAllocationRequestId() == container + .getAllocationRequestId()) { + if (verbose) { + LOG.warn("Using possible match for " + key + ": " + candidate); + } + return candidate; + } + } + } + if (verbose) { + LOG.error("not match found for container " + container); + for (ResourceRequestSetKey candidate : keys) { + LOG.warn("candidate set keys: " + candidate); + } + } + return null; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 47d78309466..a663665cfbe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -136,7 +136,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, this.userUgi = null; // Relayer's rmClient will be set after the RM connection is created this.rmProxyRelayer = - new AMRMClientRelayer(null, this.applicationId, rmName); + new AMRMClientRelayer(null, this.conf, this.applicationId, rmName); this.heartbeatHandler = createAMHeartbeatRequestHandler(this.conf, this.applicationId, this.rmProxyRelayer); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java index 46570a1465d..57f0cefd770 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java @@ -153,9 +153,8 @@ public void setResponseIdReset(int expectedResponseId) { @Before public void setup() throws YarnException, IOException { this.conf = new Configuration(); - this.mockAMS = new MockApplicationMasterService(); - this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST"); + this.relayer = new AMRMClientRelayer(this.mockAMS, this.conf, null, "rm"); this.relayer.registerApplicationMaster( RegisterApplicationMasterRequest.newInstance("", 0, "")); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java 
index 10359e44458..fd5be121f12 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -52,6 +53,7 @@
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -99,7 +101,102 @@ public void setUp() throws Exception {
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
     getPolicyInfo().setHeadroomAlpha(0.5f);
     setHomeSubCluster(SubClusterId.newInstance("homesubcluster"));
+  }
+
+  @After
+  public void cleanup() {
+    ((FederationAMRMProxyPolicy) getPolicy()).shutdown();
+  }
+
+  @Test
+  public void testLoadbasedSubClusterReroute() throws YarnException {
+    int pendingThreshold = 1000;
+
+    LocalityMulticastAMRMProxyPolicy policy =
+        (LocalityMulticastAMRMProxyPolicy) getPolicy();
+    initializePolicy();
+
+    SubClusterId sc0 = SubClusterId.newInstance("0");
+    SubClusterId sc1 = SubClusterId.newInstance("1");
+    SubClusterId sc2 = SubClusterId.newInstance("2");
+    SubClusterId sc3 = SubClusterId.newInstance("3");
+    SubClusterId sc4 = SubClusterId.newInstance("4");
+    Resource r4 = Resource.newInstance(Integer.MAX_VALUE, 0);
+
+    Set<SubClusterId> scList = new HashSet<>();
+    scList.add(sc0);
+    scList.add(sc1);
+    scList.add(sc2);
+    scList.add(sc3);
+    scList.add(sc4);
+
+    // This cluster is the most overloaded - 4 times the threshold.
+    policy.notifyOfResponse(sc0,
+        getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
+
+    // This cluster is the most overloaded - 4 times the threshold.
+    policy.notifyOfResponse(sc1,
+        getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
+    // This cluster is 2 times the threshold, but not the most loaded.
+    policy.notifyOfResponse(sc2,
+        getAllocateResponseWithEnhancedHeadroom(2 * pendingThreshold, 0));
+
+    // This cluster is at the threshold, but not the most loaded.
+    policy.notifyOfResponse(sc3,
+        getAllocateResponseWithEnhancedHeadroom(pendingThreshold, 0));
+
+    // This cluster has zero pending.
+    policy.notifyOfResponse(sc4, getAllocateResponseWithEnhancedHeadroom(0, 0));
+
+    // sc2, sc3 and sc4 should just return the original subcluster.
+    Assert.assertTrue(
+        policy.routeNodeRequestIfNeeded(sc2, pendingThreshold, scList)
+            .equals(sc2));
+    Assert.assertTrue(
+        policy.routeNodeRequestIfNeeded(sc3, pendingThreshold, scList)
+            .equals(sc3));
+    Assert.assertTrue(
+        policy.routeNodeRequestIfNeeded(sc4, pendingThreshold, scList)
+            .equals(sc4));
+
+    // sc0 and sc1 must select from sc0/sc1/sc2/sc3/sc4 according to weights
+    // 1/4, 1/4, 1/2, 1, 2. Let's run tons of random samples, and verify that
+    // the proportion approximately holds.
+    Map<SubClusterId, Integer> counts = new HashMap<>();
+    counts.put(sc0, 0);
+    counts.put(sc1, 0);
+    counts.put(sc2, 0);
+    counts.put(sc3, 0);
+    counts.put(sc4, 0);
+    int n = 100000;
+    for (int i = 0; i < n; i++) {
+      SubClusterId selectedId =
+          policy.routeNodeRequestIfNeeded(sc0, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      selectedId =
+          policy.routeNodeRequestIfNeeded(sc1, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      // Also try a new SCId that's not active and enabled. Should be rerouted
+      // to sc0-4 with the same distribution as above
+      selectedId = policy
+          .routeNodeRequestIfNeeded(SubClusterId.newInstance("10"),
+              pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+    }
+
+    // The probability should be 1/16, 1/16, 1/8, 1/4, 1/2
+    final double cDELTA = 0.1;
+    Assert.assertEquals((double) counts.get(sc0) / n / 3, 0.0625, cDELTA);
+    Assert.assertEquals((double) counts.get(sc1) / n / 3, 0.0625, cDELTA);
+    Assert.assertEquals((double) counts.get(sc2) / n / 3, 0.125, cDELTA);
+    Assert.assertEquals((double) counts.get(sc3) / n / 3, 0.25, cDELTA);
+    Assert.assertEquals((double) counts.get(sc4) / n / 3, 0.5, cDELTA);
+
+    // Everything should be routed to these five active and enabled SCs
+    Assert.assertEquals(5, counts.size());
   }
 
   @Test
@@ -325,6 +422,14 @@ private AllocateResponse getAllocateResponseWithTargetHeadroom(
         null, Collections.<NMToken> emptyList());
   }
 
+  private AllocateResponse getAllocateResponseWithEnhancedHeadroom(
+      int pending, int activeCores) {
+    return AllocateResponse.newInstance(0, null, null,
+        Collections.<NodeReport> emptyList(), Resource.newInstance(0, 0), null,
+        10, null, Collections.<NMToken> emptyList(), null, null, null,
+        EnhancedHeadroom.newInstance(pending, activeCores));
+  }
+
   /**
    * modify default initialization to include a "homesubcluster" which we will
    * use as the default for when nodes or racks are unknown.
@@ -777,9 +882,9 @@ public void testSubClusterExpiry() throws Exception {
 
     Thread.sleep(800);
 
-    // For the second time, sc0 and sc5 expired
-    expiredSCList.add(SubClusterId.newInstance("subcluster0"));
-    expiredSCList.add(SubClusterId.newInstance("subcluster5"));
+    // Update the response timestamp for the second time, skipping sc0 and sc5
+    prepPolicyWithHeadroom(false);
+
     response = ((FederationAMRMProxyPolicy) getPolicy())
         .splitResourceRequests(resourceRequests, expiredSCList);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
index 3e7c57a1dae..518840b9f50 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
@@ -139,13 +139,13 @@ public void setup() throws YarnException, IOException {
 
     this.mockAMS = new MockApplicationMasterService();
 
-    this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
-        ApplicationId.newInstance(0, 0), this.homeID);
+    this.homeRelayer =
+        new AMRMClientRelayer(this.mockAMS, this.conf, null, this.homeID);
     this.homeRelayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
 
-    this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
-        ApplicationId.newInstance(0, 0), this.uamID);
+    this.uamRelayer =
+        new AMRMClientRelayer(this.mockAMS, this.conf, null, this.uamID);
     this.uamRelayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 5d165c99584..6ad957406dc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -242,6 +242,7 @@
 
   /** The policy used to split requests among sub-clusters. */
   private FederationAMRMProxyPolicy policyInterpreter;
+  private final Object policyInterpreterLock = new Object();
 
   private FederationRegistryClient registryClient;
@@ -309,9 +310,9 @@ public void init(AMRMProxyApplicationContext appContext) {
     ApplicationId appId = this.attemptId.getApplicationId();
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
-    this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
-        ApplicationMasterProtocol.class, appOwner), appId,
-        this.homeSubClusterId.toString());
+    this.homeRMRelayer = new AMRMClientRelayer(
+        createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
+            appOwner), conf, appId, this.homeSubClusterId.toString());
 
     this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId,
         this.homeRMRelayer);
@@ -856,6 +857,10 @@ public void shutdown() {
       this.threadpool = null;
     }
 
+    if (this.policyInterpreter != null) {
+      this.policyInterpreter.shutdown();
+    }
+
     // Stop the home heartbeat thread
     this.homeHeartbeartHandler.shutdown();
     this.homeRMRelayer.shutdown();
@@ -1247,8 +1252,13 @@ public void run() {
                 getApplicationContext().getUser(), homeSubClusterId.toString(),
                 true, subClusterId);
 
-            secondaryRelayers.put(subClusterId,
-                uamPool.getAMRMClientRelayer(subClusterId));
+            // Propagate the new subcluster info
+            AMRMClientRelayer relayer =
+                uamPool.getAMRMClientRelayer(subClusterId);
+            secondaryRelayers.put(subClusterId, relayer);
+            synchronized (policyInterpreterLock) {
+              policyInterpreter.addAMRMClientRelayer(subClusterId, relayer);
+            }
 
             uamResponse = uamPool.registerApplicationMaster(subClusterId,
                 amRegistrationRequest);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index c8777f579d5..3e662756f20 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -334,6 +335,15 @@ public void allocate(ApplicationAttemptId appAttemptId,
             .pullJustFinishedContainers());
     response.setAvailableResources(allocation.getResourceLimit());
 
+    int totalVirtualCores = this.rmContext.getScheduler().getRootQueueMetrics()
+        .getAllocatedVirtualCores() + this.rmContext.getScheduler()
+        .getRootQueueMetrics().getAvailableVirtualCores();
+    int pendingContainers = this.rmContext.getScheduler().getRootQueueMetrics()
+        .getPendingContainers();
+
+    response.setEnhancedHeadroom(
+        EnhancedHeadroom.newInstance(pendingContainers, totalVirtualCores));
+
     addToContainerUpdates(response, allocation,
         ((AbstractYarnScheduler)getScheduler())
             .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
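
Note on the new ResourceRequestSetKey.extractMatchingKey helper: it maps an allocated Container back to the request-set key it satisfies, first by an exact (allocationRequestId, priority, resource, executionType) match and otherwise by any key with the same non-zero allocationRequestId. The following is a minimal, illustrative usage sketch; the AllocationAttribution class and the pendingPerKey bookkeeping map are hypothetical names introduced here for demonstration and are not part of this patch.

import java.util.Map;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;

/** Illustrative caller of the lookup helper added by this patch. */
final class AllocationAttribution {
  private AllocationAttribution() {
  }

  /**
   * Decrement the pending count of the request set that the given allocated
   * container satisfies, if it can be attributed to one.
   */
  static void onAllocated(Container allocated,
      Map<ResourceRequestSetKey, Integer> pendingPerKey) {
    // Exact key match first; otherwise fall back to a key with the same
    // non-zero allocationRequestId (verbose=true logs the fallback/miss).
    ResourceRequestSetKey key = ResourceRequestSetKey.extractMatchingKey(
        allocated, pendingPerKey.keySet(), true);
    if (key == null) {
      // Neither an exact nor an allocationRequestId-based match was found.
      return;
    }
    pendingPerKey.merge(key, -1, Integer::sum);
  }
}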
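For orientation, the test above exercises routeNodeRequestIfNeeded: a sub-cluster that reports too many pending containers (via the EnhancedHeadroom now attached to each AllocateResponse) can have node-local requests rerouted to a randomly chosen active sub-cluster, with less-loaded sub-clusters picked more often. The sketch below is only an assumed reconstruction of that behavior for readability; the weight formula (threshold divided by pending, with a floor of half the threshold) merely reproduces the proportions asserted in the test and is not claimed to be the exact logic of LocalityMulticastAMRMProxyPolicy.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

/** Illustrative load-based reroute, under the assumptions stated above. */
final class LoadBasedRerouteSketch {
  private final Random random = new Random();

  // pendingBySubCluster would be populated from the pending-container count
  // carried in each sub-cluster's EnhancedHeadroom.
  SubClusterId pick(SubClusterId original, int pendingThreshold,
      Map<SubClusterId, Integer> pendingBySubCluster,
      List<SubClusterId> activeSubClusters) {
    Integer pending = pendingBySubCluster.get(original);
    boolean knownAndActive =
        pending != null && activeSubClusters.contains(original);
    int maxPending = 0;
    for (SubClusterId sc : activeSubClusters) {
      Integer p = pendingBySubCluster.get(sc);
      maxPending = Math.max(maxPending, p == null ? 0 : p);
    }
    // Keep the original target unless it is unknown/inactive, or it is both
    // over the threshold and (one of) the most loaded sub-clusters.
    if (knownAndActive
        && !(pending > pendingThreshold && pending >= maxPending)) {
      return original;
    }
    // Otherwise sample among active sub-clusters, weighting each inversely
    // to its pending load (assumed formula, see lead-in above).
    List<SubClusterId> candidates = new ArrayList<>();
    List<Double> weights = new ArrayList<>();
    double total = 0;
    for (SubClusterId sc : activeSubClusters) {
      Integer p = pendingBySubCluster.get(sc);
      int scPending = p == null ? 0 : p;
      double w = (double) pendingThreshold
          / Math.max(scPending, Math.max(1, pendingThreshold / 2));
      candidates.add(sc);
      weights.add(w);
      total += w;
    }
    double r = random.nextDouble() * total;
    for (int i = 0; i < candidates.size(); i++) {
      r -= weights.get(i);
      if (r <= 0) {
        return candidates.get(i);
      }
    }
    return original;
  }
}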