diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 63031df21fa..a268c3b9ca5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3715,7 +3715,10 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
       FEDERATION_PREFIX + "amrmproxy.subcluster.timeout.ms";
   public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
-      60000; // one minute
+      180000; // 3 minutes
+
+  public static final String FEDERATION_BLACKLIST_SUBCLUSTERS =
+      FEDERATION_PREFIX + "blacklist-subclusters";
 
   public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
 
   public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index 9c9f76241b1..ea11c02d553 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,9 +32,12 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -125,16 +129,55 @@ public static final Logger LOG =
       LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
 
+  // Pending container limit
+  public static final String LOAD_BASED_SC_SELECTOR_THRESHOLD =
+      YarnConfiguration.NM_PREFIX
+          + "yarnpp.least-load-policy-selector.pending-container.threshold";
+  public static final int DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD = 10000;
+
+  public static final String LOAD_BASED_SC_SELECTOR_ENABLED =
+      YarnConfiguration.NM_PREFIX + "yarnpp.least-load-policy-selector.enabled";
+  public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED = false;
+
   private static Random rand = new Random();
 
   private Map<SubClusterId, Float> weights;
   private SubClusterResolver resolver;
   private Map<SubClusterId, Resource> headroom;
+  private Map<SubClusterId, Long> lastHeartbeatTimeStamp;
+  private long subClusterTimeOut;
   private float hrAlpha;
   private FederationStateStoreFacade federationFacade;
   private SubClusterId homeSubcluster;
 
+  public static final String PRINT_RR_MAX =
+      YarnConfiguration.NM_PREFIX
+          + "amrmproxy.address.splitmerge.printmaxrrcount";
+  public static final int DEFAULT_PRINT_RR_MAX = 1000;
+  private int printRRMax;
+
+  /**
+   * Print a list of ResourceRequests into a one-line string.
+   *
+   * @param response the list of {@link ResourceRequest}s to print
+   * @param max the maximum number of requests to print, or -1 for no limit
+   * @return the printed one-line string
+   */
+  public static String prettyPrintRequests(List<ResourceRequest> response, int max) {
+    StringBuilder builder = new StringBuilder();
+    for (ResourceRequest rr : response) {
+      builder.append("[id:").append(rr.getAllocationRequestId())
+          .append(" loc:").append(rr.getResourceName())
+          .append(" num:").append(rr.getNumContainers())
+          .append(" pri:").append(
+              (rr.getPriority() != null) ? rr.getPriority().getPriority() : -1)
+          .append("], ");
+      if (max != -1) {
+        if (max-- <= 0) {
+          break;
+        }
+      }
+    }
+    return builder.toString();
+  }
+
   @Override
   public void reinitialize(
       FederationPolicyInitializationContext policyContext)
@@ -182,22 +225,39 @@ public void reinitialize(
     if (headroom == null) {
       headroom = new ConcurrentHashMap<>();
+      lastHeartbeatTimeStamp = new ConcurrentHashMap<>();
     }
     hrAlpha = policy.getHeadroomAlpha();
 
     this.federationFacade =
         policyContext.getFederationStateStoreFacade();
     this.homeSubcluster = policyContext.getHomeSubcluster();
+
+    this.subClusterTimeOut = this.federationFacade.getConf().getLong(
+        YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+    if (this.subClusterTimeOut <= 0) {
+      LOG.info(
+          "{} configured to be {}, should be positive. Using default of {}.",
+          YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+          this.subClusterTimeOut,
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+      this.subClusterTimeOut =
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
+    }
+
+    this.printRRMax = this.federationFacade.getConf().getInt(PRINT_RR_MAX,
+        DEFAULT_PRINT_RR_MAX);
   }
 
   @Override
   public void notifyOfResponse(SubClusterId subClusterId,
       AllocateResponse response) throws YarnException {
-    if (response.getAvailableResources() != null) {
-      headroom.put(subClusterId, response.getAvailableResources());
-      LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
-          response.getAvailableResources().getMemorySize());
-    }
+    // The stateless policy only cares about responses for tracking headroom
+    // and the time of the last successful heartbeat. Keep the null check so
+    // a response without available resources does not cause an NPE.
+    if (response.getAvailableResources() != null) {
+      headroom.put(subClusterId, response.getAvailableResources());
+      LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
+          response.getAvailableResources().getMemorySize());
+    }
+    lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis());
   }
 
   @Override
@@ -209,7 +269,8 @@ public void notifyOfResponse(SubClusterId subClusterId,
     // active subclusters. Create a new instance per call because this method
     // can be called concurrently.
     AllocationBookkeeper bookkeeper = new AllocationBookkeeper();
-    bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters);
+    bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters,
+        federationFacade.getConf());
 
     List<ResourceRequest> nonLocalizedRequests =
         new ArrayList<ResourceRequest>();
@@ -232,6 +293,20 @@ public void notifyOfResponse(SubClusterId subClusterId,
       // Handle "node" requests
       try {
         targetId = resolver.getSubClusterForNode(rr.getResourceName());
+
+        // If needed, re-route node requests based on SC load.
+        // Read from config every time so that it is SCDable.
+        Configuration conf = this.federationFacade.getConf();
+        if (conf.getBoolean(LOAD_BASED_SC_SELECTOR_ENABLED,
+            DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED)) {
+          int maxPendingThreshold = conf.getInt(LOAD_BASED_SC_SELECTOR_THRESHOLD,
+              DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD);
+
+          targetId = routeNodeRequestIfNeeded(targetId, maxPendingThreshold,
+              bookkeeper.getActiveAndEnabledSC());
+        }
+
+        LOG.debug("Node request {}", rr.getResourceName());
       } catch (YarnException e) {
         // this might happen as we can't differentiate node from rack names
         // we log altogether later
@@ -277,6 +352,15 @@ public void notifyOfResponse(SubClusterId subClusterId,
     // handle all non-localized requests (ANY)
     splitAnyRequests(nonLocalizedRequests, bookkeeper);
 
+    LOG.info("Before split {} RRs: {}", resourceRequests.size(),
+        prettyPrintRequests(resourceRequests, this.printRRMax));
+
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper
+        .getAnswer().entrySet()) {
+      LOG.info("After split {} has {} RRs: {}", entry.getKey(),
+          entry.getValue().size(),
+          prettyPrintRequests(entry.getValue(), this.printRRMax));
+    }
     return bookkeeper.getAnswer();
   }
 
@@ -404,7 +488,7 @@ private void splitIndividualAny(ResourceRequest originalResourceRequest,
     if (totalWeight == 0) {
       StringBuilder sb = new StringBuilder();
       for (Float weight : weightsList) {
-        sb.append(weight + ", ");
+        sb.append(weight).append(", ");
       }
       throw new FederationPolicyException(
           "No positive value found in weight array " + sb.toString());
@@ -483,6 +567,72 @@ private float getHeadroomWeighting(SubClusterId targetId,
     return headroomWeighting;
   }
 
+  /**
+   * When a subcluster is too loaded, reroute Node requests going there.
+   *
+   * @param targetId the subcluster the node request originally resolves to
+   * @param maxThreshold the pending-container count above which a subcluster
+   *          is considered overloaded
+   * @param activeAndEnabledSCs the set of active and enabled subclusters
+   * @return the subcluster the request should be routed to
+   */
+  protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+      int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+    // Keep targetId if it is active and enabled and not overloaded; otherwise
+    // reroute the traffic via a weighted random draw.
+    if (activeAndEnabledSCs.contains(targetId)) {
+      int targetPendingCount = getSubClusterLoad(targetId);
+      if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+        return targetId;
+      }
+    }
+    return pickSubClusterIdForMaxLoadSC(targetId, maxThreshold,
+        activeAndEnabledSCs);
+  }
+
+  private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId,
+      int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+    ArrayList<Float> weights = new ArrayList<>();
+    ArrayList<SubClusterId> scIds = new ArrayList<>();
+    int targetLoad = getSubClusterLoad(targetId);
+    if (targetLoad == -1) {
+      // Probably a SC that is not active and enabled. Force a reroute.
+      targetLoad = Integer.MAX_VALUE;
+    }
+
+    for (SubClusterId sc : activeAndEnabledSCs) {
+      int scLoad = getSubClusterLoad(sc);
+      if (scLoad > targetLoad) {
+        // targetId is not the most loaded SC, no need to reroute
+        return targetId;
+      }
+
+      /*
+       * Prepare the weights for a random draw among all known SCs.
+       *
+       * For a SC with pending bigger than maxThreshold / 2, use maxThreshold /
+       * pending as weight. We multiply by maxThreshold so that the weights
+       * won't be too small in value.
+       *
+       * For a SC with pending less than maxThreshold / 2, we cap the weight at
+       * 2 = (maxThreshold / (maxThreshold / 2)) so that a SC with small
+       * pending will not get a huge weight and thus get swamped.
+       */
+      if (scLoad <= maxThreshold / 2) {
+        weights.add(2f);
+      } else {
+        weights.add((float) maxThreshold / scLoad);
+      }
+      scIds.add(sc);
+    }
+    if (weights.size() == 0) {
+      return targetId;
+    }
+    return scIds.get(FederationPolicyUtils.getWeightedRandom(weights));
+  }
+
+  private int getSubClusterLoad(SubClusterId subClusterId) {
+    Resource r = this.headroom.get(subClusterId);
+    if (r == null) {
+      return -1;
+    }
+    return Integer.MAX_VALUE - r.getMemory();
+  }
+
   /**
    * This helper class is used to book-keep the requests made to each
    * subcluster, and maintain useful statistics to split ANY requests.
@@ -511,7 +661,7 @@ private float getHeadroomWeighting(SubClusterId targetId,
     private void reinitialize(
         Map<SubClusterId, SubClusterInfo> activeSubclusters,
-        Set<SubClusterId> timedOutSubClusters) throws YarnException {
+        Set<SubClusterId> timedOutSubClusters, Configuration conf) throws YarnException {
       if (activeSubclusters == null) {
        throw new YarnRuntimeException("null activeSubclusters received");
      }
@@ -536,6 +686,17 @@ private void reinitialize(
        }
      }
 
+      // Subcluster blacklisting from configuration
+      String blacklistedSubClustersFromConfig =
+          conf.get(YarnConfiguration.FEDERATION_BLACKLIST_SUBCLUSTERS);
+      if (blacklistedSubClustersFromConfig != null) {
+        Collection<String> tempList =
+            StringUtils.getStringCollection(blacklistedSubClustersFromConfig);
+        for (String item : tempList) {
+          activeAndEnabledSC.remove(SubClusterId.newInstance(item.trim()));
+        }
+      }
+
       if (activeAndEnabledSC.size() < 1) {
         throw new NoActiveSubclustersException(
             "None of the subclusters enabled in this policy (weight>0) are "
@@ -543,8 +704,17 @@ private void reinitialize(
       }
 
       Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
-      tmpSCSet.removeAll(timedOutSubClusters);
-
+      for (Map.Entry<SubClusterId, Long> entry : lastHeartbeatTimeStamp
+          .entrySet()) {
+        long duration = System.currentTimeMillis() - entry.getValue();
+        if (duration > subClusterTimeOut) {
+          LOG.warn(
+              "Subcluster {} has not had a successful heartbeat for {}s, "
+                  + "skip routing asks there for this request",
+              entry.getKey(), (double) duration / 1000);
+          tmpSCSet.remove(entry.getKey());
+        }
+      }
       if (tmpSCSet.size() < 1) {
         LOG.warn("All active and enabled subclusters have expired last "
             + "heartbeat time. Ignore the expiry check for this request");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index 10359e44458..a7e1e09fe63 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -99,7 +100,96 @@ public void setUp() throws Exception {
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
     getPolicyInfo().setHeadroomAlpha(0.5f);
     setHomeSubCluster(SubClusterId.newInstance("homesubcluster"));
+  }
+
+  @Test
+  public void testPickSubClusterIdForMaxLoadSC() throws YarnException {
+    int pendingThreshold = 1000;
+
+    LocalityMulticastAMRMProxyPolicy policy =
+        (LocalityMulticastAMRMProxyPolicy) getPolicy();
+    initializePolicy();
+    // This cluster is the most overloaded - 4 times the threshold.
+    SubClusterId sc0 = SubClusterId.newInstance("0");
+    Resource r0 =
+        Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+    // This cluster is also the most overloaded - 4 times the threshold,
+    // tied with sc0.
+    SubClusterId sc1 = SubClusterId.newInstance("1");
+    Resource r1 =
+        Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+    // This cluster is 2 times the threshold, but not the most loaded.
+    SubClusterId sc2 = SubClusterId.newInstance("2");
+    Resource r2 =
+        Resource.newInstance(Integer.MAX_VALUE - 2 * pendingThreshold, 0);
+    // This cluster is at the threshold, but not the most loaded.
+    SubClusterId sc3 = SubClusterId.newInstance("3");
+    Resource r3 = Resource.newInstance(Integer.MAX_VALUE - pendingThreshold, 0);
+    // This cluster has zero pending.
+    SubClusterId sc4 = SubClusterId.newInstance("4");
+    Resource r4 = Resource.newInstance(Integer.MAX_VALUE, 0);
+
+    Set<SubClusterId> scList = new HashSet<>();
+    scList.add(sc0);
+    scList.add(sc1);
+    scList.add(sc2);
+    scList.add(sc3);
+    scList.add(sc4);
+
+    policy.notifyOfResponse(sc0, getAllocateResponseWithTargetHeadroom(r0));
+    policy.notifyOfResponse(sc1, getAllocateResponseWithTargetHeadroom(r1));
+    policy.notifyOfResponse(sc2, getAllocateResponseWithTargetHeadroom(r2));
+    policy.notifyOfResponse(sc3, getAllocateResponseWithTargetHeadroom(r3));
+    policy.notifyOfResponse(sc4, getAllocateResponseWithTargetHeadroom(r4));
+
+    // sc2, sc3 and sc4 should just return the original subcluster.
+    Assert.assertEquals(sc2,
+        policy.routeNodeRequestIfNeeded(sc2, pendingThreshold, scList));
+    Assert.assertEquals(sc3,
+        policy.routeNodeRequestIfNeeded(sc3, pendingThreshold, scList));
+    Assert.assertEquals(sc4,
+        policy.routeNodeRequestIfNeeded(sc4, pendingThreshold, scList));
+
+    // sc0 and sc1 must select from sc0/sc1/sc2/sc3/sc4 according to weights
+    // 1/4, 1/4, 1/2, 1, 2. Run a large number of random samples and verify
+    // that the proportions approximately hold.
+    Map<SubClusterId, Integer> counts = new HashMap<>();
+    counts.put(sc0, 0);
+    counts.put(sc1, 0);
+    counts.put(sc2, 0);
+    counts.put(sc3, 0);
+    counts.put(sc4, 0);
+    int n = 100000;
+    for (int i = 0; i < n; i++) {
+      SubClusterId selectedId =
+          policy.routeNodeRequestIfNeeded(sc0, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      selectedId =
+          policy.routeNodeRequestIfNeeded(sc1, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      // Also try a new SCId that's not active and enabled. Should be rerouted
+      // to sc0-4 with the same distribution as above.
+      selectedId = policy.routeNodeRequestIfNeeded(
+          SubClusterId.newInstance("10"), pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+    }
+
+    // The expected probabilities are 1/16, 1/16, 1/8, 1/4 and 1/2. Cast to
+    // double before dividing so the ratios are not truncated to zero.
+    final double DELTA = 0.1;
+    Assert.assertEquals(0.0625, (double) counts.get(sc0) / n / 3, DELTA);
+    Assert.assertEquals(0.0625, (double) counts.get(sc1) / n / 3, DELTA);
+    Assert.assertEquals(0.125, (double) counts.get(sc2) / n / 3, DELTA);
+    Assert.assertEquals(0.25, (double) counts.get(sc3) / n / 3, DELTA);
+    Assert.assertEquals(0.5, (double) counts.get(sc4) / n / 3, DELTA);
+
+    // Everything should be routed to these five active and enabled SCs
+    Assert.assertEquals(5, counts.size());
   }
 
   @Test
@@ -324,6 +414,12 @@ private AllocateResponse getAllocateResponseWithTargetHeadroom(
         Resource.newInstance(numContainers * 1024, numContainers), null, 10,
         null, Collections.<NMToken> emptyList());
   }
+
+  private AllocateResponse getAllocateResponseWithTargetHeadroom(Resource r) {
+    return AllocateResponse.newInstance(0, null, null,
+        Collections.<NodeReport> emptyList(), r, null, 10, null,
+        Collections.<NMToken> emptyList());
+  }
 
   /**
    * modify default initialization to include a "homesubcluster" which we will
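
Note (not part of the patch): the rerouting rule in pickSubClusterIdForMaxLoadSC can be checked in isolation. The sketch below is a minimal, standalone illustration under the same assumptions as the unit test (pending counts of 4x, 4x, 2x, 1x the threshold and zero): subclusters at or below half the threshold get a capped weight of 2, heavier ones get maxThreshold/pending, and the destination is picked by a weighted random draw. RerouteWeightSketch, weight() and weightedRandom() are illustrative names only; weightedRandom() is merely a plausible stand-in for FederationPolicyUtils.getWeightedRandom(), not its actual implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class RerouteWeightSketch {
  private static final Random RAND = new Random();

  /** Weight for a subcluster with the given pending-container count. */
  static float weight(int pending, int maxThreshold) {
    if (pending <= maxThreshold / 2) {
      return 2f; // capped so lightly loaded subclusters are not swamped
    }
    return (float) maxThreshold / pending;
  }

  /** Weighted random index over the given weights. */
  static int weightedRandom(List<Float> weights) {
    float total = 0;
    for (float w : weights) {
      total += w;
    }
    float sample = RAND.nextFloat() * total;
    for (int i = 0; i < weights.size(); i++) {
      sample -= weights.get(i);
      if (sample <= 0) {
        return i;
      }
    }
    return weights.size() - 1;
  }

  public static void main(String[] args) {
    int maxThreshold = 1000;
    // Pending counts matching the unit test: 4x, 4x, 2x, 1x threshold, zero.
    int[] pending = {4000, 4000, 2000, 1000, 0};
    List<Float> weights = new ArrayList<>();
    for (int p : pending) {
      weights.add(weight(p, maxThreshold)); // 0.25, 0.25, 0.5, 1.0, 2.0
    }
    int[] counts = new int[pending.length];
    int n = 100000;
    for (int i = 0; i < n; i++) {
      counts[weightedRandom(weights)]++;
    }
    // Expected proportions: roughly 1/16, 1/16, 1/8, 1/4, 1/2.
    for (int i = 0; i < counts.length; i++) {
      System.out.printf("sc%d: %.3f%n", i, (double) counts[i] / n);
    }
  }
}

Running the sketch prints proportions close to the 0.0625/0.0625/0.125/0.25/0.5 split asserted in testPickSubClusterIdForMaxLoadSC, which is why the test uses a loose delta of 0.1 rather than exact equality.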