From 0327a2b52a15a6bb389e9ea0febc47def6467ed5 Mon Sep 17 00:00:00 2001 From: youchen Date: Tue, 19 Nov 2019 17:45:54 -0800 Subject: [PATCH] Patched from OSS, updated from YPP --- .../policies/FederationPolicyUtils.java | 11 +- .../WeightedLocalityPolicyManager.java | 11 +- .../policies/router/LocalityRouterPolicy.java | 196 ++++++++++++ .../policies/BaseFederationPoliciesTest.java | 4 +- .../TestWeightedLocalityPolicyManager.java | 4 +- .../router/TestLocalityRouterPolicy.java | 282 ++++++++++++++++++ .../TestWeightedRandomRouterPolicy.java | 18 +- 7 files changed, 510 insertions(+), 16 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java index aaa2c43c6ae..3aeeca3c738 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java @@ -37,6 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Utility class for Federation policy. */ @@ -48,7 +50,7 @@ public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE = "No active SubCluster available to submit the request."; - private static final Random RAND = new Random(System.currentTimeMillis()); + private static Random rand = new Random(System.currentTimeMillis()); /** Disable constructor. */ private FederationPolicyUtils() { @@ -223,7 +225,7 @@ public static int getWeightedRandom(ArrayList weights) { if (totalWeight == 0) { return -1; } - float samplePoint = RAND.nextFloat() * totalWeight; + float samplePoint = rand.nextFloat() * totalWeight; int lastIndex = 0; for (i = 0; i < weights.size(); i++) { if (weights.get(i) > 0) { @@ -239,4 +241,9 @@ public static int getWeightedRandom(ArrayList weights) { // float rounding kicks in during subtractions return lastIndex; } + + @VisibleForTesting + public static void setRand(long seed){ + rand.setSeed(seed); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java index 109b53437ca..a14450117b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java @@ -17,18 +17,19 @@ package org.apache.hadoop.yarn.server.federation.policies.manager; -import com.google.common.annotations.VisibleForTesting; +import java.nio.ByteBuffer; + import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.LocalityRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; -import java.nio.ByteBuffer; +import com.google.common.annotations.VisibleForTesting; /** * Policy that allows operator to configure "weights" for routing. This picks a - * {@link WeightedRandomRouterPolicy} for the router and a {@link + * {@link LocalityRouterPolicy} for the router and a {@link * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to * work together. */ @@ -40,7 +41,7 @@ public WeightedLocalityPolicyManager() { //this structurally hard-codes two compatible policies for Router and // AMRMProxy. - routerFederationPolicy = WeightedRandomRouterPolicy.class; + routerFederationPolicy = LocalityRouterPolicy.class; amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class; weightedPolicyInfo = new WeightedPolicyInfo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java new file mode 100644 index 00000000000..469240af518 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Collections; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This policy selects the subcluster depending on the node where the Client + * wants to run its application. + * + * It succeeds if: + * + * - There are three AMContainerResourceRequests in the order + * NODE, RACK, ANY + * + * It falls back to WeightedRandomRouterPolicy in case of: + * + * - Null or empty AMContainerResourceRequests; + * + * - One AMContainerResourceRequests and it has ANY as ResourceName; + * + * - The node is in blacklisted SubClusters. + * + * It fails if: + * + * - The node does not exist and RelaxLocality is False; + * + * - We have an invalid number (not 0, 1 or 3) resource requests + */ +public class LocalityRouterPolicy extends WeightedRandomRouterPolicy { + + public static final Logger LOG = + LoggerFactory.getLogger(LocalityRouterPolicy.class); + + private SubClusterResolver resolver; + private List enabledSCs; + + @Override + public void reinitialize(FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + super.reinitialize(policyContext); + resolver = policyContext.getFederationSubclusterResolver(); + Map weights = + getPolicyInfo().getRouterPolicyWeights(); + enabledSCs = new ArrayList(); + for (Map.Entry entry : weights.entrySet()) { + if (entry != null && entry.getValue() > 0) { + enabledSCs.add(entry.getKey().toId()); + } + } + } + + @Override + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { + + // null checks and default-queue behavior + validate(appSubmissionContext); + + List rrList = + appSubmissionContext.getAMContainerResourceRequests(); + + // Fast path for FailForward to WeightedRandomRouterPolicy + if (rrList == null || rrList.isEmpty() || (rrList.size() == 1 + && ResourceRequest.isAnyLocation(rrList.get(0).getResourceName()))) { + return super + .getHomeSubcluster(appSubmissionContext, blackListSubClusters); + } + + if (rrList.size() != 3) { + throw new FederationPolicyException( + "Invalid number of resource requests: " + rrList.size()); + } + + Map activeSubClusters = + getActiveSubclusters(); + List validSubClusters = + new ArrayList<>(activeSubClusters.keySet()); + FederationPolicyUtils + .validateSubClusterAvailability(validSubClusters, blackListSubClusters); + if (blackListSubClusters != null) { + // Remove from the active SubClusters from StateStore the blacklisted ones + validSubClusters.removeAll(blackListSubClusters); + } + + try { + // With three requests, this has been processed by the + // ResourceRequestInterceptorREST, and should have + // node, rack, and any + SubClusterId targetId = null; + ResourceRequest nodeRequest = null; + ResourceRequest rackRequest = null; + ResourceRequest anyRequest = null; + for (ResourceRequest rr : rrList) { + // Handle "node" requests + try { + targetId = resolver.getSubClusterForNode(rr.getResourceName()); + nodeRequest = rr; + } catch (YarnException e) { + LOG.error("Cannot resolve node : {}", e.getLocalizedMessage()); + } + // Handle "rack" requests + try { + resolver.getSubClustersForRack(rr.getResourceName()); + rackRequest = rr; + } catch (YarnException e) { + LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage()); + } + // Handle "ANY" requests + if (ResourceRequest.isAnyLocation(rr.getResourceName())) { + anyRequest = rr; + continue; + } + } + if (nodeRequest == null) { + throw new YarnException("Missing node request"); + } + if (rackRequest == null) { + throw new YarnException("Missing rack request"); + } + if (anyRequest == null) { + throw new YarnException("Missing any request"); + } + LOG.info( + "Node request: " + nodeRequest.getResourceName() + ", Rack request: " + + rackRequest.getResourceName() + ", Any request: " + anyRequest + .getResourceName()); + // Handle "node" requests + if (validSubClusters.contains(targetId) && enabledSCs + .contains(targetId)) { + LOG.info("Node {} is in SubCluster: {}", nodeRequest.getResourceName(), + targetId); + return targetId; + } else { + throw new YarnException("The node " + nodeRequest.getResourceName() + + " is in a blacklist SubCluster or not active. "); + } + } catch (YarnException e) { + LOG.error("Validating resource requests failed, Falling back to " + + "WeightedRandomRouterPolicy placement: " + e.getMessage()); + // FailForward to WeightedRandomRouterPolicy + // Overwrite request to use a default ANY + ResourceRequest amReq = Records.newRecord(ResourceRequest.class); + amReq.setPriority(appSubmissionContext.getPriority()); + amReq.setResourceName(ResourceRequest.ANY); + amReq.setCapability(appSubmissionContext.getResource()); + amReq.setNumContainers(1); + amReq.setRelaxLocality(true); + amReq.setNodeLabelExpression( + appSubmissionContext.getNodeLabelExpression()); + amReq.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)); + appSubmissionContext + .setAMContainerResourceRequests(Collections.singletonList(amReq)); + return super + .getHomeSubcluster(appSubmissionContext, blackListSubClusters); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 57d3c67ec0a..249efd324b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -162,8 +162,8 @@ public Random getRand() { return rand; } - public void setRand(Random rand) { - this.rand = rand; + public void setRand(long seed) { + this.rand.setSeed(seed); } public SubClusterId getHomeSubCluster() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java index 51661473000..d2ef7a1864b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java @@ -19,7 +19,7 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.LocalityRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.junit.Assert; @@ -63,7 +63,7 @@ public void setup() { //set expected params that the base test class will use for tests expectedPolicyManager = WeightedLocalityPolicyManager.class; expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class; - expectedRouterPolicy = WeightedRandomRouterPolicy.class; + expectedRouterPolicy = LocalityRouterPolicy.class; } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java new file mode 100644 index 00000000000..3288432722c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class to validate the correctness of LocalityRouterPolicy. + */ +public class TestLocalityRouterPolicy extends TestWeightedRandomRouterPolicy { + + /* + * The MachineList for the default Resolver has the following nodes: + * + * node1<=>subcluster1 + * + * node2<=>subcluster2 + * + * noDE3<=>subcluster3 + * + * node4<=>subcluster3 + * + * subcluster0-rack0-host0<=>subcluster0 + * + * Subcluster1-RACK1-HOST1<=>subcluster1 + * + * SUBCLUSTER1-RACK1-HOST2<=>subcluster1 + * + * SubCluster2-RACK3-HOST3<=>subcluster2 + */ + + @Before + public void setUp() throws Exception { + setPolicy(new LocalityRouterPolicy()); + setPolicyInfo(new WeightedPolicyInfo()); + + configureWeights(4); + + initializePolicy(new YarnConfiguration()); + } + + private void initializePolicy(Configuration conf) throws YarnException { + setFederationPolicyContext(new FederationPolicyInitializationContext()); + SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver(); + getFederationPolicyContext().setFederationSubclusterResolver(resolver); + ByteBuffer buf = getPolicyInfo().toByteBuffer(); + getFederationPolicyContext().setSubClusterPolicyConfiguration( + SubClusterPolicyConfiguration + .newInstance("queue1", getPolicy().getClass().getCanonicalName(), + buf)); + getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster()); + FederationPoliciesTestUtil + .initializePolicyContext(getFederationPolicyContext(), getPolicy(), + getPolicyInfo(), getActiveSubclusters(), conf); + } + + /** + * This test validates the correctness in case of the request has 1 node and + * the node belongs to an active subcluster. + */ + @Test + public void testNodeInActiveSubCluster() throws YarnException { + List requests = new ArrayList(); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1), + 1)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), + 1)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, + Resource.newInstance(10, 1), 1)); + ApplicationSubmissionContext asc = ApplicationSubmissionContext + .newInstance(null, null, null, null, null, false, false, 0, + Resources.none(), null, false, null, null); + asc.setAMContainerResourceRequests(requests); + + SubClusterId chosen = + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); + // If node1 is active, we should choose the sub cluster with node1 + if (getActiveSubclusters().containsKey( + getFederationPolicyContext().getFederationSubclusterResolver() + .getSubClusterForNode("node1").getId())) { + Assert.assertEquals( + getFederationPolicyContext().getFederationSubclusterResolver() + .getSubClusterForNode("node1"), chosen); + } + // Regardless, we should choose an active SubCluster + Assert.assertTrue(getActiveSubclusters().containsKey(chosen)); + } + + /** + * This test validates the correctness in case of the request has multiple + * ResourceRequests. The tests without ResourceRequests are done in + * TestWeightedRandomRouterPolicy. + */ + @Test + public void testMultipleResourceRequests() throws YarnException { + List requests = new ArrayList(); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1), + 1)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "node2", Resource.newInstance(10, 1), + 1)); + ApplicationSubmissionContext asc = ApplicationSubmissionContext + .newInstance(null, null, null, null, null, false, false, 0, + Resources.none(), null, false, null, null); + asc.setAMContainerResourceRequests(requests); + try { + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); + Assert.fail(); + } catch (FederationPolicyException e) { + Assert.assertTrue( + e.getMessage().startsWith("Invalid number of resource requests: ")); + } + } + + /** + * This test validates the correctness in case of the request has 1 node and + * the node does not exist in the Resolver MachineList file. + */ + @Test + public void testNodeNotExists() throws YarnException { + List requests = new ArrayList(); + boolean relaxLocality = true; + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "node5", Resource.newInstance(10, 1), + 1, relaxLocality)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), + 1)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, + Resource.newInstance(10, 1), 1)); + ApplicationSubmissionContext asc = ApplicationSubmissionContext + .newInstance(null, null, null, null, null, false, false, 0, + Resources.none(), null, false, null, null); + asc.setAMContainerResourceRequests(requests); + + try { + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); + } catch (FederationPolicyException e) { + Assert.fail(); + } + } + + /** + * This test validates the correctness in case of the request has 1 node and + * the node is in a blacklist subclusters. + */ + @Test + public void testNodeInABlacklistSubCluster() throws YarnException { + // Blacklist SubCluster3 + String subClusterToBlacklist = "subcluster3"; + // Remember the current value of subcluster3 + Float value = + getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist); + getPolicyInfo().getRouterPolicyWeights() + .put(new SubClusterIdInfo(subClusterToBlacklist), 0.0f); + initializePolicy(new YarnConfiguration()); + + FederationPoliciesTestUtil + .initializePolicyContext(getFederationPolicyContext(), getPolicy(), + getPolicyInfo(), getActiveSubclusters(), new Configuration()); + + List requests = new ArrayList(); + boolean relaxLocality = true; + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1), + 1, relaxLocality)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), + 1)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, + Resource.newInstance(10, 1), 1)); + ApplicationSubmissionContext asc = ApplicationSubmissionContext + .newInstance(null, null, null, null, null, false, false, 0, + Resources.none(), null, false, null, null); + asc.setAMContainerResourceRequests(requests); + + try { + SubClusterId targetId = + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); + // The selected subcluster HAS no to be the same as the one blacklisted. + Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist); + } catch (FederationPolicyException e) { + Assert.fail(); + } + + // Set again the previous value for the other tests + getPolicyInfo().getRouterPolicyWeights() + .put(new SubClusterIdInfo(subClusterToBlacklist), value); + } + + /** + * This test validates the correctness in case of the request has 1 node and + * the node is not in the policy weights + */ + @Test + public void testNodeNotInPolicy() throws YarnException { + // Blacklist SubCluster3 + String subClusterToBlacklist = "subcluster3"; + // Remember the current value of subcluster3 + Float value = + getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist); + getPolicyInfo().getRouterPolicyWeights().remove(subClusterToBlacklist); + initializePolicy(new YarnConfiguration()); + + FederationPoliciesTestUtil + .initializePolicyContext(getFederationPolicyContext(), getPolicy(), + getPolicyInfo(), getActiveSubclusters(), new Configuration()); + + List requests = new ArrayList(); + boolean relaxLocality = true; + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1), + 1, relaxLocality)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), + 1)); + requests.add(ResourceRequest + .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, + Resource.newInstance(10, 1), 1)); + ApplicationSubmissionContext asc = ApplicationSubmissionContext + .newInstance(null, null, null, null, null, false, false, 0, + Resources.none(), null, false, null, null); + asc.setAMContainerResourceRequests(requests); + + try { + SubClusterId targetId = + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); + // The selected subcluster HAS no to be the same as the one blacklisted. + Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist); + } catch (FederationPolicyException e) { + Assert.fail(); + } + + // Set again the previous value for the other tests + getPolicyInfo().getRouterPolicyWeights() + .put(new SubClusterIdInfo(subClusterToBlacklist), value); + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index c969a30e65f..d549250f072 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -47,10 +48,21 @@ public void setUp() throws Exception { setPolicy(new WeightedRandomRouterPolicy()); setPolicyInfo(new WeightedPolicyInfo()); + + configureWeights(20); + + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), getActiveSubclusters()); + } + + public void configureWeights(float numSubClusters) { + // Set random seed to remove random failures + FederationPolicyUtils.setRand(5); + setRand(5); + Map routerWeights = new HashMap<>(); Map amrmWeights = new HashMap<>(); - float numSubClusters = 20; // simulate N subclusters each with a 5% chance of being inactive for (int i = 0; i < numSubClusters; i++) { SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); @@ -74,10 +86,6 @@ public void setUp() throws Exception { } getPolicyInfo().setRouterPolicyWeights(routerWeights); getPolicyInfo().setAMRMPolicyWeights(amrmWeights); - - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), getActiveSubclusters()); - } @Test -- 2.24.0.windows.2