diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..d4c975a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyFederationPolicy.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
+
+import java.util.*;
+
+/**
+ * An implementation of the {@link AMRMProxyFederationPolicy} that simply
+ * broadcasts each {@link ResourceRequest} to all the available sub-clusters.
+ */
+public class BroadcastAMRMProxyFederationPolicy
+    implements AMRMProxyFederationPolicy {
+
+  private Configuration conf;
+
+  Set<FederationSubClusterId> knownClusterIds = new HashSet<>();
+
+  @Override
+  public void reinitialize(FederationPolicyContext federationPolicyContext)
+      throws FederationPolicyInitializationException {
+    FederationPolicyContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+  }
+
+  @Override
+  public Map<FederationSubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> resourceRequests,
+      Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters)
+      throws YarnException {
+    Map<FederationSubClusterId, List<ResourceRequest>> answer = new HashMap<>();
+    if (activeSubclusters.size() < 1) {
+      throw new YarnException(
+          "Zero active subclusters, cannot pick where to forward "
+              + "AllocateRequest.");
+    }
+    // simply broadcast the resource request to all sub-clusters
+    for (FederationSubClusterId subClusterId : activeSubclusters.keySet()) {
+      answer.put(subClusterId, resourceRequests);
+      knownClusterIds.add(subClusterId);
+    }
+
+    return answer;
+  }
+
+  @Override
+  public void notifyOfResponse(FederationSubClusterId subClusterId,
+      AllocateResponse response) throws YarnException {
+    if (!knownClusterIds.contains(subClusterId)) {
+      throw new UnknownSubclusterException(
+          "The response is received from a subcluster that is unknown to this "
+              + "policy.");
+    }
+    // stateless policy does not care about responses
+  }
+
+}
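[Reviewer note, not part of the patch] A minimal usage sketch of the broadcast policy above, mirroring the Mockito-based setup of TestBroadcastAMRMProxyFederationPolicy further down in this patch (the class name BroadcastSketch is hypothetical): the split simply maps every active sub-cluster to the very same, unmodified request list.

// Illustrative sketch only -- assumes the classes introduced in this patch
// plus a static import of org.mockito.Mockito.mock.
public class BroadcastSketch {
  public static void main(String[] args) throws Exception {
    Map<FederationSubClusterId, FederationSubClusterInfo> active =
        new HashMap<>();
    active.put(FederationSubClusterId.newInstance("sc1"),
        mock(FederationSubClusterInfo.class));
    active.put(FederationSubClusterId.newInstance("sc2"),
        mock(FederationSubClusterInfo.class));

    List<ResourceRequest> asks = new ArrayList<>(); // any list of asks

    AMRMProxyFederationPolicy policy = new BroadcastAMRMProxyFederationPolicy();
    Map<FederationSubClusterId, List<ResourceRequest>> split =
        policy.splitResourceRequests(asks, active);

    // every active sub-cluster is a key, each mapped to the same list instance
    System.out.println(split.size() == 2);                    // true
    System.out.println(
        split.get(FederationSubClusterId.newInstance("sc1")) == asks); // true
  }
}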
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..f756f50
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyFederationPolicy.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.resolver.FederationSubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolutionException;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An implementation of the {@link AMRMProxyFederationPolicy} interface that
+ * carefully multicasts the requests as follows:
+ *
+ * <p>Behavior:</p>
+ * <ul>
+ * <li>host-localized {@link ResourceRequest}s are always forwarded to the RM
+ * that owns the node, based on the feedback of a
+ * {@link FederationSubClusterResolver};</li>
+ * <li>rack-localized {@link ResourceRequest}s are forwarded to the RM that
+ * owns the rack (if the {@link FederationSubClusterResolver} provides this
+ * info), or they are forwarded as if they were ANY, as there is not a single
+ * resolution (this is important for deployments that stripe racks across
+ * sub-clusters);</li>
+ * <li>ANY requests corresponding to node/rack-local requests are only
+ * forwarded to the set of RMs that own the node-local requests. The number of
+ * containers listed in each ANY is proportional to the number of node-local
+ * container requests (associated to this ANY via the same
+ * allocateRequestId);</li>
+ * <li>ANY requests that are not associated to node/rack-local requests are
+ * split among RMs based on the "weights" in the
+ * {@link WeightedFederationPolicyInfo} configuration and on headroom
+ * information. The {@code headroomAlpha} parameter of the policy configuration
+ * indicates how much headroom contributes to the splitting choice: a value of
+ * 1.0f means the weights are interpreted only as a 0/1 boolean and all
+ * splitting is based on the advertised headroom (falling back to 1/N for RMs
+ * we have no headroom info from); a value of 0.0f means headroom is ignored
+ * and all splitting decisions are proportional to the "weights" in the policy
+ * configuration;</li>
+ * <li>ANY requests of zero size are forwarded to all known sub-clusters
+ * (i.e., sub-clusters where we scheduled before), as they may represent a user
+ * attempt to cancel a previous request (and we are mostly stateless now, so we
+ * should forward to all known RMs).</li>
+ * </ul>
+ *
+ * <p>Invariants:</p>
+ * <ul>
+ * <li>the policy always excludes non-active RMs;</li>
+ * <li>the policy always excludes RMs that have a 0 weight (even if localized
+ * resources explicitly refer to them);</li>
+ * <li>(bar rounding to the closest ceiling of fractional containers) the sum
+ * of requests made to multiple RMs at the ANY level "adds up" to the user
+ * request; the maximum possible excess in a request is a number of containers
+ * &lt;= the number of sub-clusters.</li>
+ * </ul>
+ */
+public class LocalityMulticastAMRMProxyFederationPolicy
+    implements AMRMProxyFederationPolicy {
+
+  private static final Log LOG =
+      LogFactory.getLog(LocalityMulticastAMRMProxyFederationPolicy.class);
+
+  WeightedFederationPolicyInfo policyInfo = null;
+  Map<FederationSubClusterId, Float> weights;
+  FederationSubClusterResolver resolver;
+
+  Map<FederationSubClusterId, Resource> headroom;
+  float hrAlpha;
+
+  @Override
+  public void reinitialize(FederationPolicyContext federationPolicyContext)
+      throws FederationPolicyInitializationException {
+
+    // perform consistency checks
+    FederationPolicyContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+    // if nothing has changed skip the rest of initialization
+    WeightedFederationPolicyInfo newPolicyInfo = WeightedFederationPolicyInfo
+        .fromByteBuffer(
+            federationPolicyContext.getFederationPolicyConfiguration()
+                .getParams());
+    if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
+      return;
+    }
+
+    Map<FederationSubClusterId, Float> newWeights =
+        newPolicyInfo.getAmrmWeights();
+    if (newWeights == null || newWeights.size() < 1) {
+      throw new FederationPolicyInitializationException(
+          "Weight vector cannot be null/empty.");
+    }
+
+    if (federationPolicyContext.getFederationSubclusterResolver() == null) {
+      throw new FederationPolicyInitializationException(
+          "Resolver cannot be null.");
+    }
+
+    boolean allInactive = true;
+    for (Float f : newWeights.values()) {
+      if (f > 0) {
+        allInactive = false;
+      }
+    }
+    if (allInactive) {
+      LOG.warn("The weights used to configure this policy are all set "
+          + "to zero!");
+    }
+
+    weights = newWeights;
+    policyInfo = newPolicyInfo;
+    resolver = federationPolicyContext.getFederationSubclusterResolver();
+
+    if (headroom == null) {
+      headroom = new ConcurrentHashMap<>();
+    }
+    hrAlpha = policyInfo.getHeadroomAlpha();
+  }
+
+  @Override
+  public void notifyOfResponse(FederationSubClusterId subClusterId,
+      AllocateResponse response) throws YarnException {
+    // remember the headroom most recently advertised by this sub-cluster
+    headroom.put(subClusterId, response.getAvailableResources());
+  }
+
+  @Override
+  public Map<FederationSubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> resourceRequests,
+      Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters)
+      throws YarnException {
+    Map<FederationSubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
+    if (activeSubclusters.size() < 1) {
+      throw new NoActiveSubclustersException(
+          "Zero active subclusters, cannot pick where to forward "
+              + "AllocateRequest.");
+    }
+
+    // object used to accumulate statistics about the split
+    AllocationBookkeeper bookkeeper = new AllocationBookkeeper();
+
+    // Construct an answer for all node and rack requests
+    // Accumulate all sub-clusters that make sense for an ANY request
+    splitNodeAndRackRequests(resourceRequests, activeSubclusters, answer,
+        bookkeeper);
+
+    // add to the answer the split of ANY requests
+    splitAnyRequests(activeSubclusters, answer, bookkeeper);
+
+    return answer;
+  }
+
+  private void splitNodeAndRackRequests(List<ResourceRequest> resourceRequests,
+      Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters,
+      Map<FederationSubClusterId, List<ResourceRequest>> answer,
+      AllocationBookkeeper bookkeeper) throws YarnException {
+    for (ResourceRequest rr : resourceRequests) {
+
+      // if the RR is "ANY" book-keep it for later
+      if (rr.isAnyLocation(rr.getResourceName())) {
+        bookkeeper.addNonLocalizedResourceRequest(rr);
+      } else {
+        FederationSubClusterId nativeId = null;
+        try {
+          nativeId = resolver.getSubClusterForNode(rr.getResourceName());
+        } catch (SubClusterResolutionException u) {
+          LOG.info("Resolution for " + rr.getResourceName() + " failed.");
+          // this must be rack-local or unknown; treat it as an ANY
+          bookkeeper.addNonLocalizedResourceRequest(rr);
+          continue;
+        }
+
+        // if the subcluster is active and allowed by the policy
+        if (weights.containsKey(nativeId) && weights.get(nativeId) > 0
+            && activeSubclusters.containsKey(nativeId)) {
+
+          // Add the node-local RR for this RM to the answer
+          if (!answer.containsKey(nativeId)) {
+            answer.put(nativeId, new ArrayList<ResourceRequest>());
+          }
+          answer.get(nativeId).add(rr);
+
+          // remember the decisions we made to deal with ANY later
+          bookkeeper.addLocalizedResourceRequest(nativeId, rr);
+        }
+      }
+    }
+  }
+
+  private void splitAnyRequests(
+      Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters,
+      Map<FederationSubClusterId, List<ResourceRequest>> answer,
+      AllocationBookkeeper bookkeeper) {
+    // Include all ANY requests: bounded to the sub-clusters touched by the
+    // localized requests of the same allocation (if any exist), or sent to
+    // all active sub-clusters otherwise
+    for (Map.Entry<Long, List<ResourceRequest>> anyReq : bookkeeper
+        .getAnyRequests().entrySet()) {
+      for (ResourceRequest resourceRequest : anyReq.getValue()) {
+        Long allocationId = anyReq.getKey();
+
+        // FIRST: pick the target set of subclusters (all for unbounded ANY or
+        // only those touched by localized asks for bounded ANY)
+        Set<FederationSubClusterId> targetSubclusters =
+            activeSubclusters.keySet();
+        if (bookkeeper.getAllocationIdToSubcluster()
+            .containsKey(allocationId)) {
+          targetSubclusters =
+              bookkeeper.getAllocationIdToSubcluster().get(allocationId);
+        }
+
+        // SECOND: pick how much to ask of each RM
+        for (FederationSubClusterId targetId : targetSubclusters) {
+          if (weights.containsKey(targetId) && weights.get(targetId) > 0) {
+            ResourceRequest partialRR =
+                splitIndividualAny(resourceRequest, targetId,
+                    bookkeeper.getContainerPerRM(), activeSubclusters);
+            if (partialRR != null) {
+              if (!answer.containsKey(targetId)) {
+                answer.put(targetId, new ArrayList<ResourceRequest>());
+              }
+              answer.get(targetId).add(partialRR);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Return a projection of this ANY {@link ResourceRequest} that belongs to
+   * this sub-cluster. This is done based on the "count" of the containers that
+   * require locality in each sub-cluster (if any) or based on the "weights" in
+   * the policy.
+   */
+  private ResourceRequest splitIndividualAny(
+      ResourceRequest originalResourceRequest, FederationSubClusterId targetId,
+      Map<Long, Map<FederationSubClusterId, AtomicLong>> containersPerRM,
+      Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters) {
+
+    // keep the allocation id of the ResourceRequest we are about to clone
+    long reqId = originalResourceRequest.getAllocationRequestId();
+
+    // then work on assigning the right number of containers to it
+    float numContainer = originalResourceRequest.getNumContainers();
+
+    // If the ANY request has 0 containers to begin with, we must forward it to
+    // every RM we have previously contacted (this might be the user's way
+    // to cancel a previous request).
+    if (numContainer == 0 && headroom.containsKey(targetId)) {
+      return originalResourceRequest;
+    }
+
+    // split the ANY based on the count of node-local containers in each
+    // sub-cluster
+    if (containersPerRM.containsKey(reqId)) {
+      float localityBasedWeight =
+          getLocalityBasedWeighting(containersPerRM, reqId, targetId);
+      numContainer = numContainer * localityBasedWeight;
+
+    } else {
+      // split the ANY based on load and policy configuration
+      float headroomWeighting =
+          getHeadroomWeighting(targetId, activeSubclusters);
+      float policyWeighting =
+          getPolicyConfigWeighting(targetId, activeSubclusters);
+
+      // the final split is a linear combination of the headroom and
+      // policy-configuration weightings
+      numContainer = numContainer * (hrAlpha * headroomWeighting
+          + (1 - hrAlpha) * policyWeighting);
+    }
+
+    // omit requests of zero size (if the original size was > 0)
+    ResourceRequest out = null;
+    if (numContainer > 0) {
+      out = ResourceRequest.newInstance(originalResourceRequest.getPriority(),
+          originalResourceRequest.getResourceName(),
+          originalResourceRequest.getCapability(),
+          originalResourceRequest.getNumContainers(),
+          originalResourceRequest.getRelaxLocality(),
+          originalResourceRequest.getNodeLabelExpression(),
+          originalResourceRequest.getExecutionTypeRequest());
+      out.setAllocationRequestId(reqId);
+      out.setNumContainers((int) Math.ceil(numContainer));
+    }
+    return out;
+  }
+
+  /**
+   * Compute the weight to assign to a sub-cluster based on how many local
+   * requests the sub-cluster is the target of.
+   */
+  private float getLocalityBasedWeighting(
+      Map<Long, Map<FederationSubClusterId, AtomicLong>> containersPerRM,
+      long reqId, FederationSubClusterId targetId) {
+    float totWeight = 0;
+    for (AtomicLong count : containersPerRM.get(reqId).values()) {
+      totWeight += (float) count.get();
+    }
+    if (totWeight == 0 || containersPerRM.get(reqId).get(targetId) == null) {
+      return 0;
+    }
+    float localWeight = (float) containersPerRM.get(reqId).get(targetId).get();
+    return localWeight / totWeight;
+  }
+
+  /**
+   * Compute the "weighting" to give to a sub-cluster based on the configured
+   * policy weights (for the active subclusters).
+   */
+  private float getPolicyConfigWeighting(FederationSubClusterId targetId,
+      Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters) {
+    float totWeight = 0;
+    for (Map.Entry<FederationSubClusterId, Float> entry : weights.entrySet()) {
+      if (activeSubclusters.containsKey(entry.getKey())) {
+        totWeight += entry.getValue();
+      }
+    }
+
+    // guard against an all-zero policy and set the weight proportionally
+    float policyWeighting = 0;
+    if (totWeight > 0 && weights.containsKey(targetId)) {
+      policyWeighting = weights.get(targetId) / totWeight;
+    }
+    return policyWeighting;
+  }
+
+  /**
+   * Compute the weighting based on available headroom. This is proportional to
+   * the available headroom memory announced by an RM, or to 1/N for RMs we
+   * have not heard from yet. We carefully restrict the "set" of RMs we can
+   * schedule on to those that are both active and have a "weight" > 0 in the
+   * policy.
+   */
+  private float getHeadroomWeighting(FederationSubClusterId targetId,
+      Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters) {
+
+    Set<FederationSubClusterId> activeAndEnabledSC = new HashSet<>();
+    for (FederationSubClusterId id : activeSubclusters.keySet()) {
+      if (weights.containsKey(id) && weights.get(id) > 0) {
+        activeAndEnabledSC.add(id);
+      }
+    }
+
+    // baseline weight for all RMs
+    float headroomWeighting = 1 / (float) activeAndEnabledSC.size();
+
+    // compute a more precise weight if headroom info is available for this RM:
+    // compute the total headroom
+    float totHeadroomMemory = 0;
+    int totHeadRoomEnabledRMs = 0;
+    for (Map.Entry<FederationSubClusterId, Resource> r : headroom.entrySet()) {
+      if (activeAndEnabledSC.contains(r.getKey())) {
+        totHeadroomMemory += r.getValue().getMemorySize();
+        totHeadRoomEnabledRMs++;
+      }
+    }
+    if (headroom.containsKey(targetId) && totHeadroomMemory > 0) {
+      // headroomWeighting is computed as a boost/shrink w.r.t. the AVG
+      // headroom available among the RMs we have already contacted
+      headroomWeighting =
+          (headroom.get(targetId).getMemorySize() / totHeadroomMemory) * (
+              totHeadRoomEnabledRMs / (float) activeAndEnabledSC.size());
+    }
+    return headroomWeighting;
+  }
+
+  /**
+   * This helper class is used to book-keep the requests made to each
+   * subcluster.
+   */
+  private class AllocationBookkeeper {
+
+    Map<Long, Set<FederationSubClusterId>> allocationIdToSubcluster =
+        new HashMap<>();
+    Map<Long, List<ResourceRequest>> anyRequests = new HashMap<>();
+    Map<Long, Map<FederationSubClusterId, AtomicLong>> countContainersPerRM =
+        new HashMap<>();
+
+    void addLocalizedResourceRequest(FederationSubClusterId nativeId,
+        ResourceRequest rr) {
+      if (!allocationIdToSubcluster.containsKey(rr.getAllocationRequestId())) {
+        allocationIdToSubcluster.put(rr.getAllocationRequestId(),
+            new HashSet<FederationSubClusterId>());
+      }
+      allocationIdToSubcluster.get(rr.getAllocationRequestId()).add(nativeId);
+
+      if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
+        countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
+      }
+      if (!countContainersPerRM.get(rr.getAllocationRequestId())
+          .containsKey(nativeId)) {
+        countContainersPerRM.get(rr.getAllocationRequestId())
+            .put(nativeId, new AtomicLong(0));
+      }
+      countContainersPerRM.get(rr.getAllocationRequestId()).get(nativeId)
+          .incrementAndGet();
+    }
+
+    public void addNonLocalizedResourceRequest(ResourceRequest rr) {
+      long allocationRequestId = rr.getAllocationRequestId();
+      if (!anyRequests.containsKey(allocationRequestId)) {
+        anyRequests.put(allocationRequestId, new ArrayList<>());
+      }
+      anyRequests.get(allocationRequestId).add(rr);
+    }
+
+    public Map<Long, List<ResourceRequest>> getAnyRequests() {
+      return anyRequests;
+    }
+
+    public Map<Long, Set<FederationSubClusterId>> getAllocationIdToSubcluster() {
+      return allocationIdToSubcluster;
+    }
+
+    public Map<Long, Map<FederationSubClusterId, AtomicLong>> getContainerPerRM() {
+      return countContainersPerRM;
+    }
+  }
+}
\ No newline at end of file
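[Reviewer note, not part of the patch] To make the headroom-based ANY split concrete, a self-contained re-computation of the numbers asserted in testSplitBasedOnHeadroom below: with headroomAlpha = 1.0, four active-and-enabled sub-clusters (sc0, sc1, sc2, sc5), and advertised headroom of 100 GB / 0 GB / 1 GB for sc0/sc1/sc2 (none from sc5), an ANY ask of 100 containers splits into 75 / 0 / 1 / 25 (the zero-sized sc1 share is then omitted by splitIndividualAny). The class name HeadroomSplitDemo is hypothetical.

// Illustrative, self-contained arithmetic mirroring getHeadroomWeighting()
// and splitIndividualAny() above -- not part of the patch.
public class HeadroomSplitDemo {
  public static void main(String[] args) {
    float hrAlpha = 1.0f;             // 100% headroom based
    int any = 100;                    // ANY ask of 100 containers
    int activeAndEnabled = 4;         // sc0, sc1, sc2, sc5
    // headroom (MB) heard back from sc0, sc1, sc2 (sc5 never answered)
    long[] headroomMB = {100 * 1024, 0, 1 * 1024};
    float totHeadroom = 0;
    for (long h : headroomMB) {
      totHeadroom += h;
    }
    int heardFrom = headroomMB.length;
    for (long h : headroomMB) {
      // boost/shrink w.r.t. the average headroom among the RMs heard from
      float w = (h / totHeadroom) * (heardFrom / (float) activeAndEnabled);
      System.out.println((int) Math.ceil(any * (hrAlpha * w))); // 75, 0, 1
    }
    // sc5: no headroom info, so it falls back to the 1/N baseline
    System.out.println((int) Math.ceil(any * (1f / activeAndEnabled))); // 25
  }
}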
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..1bee145
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Simple test class for the {@link BroadcastAMRMProxyFederationPolicy}.
+ */
+public class TestBroadcastAMRMProxyFederationPolicy {
+
+  AMRMProxyFederationPolicy policy;
+  FederationSubClusterId sc1;
+  FederationSubClusterId sc2;
+  Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters;
+
+  @Before
+  public void setUp() throws Exception {
+    policy = new BroadcastAMRMProxyFederationPolicy();
+    activeSubclusters = new HashMap<>();
+    sc1 = FederationSubClusterId.newInstance("sc1");
+    sc2 = FederationSubClusterId.newInstance("sc2");
+    activeSubclusters.put(sc1, mock(FederationSubClusterInfo.class));
+    activeSubclusters.put(sc2, mock(FederationSubClusterInfo.class));
+  }
+
+  @Test
+  public void testReinitialize() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    ByteBuffer buf = mock(ByteBuffer.class);
+    fpc.setFederationPolicyConfiguration(FederationPolicyConfiguration
+        .newInstance(policy.getClass().getCanonicalName(), buf));
+    policy.reinitialize(fpc);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad1() throws YarnException {
+    policy.reinitialize(null);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad2() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    policy.reinitialize(fpc);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad3() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    ByteBuffer buf = mock(ByteBuffer.class);
+    fpc.setFederationPolicyConfiguration(
+        FederationPolicyConfiguration.newInstance("WrongPolicyName", buf));
+    policy.reinitialize(fpc);
+  }
+
+  @Test
+  public void testSplitAllocateRequest() throws Exception {
+
+    // verify the request is broadcast to all sub-clusters
+    String[] hosts = new String[] {"host1", "host2"};
+    List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+        .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+
+    Map<FederationSubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+    Assert.assertEquals(2, response.size());
+    for (Map.Entry<FederationSubClusterId, List<ResourceRequest>> entry
+        : response.entrySet()) {
+      Assert.assertNotNull(activeSubclusters.get(entry.getKey()));
+      for (ResourceRequest r : entry.getValue()) {
+        Assert.assertTrue(resourceRequests.contains(r));
+      }
+    }
+    for (FederationSubClusterId subClusterId : activeSubclusters.keySet()) {
+      for (ResourceRequest r : response.get(subClusterId)) {
+        Assert.assertTrue(resourceRequests.contains(r));
+      }
+    }
+  }
+
+  @Test
+  public void testNotifyOfResponse() throws Exception {
+
+    String[] hosts = new String[] {"host1", "host2"};
+    List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+        .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+    Map<FederationSubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    try {
+      policy.notifyOfResponse(FederationSubClusterId.newInstance("sc3"),
+          mock(AllocateResponse.class));
+      Assert.fail();
+    } catch (FederationPolicyException f) {
+      System.out.println("Expected: " + f.getMessage());
+    }
+
+    policy.notifyOfResponse(FederationSubClusterId.newInstance("sc1"),
+        mock(AllocateResponse.class));
+  }
+
+}
\ No newline at end of file
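[Reviewer note, not part of the patch] A similar sketch of the locality-bounded ANY math that createComplexRequest() exercises in the next test file: for allocateRequestId 1 the node-local asks touch sc1 twice and sc2 once, so the companion ANY of 2 containers is split proportionally, mirroring getLocalityBasedWeighting(). The class name LocalityBoundedAnyDemo is hypothetical.

// Illustrative arithmetic of getLocalityBasedWeighting() -- not part of the
// patch: the ANY share of each touched sub-cluster is localCount / totalCount.
public class LocalityBoundedAnyDemo {
  public static void main(String[] args) {
    int anyContainers = 2;           // the ANY companion of allocateRequestId 1
    long sc1Local = 2, sc2Local = 1; // node-local asks counted per sub-cluster
    float tot = sc1Local + sc2Local;
    System.out.println((int) Math.ceil(anyContainers * (sc1Local / tot))); // 2
    System.out.println((int) Math.ceil(anyContainers * (sc2Local / tot))); // 1
  }
}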
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..350db9d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyFederationPolicy.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId;
+import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.FederationSubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.resolver.FederationSubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolutionException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Simple test class for the {@link LocalityMulticastAMRMProxyFederationPolicy}.
+ */
+public class TestLocalityMulticastAMRMProxyFederationPolicy {
+
+  AMRMProxyFederationPolicy policy;
+  WeightedFederationPolicyInfo policyInfo;
+  Random rand = new Random();
+  FederationSubClusterId sc1;
+  FederationSubClusterId sc2;
+  Map<FederationSubClusterId, FederationSubClusterInfo> activeSubclusters;
+  FederationPolicyContext federationPolicyContext;
+
+  @Before
+  public void setUp() throws Exception {
+    policy = new LocalityMulticastAMRMProxyFederationPolicy();
+    activeSubclusters = new HashMap<>();
+    policyInfo = new WeightedFederationPolicyInfo();
+    Map<FederationSubClusterIdInfo, Float> routerWeights = new HashMap<>();
+    Map<FederationSubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+
+    // simulate 6 sub-clusters, each with weight 1/10
+    for (int i = 0; i < 6; i++) {
+      FederationSubClusterIdInfo sc = new FederationSubClusterIdInfo("sc" + i);
+      // sub-cluster 3 is not active
+      if (i != 3) {
+        activeSubclusters.put(sc.toId(), mock(FederationSubClusterInfo.class));
+      }
+      float weight = 1 / 10f;
+      routerWeights.put(sc, weight);
+      amrmWeights.put(sc, weight);
+      // sub-cluster 4 is "disabled" in the weights
+      if (i == 4) {
+        routerWeights.put(sc, 0f);
+        amrmWeights.put(sc, 0f);
+      }
+    }
+    policyInfo.setRouterWeights(routerWeights);
+    policyInfo.setAmrmWeights(amrmWeights);
+    policyInfo.setHeadroomAlpha(0.5f);
+  }
+
+  @Test
+  public void testReinitialize() throws YarnException {
+    initializePolicy();
+  }
+
+  private void initializePolicy()
+      throws FederationPolicyInitializationException {
+    federationPolicyContext = new FederationPolicyContext();
+    federationPolicyContext.setFederationSubclusterResolver(
+        FederationPoliciesTestUtil.initResolver());
+    ByteBuffer buf = WeightedFederationPolicyInfo.toByteBuffer(policyInfo);
+    federationPolicyContext.setFederationPolicyConfiguration(
+        FederationPolicyConfiguration
+            .newInstance(policy.getClass().getCanonicalName(), buf));
+    policy.reinitialize(federationPolicyContext);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad1() throws YarnException {
+    policy.reinitialize(null);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad2() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    policy.reinitialize(fpc);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad3() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    ByteBuffer buf = mock(ByteBuffer.class);
+    fpc.setFederationPolicyConfiguration(
+        FederationPolicyConfiguration.newInstance("WrongPolicyName", buf));
+    policy.reinitialize(fpc);
+  }
+
+  @Test
+  public void testSplitBasedOnHeadroom() throws Exception {
+
+    // Tests how the headroom info is used to split based on the capacity
+    // each RM claims to give us.
+    // Configure policy to be 100% headroom based
+    policyInfo.setHeadroomAlpha(1.0f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+    prepPolicyWithHeadroom();
+
+    Map<FederationSubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    Assert.assertEquals(75,
+        response.get(FederationSubClusterId.newInstance("sc0")).get(0)
+            .getNumContainers());
+    Assert.assertNull(response.get(FederationSubClusterId.newInstance("sc1")));
+    Assert.assertEquals(1,
+        response.get(FederationSubClusterId.newInstance("sc2")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(25,
+        response.get(FederationSubClusterId.newInstance("sc5")).get(0)
+            .getNumContainers());
+
+  }
+
+  @Test
+  public void testFWDAllZeroANY() throws Exception {
+
+    // Tests that a zero-sized ANY request is forwarded to all sub-clusters
+    // we have heard back from before.
+    // Configure policy to be 50% headroom based
+    policyInfo.setHeadroomAlpha(0.5f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
+
+    // this receives responses from sc0, sc1, sc2
+    prepPolicyWithHeadroom();
+
+    Map<FederationSubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    // we expect all three to appear for a zero-sized ANY
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    Assert.assertEquals(0,
+        response.get(FederationSubClusterId.newInstance("sc0")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(0,
+        response.get(FederationSubClusterId.newInstance("sc1")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(0,
+        response.get(FederationSubClusterId.newInstance("sc2")).get(0)
+            .getNumContainers());
+
+    Assert.assertNull(response.get(FederationSubClusterId.newInstance("sc3")));
+    Assert.assertNull(response.get(FederationSubClusterId.newInstance("sc4")));
+    Assert.assertNull(response.get(FederationSubClusterId.newInstance("sc5")));
+  }
+
+  @Test
+  public void testSplitBasedOnHeadroomAndWeights() throws Exception {
+
+    // Tests how the headroom info is used to split based on the capacity
+    // each RM claims to give us.
+
+    // Configure policy to be 50% headroom based and 50% weight based
+    policyInfo.setHeadroomAlpha(0.5f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+    prepPolicyWithHeadroom();
+
+    Map<FederationSubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    Assert.assertEquals(50,
+        response.get(FederationSubClusterId.newInstance("sc0")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(13,
+        response.get(FederationSubClusterId.newInstance("sc1")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(13,
+        response.get(FederationSubClusterId.newInstance("sc2")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(25,
+        response.get(FederationSubClusterId.newInstance("sc5")).get(0)
+            .getNumContainers());
+
+  }
+
+  private void prepPolicyWithHeadroom() throws YarnException {
+    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+    policy.notifyOfResponse(FederationSubClusterId.newInstance("sc0"), ar);
+
+    ar = getAllocateResponseWithTargetHeadroom(0);
+    policy.notifyOfResponse(FederationSubClusterId.newInstance("sc1"), ar);
+
+    ar = getAllocateResponseWithTargetHeadroom(1);
+    policy.notifyOfResponse(FederationSubClusterId.newInstance("sc2"), ar);
+  }
+
+  private AllocateResponse getAllocateResponseWithTargetHeadroom(
+      int numContainers) {
+    return AllocateResponse
+        .newInstance(0, null, null, Collections.<NodeReport>emptyList(),
+            Resource.newInstance(numContainers * 1024, numContainers), null, 10,
+            null, Collections.<NMToken>emptyList());
+  }
+
+  @Test
+  public void testSplitAllocateRequest() throws Exception {
+
+    // Test that a complex list of requests is split correctly
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createComplexRequest();
+
+    Map<FederationSubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    validateSplit(response, resourceRequests);
+    prettyPrintRequests(response);
+
+    // check that hard-coded expected behaviors hold
+    List<ResourceRequest> rrs =
+        response.get(FederationSubClusterId.newInstance("sc0"));
+    Assert.assertEquals(7, rrs.size());
+
+    for (ResourceRequest rr : rrs) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 1L);
+    }
+
+    rrs = response.get(FederationSubClusterId.newInstance("sc1"));
+    Assert.assertEquals(6, rrs.size());
+
+    boolean sc2wasFound = false;
+    for (ResourceRequest rr : rrs) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 0L);
+
+      // NOTE: the mapping for rack sc2-rack2 is on purpose left dangling
+      // (missing in the "nodes" file for the subcluster resolver).
+      // The policy behavior checked here is that if a rack-locality
+      // cannot be resolved to a single RM (e.g., RMs are striped
+      // across racks), then the rack is treated as an ANY and sent to
+      // all RMs where its hosts belong.
+      if (rr.getResourceName().equals("sc2-rack2")) {
+        sc2wasFound = true;
+      }
+    }
+    Assert.assertTrue(sc2wasFound);
+
+    rrs = response.get(FederationSubClusterId.newInstance("sc2"));
+    Assert.assertEquals(4, rrs.size());
+    for (ResourceRequest rr : rrs) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 0L);
+    }
+
+    rrs = response.get(FederationSubClusterId.newInstance("sc3"));
+    Assert.assertNull(rrs);
+
+    rrs = response.get(FederationSubClusterId.newInstance("sc4"));
+    Assert.assertNull(rrs);
+
+    rrs = response.get(FederationSubClusterId.newInstance("sc5"));
+    Assert.assertEquals(1, rrs.size());
+
+    for (ResourceRequest rr : rrs) {
+      if (rr.getAllocationRequestId() == 2) {
+        // this is the fraction of containers that belongs to this
+        // subcluster (100 total / 4 active-and-enabled sub-clusters)
+        Assert.assertEquals(25, rr.getNumContainers());
+      }
+      Assert.assertTrue(rr.getAllocationRequestId() >= 2);
+      Assert.assertTrue(rr.getRelaxLocality());
+    }
+  }
+
+  private void validateSplit(
+      Map<FederationSubClusterId, List<ResourceRequest>> split,
+      List<ResourceRequest> original) throws YarnException {
+
+    FederationSubClusterResolver resolver =
+        federationPolicyContext.getFederationSubclusterResolver();
+
+    // Apply general validation rules
+    int numUsedSubclusters = split.size();
+
+    Set<Long> originalIds = new HashSet<>();
+    Set<Long> splitIds = new HashSet<>();
+
+    int originalContainers = 0;
+    for (ResourceRequest rr : original) {
+      originalContainers += rr.getNumContainers();
+      originalIds.add(rr.getAllocationRequestId());
+    }
+
+    int splitContainers = 0;
+    for (Map.Entry<FederationSubClusterId, List<ResourceRequest>> rrs
+        : split.entrySet()) {
+      for (ResourceRequest rr : rrs.getValue()) {
+        splitContainers += rr.getNumContainers();
+        splitIds.add(rr.getAllocationRequestId());
+        // check that node-local asks are sent to the right RM (only)
+        try {
+          FederationSubClusterId fid =
+              resolver.getSubClusterForNode(rr.getResourceName());
+          if (!fid.equals(rrs.getKey())) {
+            Assert.fail("A node-local (or resolvable rack-local) RR should not "
+                + "be sent to an RM other than the one it resolves to.");
+          }
+        } catch (SubClusterResolutionException s) {
+          // expected
+        }
+      }
+    }
+
+    // check we are not inventing Allocation Ids
+    Assert.assertEquals(originalIds, splitIds);
+
+    // check we are not excessively replicating the container asks among
+    // RMs (a little is allowed due to rounding of fractional splits)
+    Assert.assertTrue(" Containers requested (" + splitContainers + ") "
+        + "should not exceed the original count of containers ("
+        + originalContainers + ") by more than the number of subclusters ("
+        + numUsedSubclusters + ")",
+        originalContainers + numUsedSubclusters >= splitContainers);
+
+    // Test target Ids
+    for (FederationSubClusterId targetId : split.keySet()) {
+      Assert.assertTrue("Target subclusters should be in the active set",
+          activeSubclusters.containsKey(targetId));
+      Assert.assertTrue(
+          "Target subclusters should have weight > 0 in the policy",
+          policyInfo.getRouterWeights().get(targetId) > 0);
+    }
+  }
+
+  private void prettyPrintRequests(
+      Map<FederationSubClusterId, List<ResourceRequest>> response) {
+    for (Map.Entry<FederationSubClusterId, List<ResourceRequest>> entry
+        : response.entrySet()) {
+      String str = "";
+      for (ResourceRequest rr : entry.getValue()) {
+        str += " [id:" + rr.getAllocationRequestId() + " loc:"
+            + rr.getResourceName() + " numCont:" + rr.getNumContainers()
+            + "], ";
+      }
+      System.out.println(entry.getKey() + " --> " + str);
+    }
+  }
+
+  private List<ResourceRequest> createSimpleRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single ANY request for 100 containers
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L,
+            ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
+    return out;
+  }
+
+  private List<ResourceRequest> createZeroSizedANYRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single zero-sized ANY request
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, ResourceRequest.ANY, 1024, 1, 1, 0, null,
+            true));
+    return out;
+  }
+
+  private List<ResourceRequest> createComplexRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single container request for a node in sc0 (with its rack and
+    // ANY companions)
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, "sc0-rack0-host0", 1024, 1, 1, 1, null,
+            false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, "sc0-rack0", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, ResourceRequest.ANY, 1024, 1, 1, 1, null,
+            false));
+
+    // create a request with 3 alternative hosts across sc1 and sc2, where we
+    // want 2 containers in sc1 and 1 in sc2
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "sc1-rack1-host1", 1024, 1, 1, 1, null,
+            false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "sc1-rack1-host2", 1024, 1, 1, 1, null,
+            false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "sc2-rack2-host3", 1024, 1, 1, 1, null,
+            false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "sc1-rack1", 1024, 1, 1, 2, null, false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "sc2-rack2", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, ResourceRequest.ANY, 1024, 1, 1, 2, null,
+            false));
+
+    // create a non-local ANY request that can span anything
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(2L, ResourceRequest.ANY, 1024, 1, 1, 100, null,
+            true));
+
+    // create a single container request in sc0 with relaxed locality
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(3L, "sc0-rack0-host0", 1024, 1, 1, 1, null,
+            true));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(3L, "sc0-rack0", 1024, 1, 1, 1, null, true));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(3L, ResourceRequest.ANY, 1024, 1, 1, 1, null,
+            true));
+
+    return out;
+  }
+
+}
\ No newline at end of file
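[Reviewer note, not part of the patch] Finally, a small self-contained check of the rounding invariant that validateSplit() asserts: because each of the k per-sub-cluster shares is rounded up independently, the split total can exceed the original ask by at most k containers. The class name RoundingInvariantDemo is hypothetical.

// Illustrative check of the rounding invariant -- not part of the patch.
// Each of k shares is ceil()-ed, and ceil(x) < x + 1, so the summed excess
// over the original n containers is strictly less than k.
public class RoundingInvariantDemo {
  public static void main(String[] args) {
    java.util.Random rand = new java.util.Random(0);
    for (int trial = 0; trial < 1000; trial++) {
      int n = 1 + rand.nextInt(1000);  // original number of containers
      int k = 1 + rand.nextInt(6);     // sub-clusters receiving a share
      float[] w = new float[k];
      float tot = 0;
      for (int i = 0; i < k; i++) {
        w[i] = rand.nextFloat() + 1e-3f; // strictly positive weights
        tot += w[i];
      }
      int sum = 0;
      for (int i = 0; i < k; i++) {
        sum += (int) Math.ceil(n * (w[i] / tot)); // per-sub-cluster share
      }
      if (sum > n + k) {
        throw new AssertionError("invariant violated");
      }
    }
    System.out.println("sum(ceil(n * w_i / totW)) <= n + k held in all trials");
  }
}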