diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..98ef87e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyFederationPolicy.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.*;
+
+/**
+ * An implementation of the {@link AMRMProxyFederationPolicy} that simply
+ * broadcasts each {@link ResourceRequest} to all the available sub-clusters.
+ */
+public class BroadcastAMRMProxyFederationPolicy
+    implements AMRMProxyFederationPolicy {
+
+  private Configuration conf;
+
+  private Set<SubClusterId> knownClusterIds = new HashSet<>();
+
+  @Override
+  public void reinitialize(FederationPolicyContext federationPolicyContext)
+      throws FederationPolicyInitializationException {
+    FederationPolicyContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+  }
+
+  @Override
+  public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> resourceRequests,
+      Map<SubClusterId, SubClusterInfo> activeSubclusters)
+      throws YarnException {
+    Map<SubClusterId, List<ResourceRequest>> answer = new HashMap<>();
+    if (activeSubclusters.size() < 1) {
+      throw new YarnException("Zero active subclusters, cannot pick where to "
+          + "forward the AllocateRequest.");
+    }
+    // simply broadcast the resource requests to all sub-clusters
+    for (SubClusterId subClusterId : activeSubclusters.keySet()) {
+      answer.put(subClusterId, resourceRequests);
+      knownClusterIds.add(subClusterId);
+    }
+
+    return answer;
+  }
+
+  @Override
+  public void notifyOfResponse(SubClusterId subClusterId,
+      AllocateResponse response) throws YarnException {
+    if (!knownClusterIds.contains(subClusterId)) {
+      throw new UnknownSubclusterException(
+          "The response is received from a subcluster that is unknown to "
+              + "this policy.");
+    }
+    // stateless policy does not care about responses
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..99ffe3a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyFederationPolicy.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An implementation of the {@link AMRMProxyFederationPolicy} interface that
+ * carefully multicasts the requests as follows:
+ *
+ * <p>Behavior:</p>
+ * <ul>
+ * <li>Host-localized {@link ResourceRequest}s are always forwarded to the RM
+ * that owns the node, based on the feedback of a
+ * {@link SubClusterResolver}.</li>
+ *
+ * <li>Rack-localized {@link ResourceRequest}s are forwarded to the RM that
+ * owns the rack (if the {@link SubClusterResolver} provides this information),
+ * or they are forwarded as if they were ANY (this is important for deployments
+ * that stripe racks across sub-clusters), as there is no single
+ * resolution.</li>
+ *
+ * <li>ANY requests corresponding to node/rack-local requests are only
+ * forwarded to the set of RMs that own the node-local requests. The number of
+ * containers listed in each ANY is proportional to the number of node-local
+ * container requests (associated with this ANY via the same
+ * allocateRequestId).</li>
+ *
+ * <li>ANY requests that are not associated with node/rack-local requests are
+ * split among RMs based on the "weights" in the
+ * {@link WeightedFederationPolicyInfo} configuration and on headroom
+ * information. The {@code headroomAlpha} parameter of the policy configuration
+ * indicates how much the headroom contributes to the splitting choice. A value
+ * of 1.0f means the weights are interpreted only as 0/1 booleans, and all
+ * splitting is based on the advertised headroom (falling back to 1/N for RMs
+ * from which we have no headroom information). A headroomAlpha value of 0.0f
+ * means the headroom is ignored, and all splitting decisions are proportional
+ * to the "weights" in the configuration of the policy.</li>
+ *
+ * <li>ANY requests of zero size are forwarded to all known sub-clusters (i.e.,
+ * sub-clusters where we scheduled before), as they may represent a user
+ * attempt to cancel a previous request (and since we are mostly stateless, we
+ * should forward them to all known RMs).</li>
+ * </ul>
+ *
+ * <p>Invariants:</p>
+ * <ul>
+ * <li>The policy always excludes non-active RMs.</li>
+ *
+ * <li>The policy always excludes RMs that have a weight of 0 (even if
+ * localized resources explicitly refer to them).</li>
+ *
+ * <li>(Barring rounding up of fractional containers) the sum of the requests
+ * made to multiple RMs at the ANY level "adds up" to the user request. The
+ * maximum possible excess in a request is a number of containers <= the
+ * number of sub-clusters.</li>
+ * </ul>
+ */
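To make the headroomAlpha rule above concrete, here is a small self-contained sketch (illustrative only; the weights and headroom figures are made up, and this class is not part of the patch) that splits an ANY ask of 100 containers across three active-and-enabled sub-clusters:

// Hypothetical illustration of the rule
//   share(sc) = alpha * headroomWeight(sc) + (1 - alpha) * policyWeight(sc)
public class HeadroomAlphaExample {
  public static void main(String[] args) {
    float alpha = 0.5f;
    // normalized policy weights for sc0, sc1, sc2 (made-up values)
    float[] policyWeight = {0.5f, 0.25f, 0.25f};
    // sc0/sc1 advertised 80%/20% of the known headroom; sc2 never reported,
    // so it keeps the 1/N baseline while the known ones are scaled by the
    // fraction of RMs that reported (2/3)
    float[] headroomWeight = {0.8f * (2 / 3f), 0.2f * (2 / 3f), 1 / 3f};
    int ask = 100;
    for (int i = 0; i < 3; i++) {
      float share = alpha * headroomWeight[i] + (1 - alpha) * policyWeight[i];
      // ceil rounding: prints 52, 20, 30 (total 102 <= 100 + 3 sub-clusters)
      System.out.println("sc" + i + " -> " + (int) Math.ceil(ask * share));
    }
  }
}

Note how the total (102) illustrates the last invariant above: the rounded shares may exceed the original ask by at most one container per sub-cluster.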
+public class LocalityMulticastAMRMProxyFederationPolicy
+    implements AMRMProxyFederationPolicy {
+
+  private static final Log LOG =
+      LogFactory.getLog(LocalityMulticastAMRMProxyFederationPolicy.class);
+
+  private WeightedFederationPolicyInfo policyInfo = null;
+  private Map<SubClusterId, Float> weights;
+  private SubClusterResolver resolver;
+
+  private Map<SubClusterId, Resource> headroom;
+  private float hrAlpha;
+
+  @Override
+  public void reinitialize(FederationPolicyContext federationPolicyContext)
+      throws FederationPolicyInitializationException {
+
+    // perform consistency checks
+    FederationPolicyContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+    // if nothing has changed, skip the rest of the initialization
+    WeightedFederationPolicyInfo newPolicyInfo = WeightedFederationPolicyInfo
+        .fromByteBuffer(
+            federationPolicyContext.getFederationPolicyConfiguration()
+                .getParams());
+    if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
+      return;
+    }
+
+    Map<SubClusterId, Float> newWeights = newPolicyInfo.getAmrmWeights();
+    if (newWeights == null || newWeights.size() < 1) {
+      throw new FederationPolicyInitializationException(
+          "Weight vector cannot be null/empty.");
+    }
+
+    if (federationPolicyContext.getFederationSubclusterResolver() == null) {
+      throw new FederationPolicyInitializationException(
+          "Resolver cannot be null.");
+    }
+
+    boolean allInactive = true;
+    for (Float f : newWeights.values()) {
+      if (f > 0) {
+        allInactive = false;
+      }
+    }
+    if (allInactive) {
+      LOG.warn("The weights used to configure this policy are all set "
+          + "to zero!");
+    }
+
+    weights = newWeights;
+    policyInfo = newPolicyInfo;
+    resolver = federationPolicyContext.getFederationSubclusterResolver();
+
+    if (headroom == null) {
+      headroom = new ConcurrentHashMap<>();
+    }
+    hrAlpha = policyInfo.getHeadroomAlpha();
+  }
+
+  @Override
+  public void notifyOfResponse(SubClusterId subClusterId,
+      AllocateResponse response) throws YarnException {
+    // this policy is stateful: remember the headroom advertised by each RM
+    headroom.put(subClusterId, response.getAvailableResources());
+  }
+
+  @Override
+  public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> resourceRequests,
+      Map<SubClusterId, SubClusterInfo> activeSubclusters)
+      throws YarnException {
+
+    // object used to accumulate statistics about the answer
+    AllocationBookkeeper bookkeeper =
+        new AllocationBookkeeper(activeSubclusters);
+
+    List<ResourceRequest> nonLocalizedRequests = new ArrayList<>();
+
+    // if the RR is resolved to a local subcluster add it directly (node and
+    // resolvable racks)
+    for (ResourceRequest rr : resourceRequests) {
+
+      SubClusterId targetId = null;
+      try {
+        targetId = resolver.getSubClusterForNode(rr.getResourceName());
+      } catch (YarnException e) {
+        LOG.debug("Node resolution failed for " + rr.getResourceName(), e);
+      }
+      if (bookkeeper.isActiveAndEnabled(targetId)) {
+        bookkeeper.addLocalizedNodeRR(targetId, rr);
+      } else {
+        Set<SubClusterId> targetIds = null;
+        try {
+          targetIds = resolver.getSubClustersForRack(rr.getResourceName());
+        } catch (YarnException e) {
+          LOG.debug("Rack resolution failed for " + rr.getResourceName(), e);
+        }
+        if (targetIds != null && targetIds.size() > 0) {
+          for (SubClusterId tid : targetIds) {
+            if (bookkeeper.isActiveAndEnabled(tid)) {
+              bookkeeper.addRackRR(tid, rr);
+            }
+          }
+        } else {
+          // only ANY requests proceed; node/rack-local requests that target
+          // non-enabled clusters make it here and are filtered out
+          if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
+            nonLocalizedRequests.add(rr);
+          }
+        }
+      }
+    }
+
+    // handle all non-localized requests (ANY + racks with resolution issues)
+    splitAnyRequests(nonLocalizedRequests, bookkeeper);
+
+    return bookkeeper.getAnswer();
+  }
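The resolution loop above assumes callers tie localized asks and their ANY companion together through allocationRequestId. Below is a minimal sketch of that input shape; the class and values are hypothetical, while ResourceRequest.newInstance, ResourceRequest.ANY, and setAllocationRequestId are the YARN APIs already used in this patch:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class SplitInputExample { // hypothetical helper, not in the patch
  public static List<ResourceRequest> buildAsks() {
    Priority p = Priority.newInstance(1);
    Resource cap = Resource.newInstance(1024, 1);
    // two containers wanted on host1 (with rack and ANY fallbacks); all
    // three records are tied together by allocationRequestId 5
    ResourceRequest node =
        ResourceRequest.newInstance(p, "host1", cap, 2, true);
    ResourceRequest rack =
        ResourceRequest.newInstance(p, "rack1", cap, 2, true);
    ResourceRequest any =
        ResourceRequest.newInstance(p, ResourceRequest.ANY, cap, 2, true);
    node.setAllocationRequestId(5L);
    rack.setAllocationRequestId(5L);
    any.setAllocationRequestId(5L);
    return Arrays.asList(node, rack, any);
  }
}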
+
+  /**
+   * Splits a list of non-localized resource requests among sub-clusters.
+   */
+  private void splitAnyRequests(List<ResourceRequest> originalResourceRequests,
+      AllocationBookkeeper bookkeeper) throws YarnException {
+
+    for (ResourceRequest resourceRequest : originalResourceRequests) {
+
+      // FIRST: pick the target set of subclusters (based on whether this RR
+      // is associated with other localized requests via an allocationId)
+      Long allocationId = resourceRequest.getAllocationRequestId();
+      Set<SubClusterId> targetSubclusters;
+      if (bookkeeper.getSubClustersForId(allocationId) != null) {
+        targetSubclusters = bookkeeper.getSubClustersForId(allocationId);
+      } else {
+        targetSubclusters = bookkeeper.getActiveAndEnabledSC();
+      }
+
+      // SECOND: pick how much to ask to each RM for each request
+      splitIndividualAny(resourceRequest, targetSubclusters, bookkeeper);
+    }
+  }
+
+  /**
+   * Computes the projection of this ANY {@link ResourceRequest} that belongs
+   * to each sub-cluster. This is done based on the "count" of the containers
+   * that require locality in each sub-cluster (if any), or based on the
+   * "weights" in the policy.
+   */
+  private void splitIndividualAny(ResourceRequest originalResourceRequest,
+      Set<SubClusterId> targetSubclusters, AllocationBookkeeper bookkeeper) {
+
+    long allocationId = originalResourceRequest.getAllocationRequestId();
+
+    for (SubClusterId targetId : targetSubclusters) {
+      float numContainer = originalResourceRequest.getNumContainers();
+
+      // If the ANY request has 0 containers to begin with, we must forward it
+      // to any RM we have previously contacted (this might be the user's way
+      // to cancel a previous request).
+      if (numContainer == 0 && headroom.containsKey(targetId)) {
+        bookkeeper.addAnyRR(targetId, originalResourceRequest);
+      }
+
+      // If the ANY is associated with localized asks, split based on their
+      // ratio
+      if (bookkeeper.getSubClustersForId(allocationId) != null) {
+        float localityBasedWeight =
+            getLocalityBasedWeighting(allocationId, targetId, bookkeeper);
+        numContainer = numContainer * localityBasedWeight;
+      } else {
+        // split the ANY based on load and policy configuration
+        float headroomWeighting = getHeadroomWeighting(targetId, bookkeeper);
+        float policyWeighting = getPolicyConfigWeighting(targetId, bookkeeper);
+        numContainer = numContainer * (hrAlpha * headroomWeighting
+            + (1 - hrAlpha) * policyWeighting);
+      }
+
+      // if the calculated request is non-empty add it to the answer
+      if (numContainer > 0) {
+        ResourceRequest out = ResourceRequest
+            .newInstance(originalResourceRequest.getPriority(),
+                originalResourceRequest.getResourceName(),
+                originalResourceRequest.getCapability(),
+                originalResourceRequest.getNumContainers(),
+                originalResourceRequest.getRelaxLocality(),
+                originalResourceRequest.getNodeLabelExpression(),
+                originalResourceRequest.getExecutionTypeRequest());
+        out.setAllocationRequestId(allocationId);
+        out.setNumContainers((int) Math.ceil(numContainer));
+        if (ResourceRequest.isAnyLocation(out.getResourceName())) {
+          bookkeeper.addAnyRR(targetId, out);
+        } else {
+          bookkeeper.addRackRR(targetId, out);
+        }
+      }
+    }
+  }
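A small worked example of the locality-proportional branch implemented above (the class and numbers are invented for illustration): an ANY ask of 4 containers whose allocationRequestId also carries 3 node-local containers in sc1 and 1 in sc2 is split 3:1.

public class LocalityWeightExample { // illustration only, not in the patch
  public static void main(String[] args) {
    long[] localizedPerSc = {3, 1};
    long totalLocalized = localizedPerSc[0] + localizedPerSc[1];
    int anyContainers = 4;
    for (int i = 0; i < localizedPerSc.length; i++) {
      float weight = localizedPerSc[i] / (float) totalLocalized;
      // prints sc1 -> 3 and sc2 -> 1, mirroring the 3:1 localized ratio
      System.out.println("sc" + (i + 1) + " -> "
          + (int) Math.ceil(anyContainers * weight));
    }
  }
}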
+
+  /**
+   * Computes the weight to assign to a sub-cluster based on how many of the
+   * localized requests target that sub-cluster.
+   */
+  private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
+      AllocationBookkeeper bookkeeper) {
+    float totWeight = bookkeeper.getTotNumLocalizedContainers();
+    float localWeight = bookkeeper.getNumLocalizedContainers(reqId, targetId);
+    return totWeight > 0 ? localWeight / totWeight : 0;
+  }
+
+  /**
+   * Computes the "weighting" to give to a sub-cluster based on the configured
+   * policy weights (for the active sub-clusters).
+   */
+  private float getPolicyConfigWeighting(SubClusterId targetId,
+      AllocationBookkeeper bookkeeper) {
+    float totWeight = bookkeeper.totPolicyWeight;
+    Float localWeight = weights.get(targetId);
+    return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0;
+  }
+
+  /**
+   * Computes the weighting based on available headroom. This is proportional
+   * to the available headroom memory announced by each RM, or to 1/N for RMs
+   * we have not seen yet.
+   */
+  private float getHeadroomWeighting(SubClusterId targetId,
+      AllocationBookkeeper bookkeeper) {
+
+    // baseline weight for all RMs
+    float headroomWeighting =
+        1 / (float) bookkeeper.getActiveAndEnabledSC().size();
+
+    if (headroom.containsKey(targetId) && bookkeeper.totHeadroomMemory > 0) {
+      // compute which portion of the RMs that are active/enabled have
+      // reported their headroom (needed as an adjustment factor)
+      // (note: getActiveAndEnabledSC should never be null/zero)
+      float ratioHeadroomKnown =
+          bookkeeper.totHeadRoomEnabledRMs / (float) bookkeeper
+              .getActiveAndEnabledSC().size();
+
+      // headroomWeighting is the ratio of headroom memory in the targetId
+      // cluster over the total memory. The ratioHeadroomKnown factor is
+      // applied to adjust for missing information, so that the sum of the
+      // allocated containers closely approximates what the user asked for
+      // (with a small excess).
+      headroomWeighting = (headroom.get(targetId).getMemorySize()
+          / bookkeeper.totHeadroomMemory) * (ratioHeadroomKnown);
+    }
+    return headroomWeighting;
+  }
+
+  /**
+   * This helper class is used to book-keep the requests made to each
+   * sub-cluster, and to maintain useful statistics to split ANY requests.
+   */
+  private class AllocationBookkeeper {
+
+    // the answer being accumulated
+    private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
+
+    // stores how many containers we have allocated in each RM for localized
+    // asks, used to correctly "spread" the corresponding ANY
+    private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
+        new HashMap<>();
+
+    private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
+    private AtomicLong totNumLocalizedContainers = new AtomicLong(0);
+    private float totHeadroomMemory = 0;
+    private int totHeadRoomEnabledRMs = 0;
+    private float totPolicyWeight = 0;
+
+    private AllocationBookkeeper(
+        Map<SubClusterId, SubClusterInfo> activeSubclusters)
+        throws YarnException {
+
+      // pre-compute the set of subclusters that are both active and enabled
+      // by the policy weights
+      for (SubClusterId id : activeSubclusters.keySet()) {
+        if (weights.containsKey(id) && weights.get(id) > 0) {
+          activeAndEnabledSC.add(id);
+        }
+      }
+
+      if (activeAndEnabledSC.size() < 1) {
+        throw new NoActiveSubclustersException(
+            "None of the subclusters enabled in this policy (weight > 0) are "
+                + "currently active; we cannot forward the "
+                + "ResourceRequest(s).");
+      }
+
+      // pre-compute headroom-based weights for active/enabled subclusters
+      for (Map.Entry<SubClusterId, Resource> r : headroom.entrySet()) {
+        if (activeAndEnabledSC.contains(r.getKey())) {
+          totHeadroomMemory += r.getValue().getMemorySize();
+          totHeadRoomEnabledRMs++;
+        }
+      }
+
+      // pre-compute the sum of the "active" weights from this policy
+      // configuration
+      for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
+        if (activeAndEnabledSC.contains(entry.getKey())) {
+          totPolicyWeight += entry.getValue();
+        }
+      }
+    }
+
+    /**
+     * Adds to the answer a localized node request, and keeps track of
+     * statistics on a per-allocation-id and per-subcluster basis.
+     */
+    private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
+      Preconditions.checkArgument(
+          !ResourceRequest.isAnyLocation(rr.getResourceName()));
+
+      if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
+        countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
+      }
+      if (!countContainersPerRM.get(rr.getAllocationRequestId())
+          .containsKey(targetId)) {
+        countContainersPerRM.get(rr.getAllocationRequestId())
+            .put(targetId, new AtomicLong(0));
+      }
+      countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
+          .addAndGet(rr.getNumContainers());
+
+      totNumLocalizedContainers.addAndGet(rr.getNumContainers());
+
+      internalAddToAnswer(targetId, rr);
+    }
+
+    /**
+     * Adds a rack-local request to the final answer.
+     */
+    public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
+      Preconditions.checkArgument(
+          !ResourceRequest.isAnyLocation(rr.getResourceName()));
+      internalAddToAnswer(targetId, rr);
+    }
+
+    /**
+     * Adds an ANY request to the final answer.
+     */
+    private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
+      Preconditions.checkArgument(
+          ResourceRequest.isAnyLocation(rr.getResourceName()));
+      internalAddToAnswer(targetId, rr);
+    }
+
+    private void internalAddToAnswer(SubClusterId targetId,
+        ResourceRequest partialRR) {
+      if (!answer.containsKey(targetId)) {
+        answer.put(targetId, new ArrayList<ResourceRequest>());
+      }
+      answer.get(targetId).add(partialRR);
+    }
+
+    /**
+     * Returns all known sub-clusters associated with an allocation id.
+     *
+     * @param allocationId the allocation id considered
+     *
+     * @return the set of {@link SubClusterId}s associated with this
+     *         allocation id
+     */
+    private Set<SubClusterId> getSubClustersForId(long allocationId) {
+      if (countContainersPerRM.get(allocationId) == null) {
+        return null;
+      }
+      return countContainersPerRM.get(allocationId).keySet();
+    }
+
+    /**
+     * Returns the answer accumulated so far.
+     *
+     * @return the answer
+     */
+    private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
+      return answer;
+    }
+
+    /**
+     * Returns the set of sub-clusters that are both active and allowed by our
+     * policy (weight > 0).
+     *
+     * @return a set of active and enabled subclusters
+     */
+    private Set<SubClusterId> getActiveAndEnabledSC() {
+      return activeAndEnabledSC;
+    }
+
+    /**
+     * Returns the total number of containers coming from localized requests.
+     */
+    private long getTotNumLocalizedContainers() {
+      return totNumLocalizedContainers.get();
+    }
+
+    /**
+     * Returns the number of containers matching an allocation id that are
+     * localized in the targetId sub-cluster.
+     */
+    private long getNumLocalizedContainers(long allocationId,
+        SubClusterId targetId) {
+      Map<SubClusterId, AtomicLong> perRM =
+          countContainersPerRM.get(allocationId);
+      AtomicLong c = perRM == null ? null : perRM.get(targetId);
+      return c == null ? 0 : c.get();
+    }
+
+    /**
+     * Returns true if the sub-cluster is both active and enabled.
+     */
+    private boolean isActiveAndEnabled(SubClusterId targetId) {
+      return getActiveAndEnabledSC().contains(targetId);
+    }
+
+  }
+}
\ No newline at end of file
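Before the tests, a back-of-the-envelope comparison of the two policies in this patch may help; the sketch below (not part of the patch) contrasts the worst-case container totals implied by broadcasting an ANY ask with the multicast invariant documented earlier:

public class PolicyExcessExample { // illustrative arithmetic only
  public static void main(String[] args) {
    int ask = 90;
    int subClusters = 3;
    // broadcast sends the full ask to every RM
    int broadcastWorstCase = ask * subClusters; // 270
    // the multicast invariant bounds the excess by one container per RM
    int multicastWorstCase = ask + subClusters; // 93
    System.out.println(broadcastWorstCase + " vs " + multicastWorstCase);
  }
}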
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..1fd6f66
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Simple test class for the {@link BroadcastAMRMProxyFederationPolicy}.
+ */
+public class TestBroadcastAMRMProxyFederationPolicy {
+
+  private AMRMProxyFederationPolicy policy;
+  private SubClusterId sc1;
+  private SubClusterId sc2;
+  private Map<SubClusterId, SubClusterInfo> activeSubclusters;
+
+  @Before
+  public void setUp() throws Exception {
+    policy = new BroadcastAMRMProxyFederationPolicy();
+    activeSubclusters = new HashMap<>();
+    sc1 = SubClusterId.newInstance("sc1");
+    sc2 = SubClusterId.newInstance("sc2");
+    activeSubclusters.put(sc1, mock(SubClusterInfo.class));
+    activeSubclusters.put(sc2, mock(SubClusterInfo.class));
+  }
+
+  @Test
+  public void testReinitialize() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    ByteBuffer buf = mock(ByteBuffer.class);
+    fpc.setFederationPolicyConfiguration(SubClusterPolicyConfiguration
+        .newInstance("queue1", policy.getClass().getCanonicalName(), buf));
+    policy.reinitialize(fpc);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad1() throws YarnException {
+    policy.reinitialize(null);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad2() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    policy.reinitialize(fpc);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad3() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    ByteBuffer buf = mock(ByteBuffer.class);
+    fpc.setFederationPolicyConfiguration(
+        SubClusterPolicyConfiguration.newInstance("queue1", "WrongPolicyName",
+            buf));
+    policy.reinitialize(fpc);
+  }
+
+  @Test
+  public void testSplitAllocateRequest() throws Exception {
+
+    // verify the request is broadcast to all subclusters
+    String[] hosts = new String[] {"host1", "host2"};
+    List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+        .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+    Assert.assertEquals(2, response.size());
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
+        .entrySet()) {
+      Assert.assertNotNull(activeSubclusters.get(entry.getKey()));
+      for (ResourceRequest r : entry.getValue()) {
+        Assert.assertTrue(resourceRequests.contains(r));
+      }
+    }
+    for (SubClusterId subClusterId : activeSubclusters.keySet()) {
+      for (ResourceRequest r : response.get(subClusterId)) {
+        Assert.assertTrue(resourceRequests.contains(r));
+      }
+    }
+  }
+
+  @Test
+  public void testNotifyOfResponse() throws Exception {
+
+    String[] hosts = new String[] {"host1", "host2"};
+    List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+        .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+    Map<SubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    try {
+      policy.notifyOfResponse(SubClusterId.newInstance("sc3"),
+          mock(AllocateResponse.class));
+      Assert.fail();
+    } catch (FederationPolicyException f) {
+      System.out.println("Expected: " + f.getMessage());
+    }
+
+    policy.notifyOfResponse(SubClusterId.newInstance("sc1"),
+        mock(AllocateResponse.class));
+  }
+
+}
\ No newline at end of file
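The next test class exercises the full policy lifecycle. A condensed sketch of that contract follows; context, asks, activeSubclusters, and responseFrom are hypothetical placeholders standing in for what the tests build with FederationPolicyContext and FederationPoliciesTestUtil:

AMRMProxyFederationPolicy policy =
    new LocalityMulticastAMRMProxyFederationPolicy();
policy.reinitialize(context); // validates the context, loads weights/resolver
Map<SubClusterId, List<ResourceRequest>> split =
    policy.splitResourceRequests(asks, activeSubclusters);
for (Map.Entry<SubClusterId, List<ResourceRequest>> e : split.entrySet()) {
  // forward e.getValue() to the RM of sub-cluster e.getKey(), then feed the
  // response back so the policy can track headroom
  policy.notifyOfResponse(e.getKey(), responseFrom(e.getKey()));
}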
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..30fe8d0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyFederationPolicy.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.FederationSubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Simple test class for the {@link LocalityMulticastAMRMProxyFederationPolicy}.
+ */
+public class TestLocalityMulticastAMRMProxyFederationPolicy {
+
+  private AMRMProxyFederationPolicy policy;
+  private WeightedFederationPolicyInfo policyInfo;
+  private Random rand = new Random();
+  private SubClusterId sc1;
+  private SubClusterId sc2;
+  private Map<SubClusterId, SubClusterInfo> activeSubclusters;
+  private FederationPolicyContext federationPolicyContext;
+
+  @Before
+  public void setUp() throws Exception {
+    policy = new LocalityMulticastAMRMProxyFederationPolicy();
+    activeSubclusters = new HashMap<>();
+    policyInfo = new WeightedFederationPolicyInfo();
+    Map<FederationSubClusterIdInfo, Float> routerWeights = new HashMap<>();
+    Map<FederationSubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+
+    // simulate 6 sub-clusters: sub-cluster 3 is inactive, sub-cluster 4 is
+    // disabled (weight 0), and all others share an equal weight
+    for (int i = 0; i < 6; i++) {
+      FederationSubClusterIdInfo sc =
+          new FederationSubClusterIdInfo("subcluster" + i);
+      // sub-cluster 3 is not active
+      if (i != 3) {
+        activeSubclusters.put(sc.toId(), mock(SubClusterInfo.class));
+      }
+      float weight = 1 / 10f;
+      routerWeights.put(sc, weight);
+      amrmWeights.put(sc, weight);
+      // sub-cluster 4 is "disabled" in the weights
+      if (i == 4) {
+        routerWeights.put(sc, 0f);
+        amrmWeights.put(sc, 0f);
+      }
+    }
+    policyInfo.setRouterWeights(routerWeights);
+    policyInfo.setAmrmWeights(amrmWeights);
+    policyInfo.setHeadroomAlpha(0.5f);
+  }
+
+  @Test
+  public void testReinitialize() throws YarnException {
+    initializePolicy();
+  }
+
+  private void initializePolicy()
+      throws FederationPolicyInitializationException {
+    federationPolicyContext = new FederationPolicyContext();
+    federationPolicyContext.setFederationSubclusterResolver(
+        FederationPoliciesTestUtil.initResolver());
+    ByteBuffer buf = WeightedFederationPolicyInfo.toByteBuffer(policyInfo);
+    federationPolicyContext.setFederationPolicyConfiguration(
+        SubClusterPolicyConfiguration
+            .newInstance("queue1", policy.getClass().getCanonicalName(), buf));
+    policy.reinitialize(federationPolicyContext);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad1() throws YarnException {
+    policy.reinitialize(null);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad2() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    policy.reinitialize(fpc);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testReinitializeBad3() throws YarnException {
+    FederationPolicyContext fpc = new FederationPolicyContext();
+    ByteBuffer buf = mock(ByteBuffer.class);
+    fpc.setFederationPolicyConfiguration(
+        SubClusterPolicyConfiguration.newInstance("queue1", "WrongPolicyName",
+            buf));
+    policy.reinitialize(fpc);
+  }
+
+  @Test
+  public void testSplitBasedOnHeadroom() throws Exception {
+
+    // Tests how the headroom info is used to split based on the capacity
+    // each RM claims to give us.
+    // Configure the policy to be 100% headroom based.
+    policyInfo.setHeadroomAlpha(1.0f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+    prepPolicyWithHeadroom();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    Assert.assertEquals(75,
+        response.get(SubClusterId.newInstance("subcluster0")).get(0)
+            .getNumContainers());
+    Assert.assertNull(response.get(SubClusterId.newInstance("subcluster1")));
+    Assert.assertEquals(1,
+        response.get(SubClusterId.newInstance("subcluster2")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(25,
+        response.get(SubClusterId.newInstance("subcluster5")).get(0)
+            .getNumContainers());
+  }
+
+  @Test
+  public void testStressPolicy() throws Exception {
+
+    // Tests the throughput of the policy on a large, random list of requests.
+    // Configure the policy to be 100% headroom based.
+    policyInfo.setHeadroomAlpha(1.0f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createLargeRandomList();
+
+    prepPolicyWithHeadroom();
+
+    int numIterations = 1000;
+    long tstart = System.currentTimeMillis();
+    for (int i = 0; i < numIterations; i++) {
+      Map<SubClusterId, List<ResourceRequest>> response =
+          policy.splitResourceRequests(resourceRequests, activeSubclusters);
+      // validateSplit(response, resourceRequests);
+    }
+    long tend = System.currentTimeMillis();
+
+    System.out.println("Performed " + numIterations + " invocations of the "
+        + "policy in " + (tend - tstart) + "ms");
+  }
+
+  @Test
+  public void testFWDAllZeroANY() throws Exception {
+
+    // Tests that a zero-sized ANY request is forwarded to all the
+    // sub-clusters we have contacted before (the user may be cancelling a
+    // previous request).
+    // Configure the policy to be 50% headroom based.
+    policyInfo.setHeadroomAlpha(0.5f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
+
+    // this receives responses from sc0, sc1, sc2
+    prepPolicyWithHeadroom();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    // we expect all three to appear for a zero-sized ANY
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    Assert.assertEquals(0,
+        response.get(SubClusterId.newInstance("subcluster0")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(0,
+        response.get(SubClusterId.newInstance("subcluster1")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(0,
+        response.get(SubClusterId.newInstance("subcluster2")).get(0)
+            .getNumContainers());
+
+    Assert.assertNull(response.get(SubClusterId.newInstance("subcluster3")));
+    Assert.assertNull(response.get(SubClusterId.newInstance("subcluster4")));
+    Assert.assertNull(response.get(SubClusterId.newInstance("subcluster5")));
+  }
+
+  @Test
+  public void testSplitBasedOnHeadroomAndWeights() throws Exception {
+
+    // Tests how the headroom info and the policy weights are combined to
+    // split the requests.
+
+    // Configure the policy to be 50% headroom based and 50% weight based.
+    policyInfo.setHeadroomAlpha(0.5f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+    prepPolicyWithHeadroom();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    Assert.assertEquals(50,
+        response.get(SubClusterId.newInstance("subcluster0")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(13,
+        response.get(SubClusterId.newInstance("subcluster1")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(13,
+        response.get(SubClusterId.newInstance("subcluster2")).get(0)
+            .getNumContainers());
+    Assert.assertEquals(25,
+        response.get(SubClusterId.newInstance("subcluster5")).get(0)
+            .getNumContainers());
+  }
+
+  private void prepPolicyWithHeadroom() throws YarnException {
+    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+    policy.notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
+
+    ar = getAllocateResponseWithTargetHeadroom(0);
+    policy.notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar);
+
+    ar = getAllocateResponseWithTargetHeadroom(1);
+    policy.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
+  }
+
+  private AllocateResponse getAllocateResponseWithTargetHeadroom(
+      int numContainers) {
+    return AllocateResponse
+        .newInstance(0, null, null, Collections.emptyList(),
+            Resource.newInstance(numContainers * 1024, numContainers), null, 10,
+            null, Collections.emptyList());
+  }
+
+  @Test
+  public void testSplitAllocateRequest() throws Exception {
+
+    // Tests that a complex List<ResourceRequest> is split correctly.
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createComplexRequest();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        policy.splitResourceRequests(resourceRequests, activeSubclusters);
+
+    validateSplit(response, resourceRequests);
+    prettyPrintRequests(response);
+
+    // check that the hard-coded expected behaviors hold
+    List<ResourceRequest> rrs =
+        response.get(SubClusterId.newInstance("subcluster0"));
+    Assert.assertEquals(7, rrs.size());
+
+    for (ResourceRequest rr : rrs) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 1L);
+    }
+
+    rrs = response.get(SubClusterId.newInstance("subcluster1"));
+    Assert.assertEquals(5, rrs.size());
+
+    rrs = response.get(SubClusterId.newInstance("subcluster2"));
+    Assert.assertEquals(3, rrs.size());
+    for (ResourceRequest rr : rrs) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 0L);
+    }
+
+    rrs = response.get(SubClusterId.newInstance("subcluster3"));
+    Assert.assertNull(rrs);
+
+    rrs = response.get(SubClusterId.newInstance("subcluster4"));
+    Assert.assertNull(rrs);
+
+    rrs = response.get(SubClusterId.newInstance("subcluster5"));
+    Assert.assertEquals(1, rrs.size());
+
+    for (ResourceRequest rr : rrs) {
+      if (rr.getAllocationRequestId() == 2) {
+        // this is the fraction of containers that belongs to this
+        // sub-cluster (100 total over 4 active-and-enabled sub-clusters)
+        Assert.assertEquals(25, rr.getNumContainers());
+      }
+      Assert.assertTrue(rr.getAllocationRequestId() >= 2);
+      Assert.assertTrue(rr.getRelaxLocality());
+    }
+  }
+
+  private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split,
+      List<ResourceRequest> original) throws YarnException {
+
+    SubClusterResolver resolver =
+        federationPolicyContext.getFederationSubclusterResolver();
+
+    // apply general validation rules
+    int numUsedSubclusters = split.size();
+
+    Set<Long> originalIds = new HashSet<>();
+    Set<Long> splitIds = new HashSet<>();
+
+    int originalContainers = 0;
+    for (ResourceRequest rr : original) {
+      originalContainers += rr.getNumContainers();
+      originalIds.add(rr.getAllocationRequestId());
+    }
+
+    int splitContainers = 0;
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> rrs : split
+        .entrySet()) {
+      for (ResourceRequest rr : rrs.getValue()) {
+        splitContainers += rr.getNumContainers();
+        splitIds.add(rr.getAllocationRequestId());
+        // check that node-local asks are sent to the right RM (only)
+        SubClusterId fid = null;
+        try {
+          fid = resolver.getSubClusterForNode(rr.getResourceName());
+        } catch (YarnException e) {
+          // ignore: the code under test handles unresolvable names
+        }
+        if (fid != null && !fid.equals(rrs.getKey())) {
+          Assert.fail("A node-local (or resolvable rack-local) RR should not "
+              + "be sent to an RM other than the one it resolves to.");
+        }
+      }
+    }
+
+    // check we are not inventing allocation ids
+    Assert.assertEquals(originalIds, splitIds);
+
+    // check we are not excessively replicating the container asks among
+    // RMs (a little is allowed due to rounding of fractional splits)
+    Assert.assertTrue("Containers requested (" + splitContainers + ") "
+            + "should not exceed the original count of containers ("
+            + originalContainers + ") by more than the number of subclusters ("
+            + numUsedSubclusters + ")",
+        originalContainers + numUsedSubclusters >= splitContainers);
+
+    // test the target ids
+    for (SubClusterId targetId : split.keySet()) {
+      Assert.assertTrue("Target subclusters should be in the active set",
+          activeSubclusters.containsKey(targetId));
+      Assert.assertTrue("Target subclusters should have weight > 0 in the "
+          + "policy", policyInfo.getRouterWeights().get(targetId) > 0);
+    }
+  }
+
+  private void prettyPrintRequests(
+      Map<SubClusterId, List<ResourceRequest>> response) {
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
+        .entrySet()) {
+      StringBuilder str = new StringBuilder();
+      for (ResourceRequest rr : entry.getValue()) {
+        str.append(" [id:").append(rr.getAllocationRequestId()).append(" loc:")
+            .append(rr.getResourceName()).append(" numCont:")
+            .append(rr.getNumContainers()).append("], ");
+      }
+      System.out.println(entry.getKey() + " --> " + str);
+    }
+  }
+
+  private List<ResourceRequest> createLargeRandomList() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+    Random rand = new Random(1);
+    DefaultSubClusterResolverImpl resolver =
+        (DefaultSubClusterResolverImpl) federationPolicyContext
+            .getFederationSubclusterResolver();
+
+    List<String> nodes =
+        new ArrayList<>(resolver.getNodeToSubCluster().keySet());
+
+    int numRR = 1000;
+
+    for (int i = 0; i < numRR; i++) {
+      String nodeName = nodes.get(rand.nextInt(nodes.size()));
+      long allocationId = (long) rand.nextInt(20);
+
+      // create a request for a random node, allocation id, and size
+      out.add(FederationPoliciesTestUtil
+          .createResourceRequest(allocationId, nodeName, 1024, 1, 1,
+              rand.nextInt(100), null, rand.nextBoolean()));
+    }
+    return out;
+  }
+
+  private List<ResourceRequest> createSimpleRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single ANY request for 100 containers
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, ResourceRequest.ANY, 1024, 1, 1, 100, null,
+            true));
+    return out;
+  }
+
+  private List<ResourceRequest> createZeroSizedANYRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a zero-sized ANY request
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, ResourceRequest.ANY, 1024, 1, 1, 0, null,
+            true));
+    return out;
+  }
+
+  private List<ResourceRequest> createComplexRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single container request in sc0
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, "subcluster0-rack0-host0", 1024, 1, 1, 1,
+            null, false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, "subcluster0-rack0", 1024, 1, 1, 1, null,
+            false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(0L, ResourceRequest.ANY, 1024, 1, 1, 1, null,
+            false));
+
+    // create a request with 3 alternative hosts across sc1 and sc2, where we
+    // want 2 containers in sc1 and 1 in sc2
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "subcluster1-rack1-host1", 1024, 1, 1, 1,
+            null, false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "subcluster1-rack1-host2", 1024, 1, 1, 1,
+            null, false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "subcluster2-rack3-host3", 1024, 1, 1, 1,
+            null, false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "subcluster1-rack1", 1024, 1, 1, 2, null,
+            false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, "subcluster2-rack3", 1024, 1, 1, 1, null,
+            false));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(1L, ResourceRequest.ANY, 1024, 1, 1, 2, null,
+            false));
+
+    // create a non-local ANY request that can span anything
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(2L, ResourceRequest.ANY, 1024, 1, 1, 100, null,
+            true));
+
+    // create a single container request in sc0 with relaxed locality
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(3L, "subcluster0-rack0-host0", 1024, 1, 1, 1,
+            null, true));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(3L, "subcluster0-rack0", 1024, 1, 1, 1, null,
+            true));
+    out.add(FederationPoliciesTestUtil
+        .createResourceRequest(3L, ResourceRequest.ANY, 1024, 1, 1, 1, null,
+            true));
+
+    return out;
+  }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
index 8238633..d72211a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
@@ -57,7 +57,7 @@ public SubClusterId getSubClusterForNode(String nodename)
     return rackToSubClusters.get(rackname);
   }
 
-  protected Map<String, SubClusterId> getNodeToSubCluster() {
+  public Map<String, SubClusterId> getNodeToSubCluster() {
     return nodeToSubCluster;
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
index e4d6112..7afda0a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
@@ -1,4 +1,10 @@
 node1,subcluster1,rack1
- node2 , subcluster2, RACK1
-noDE3,subcluster3, rack2
-node4, subcluster3, rack2
\ No newline at end of file
+node2 , subcluster2, RACK1
+noDE3 ,subcluster3, rack2
+node4 , subcluster3, rack2
+subcluster0-rack0-host0,subcluster0, rack0
+Subcluster1-RACK1-HOST1,subcluster1, RACK1
+SUBCLUSTER1-RACK1-HOST2,subcluster1, RACK1
+SubCluster2-RACK3-HOST3,subcluster2, RACK3
+subcluster0-rack0,subcluster0, sc0-rack0
+subCluster1-RACK1,subcluster1, SC1-RACK1
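The new fixture entries encode the sub-cluster and rack in each hostname, which is what lets the tests above hard-code expected routings. A short sketch of how a test might consult the resolver, assuming it is initialized from this file as FederationPoliciesTestUtil.initResolver() does in the tests:

SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
// node-level resolution: owned by "subcluster0" per the fixture above
SubClusterId owner = resolver.getSubClusterForNode("subcluster0-rack0-host0");
// rack-level resolution: "RACK1" hosts nodes from more than one sub-cluster
// (subcluster1 and subcluster2), which exercises the rack-multicast path
Set<SubClusterId> rackOwners = resolver.getSubClustersForRack("RACK1");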