diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/OrderedRouterFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/OrderedRouterFederationPolicy.java new file mode 100644 index 0000000..10bb02a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/OrderedRouterFederationPolicy.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; + +import java.util.Map; + +/** + * This implements a policy that interprets "weights" as a ordered list of + * preferences among sub-clusters. + */ +public class OrderedRouterFederationPolicy + extends WeightedRouterFederationPolicy { + + private static final Log LOG = + LogFactory.getLog(OrderedRouterFederationPolicy.class); + WeightedFederationPolicyInfo policyInfo = null; + + @Override + public FederationSubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext, + Map activeSubclusters) + throws YarnException { + + if (activeSubclusters == null || activeSubclusters.size() < 1) { + throw new NoActiveSubclustersException( + "Zero active subclusters, cannot pick where to send job."); + } + + // This finds the sub-cluster with the lowest weight among the + // currently active ones. + Map weights = policyInfo.getRouterWeights(); + FederationSubClusterId chosen = null; + Float currentBest = Float.MAX_VALUE; + for (FederationSubClusterId id : activeSubclusters.keySet()) { + if (weights.containsKey(id) && weights.get(id) < currentBest) { + chosen = id; + } + } + + return chosen; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/ProbabilisticRouterFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/ProbabilisticRouterFederationPolicy.java new file mode 100644 index 0000000..6826460 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/ProbabilisticRouterFederationPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; + +import java.util.Map; + +/** + * This policy implements a weighted random sample among currently active + * sub-clusters. + */ +public class ProbabilisticRouterFederationPolicy + extends WeightedRouterFederationPolicy { + + private static final Log LOG = + LogFactory.getLog(ProbabilisticRouterFederationPolicy.class); + + @Override + public FederationSubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext, + Map activeSubclusters) + throws YarnException { + + if (activeSubclusters == null || activeSubclusters.size() < 1) { + throw new NoActiveSubclustersException( + "Zero active subclusters, cannot pick where to send job."); + } + if (policyInfo == null) { + throw new FederationPolicyInitializationException( + "This policy has not " + "been initalized correctly."); + } + + // note: we cannot pre-compute the weights, as the set of activeSubcluster + // changes dynamically (and this would unfairly spread the load to + // sub-clusters adjacent to an inactive one), hence we need to count/scan + // the list and based on weight pick the next sub-cluster. + Map weights = policyInfo.getRouterWeights(); + + float totActiveWeight = 0; + for (FederationSubClusterId id : activeSubclusters.keySet()) { + totActiveWeight += weights.get(id); + } + float lookupValue = rand.nextFloat() * totActiveWeight; + + FederationSubClusterId chosen = null; + for (FederationSubClusterId id : activeSubclusters.keySet()) { + if (weights.containsKey(id)) { + lookupValue -= weights.get(id); + } + chosen = id; + if (lookupValue <= 0) { + break; + } + } + + return chosen; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterFederationPolicy.java new file mode 100644 index 0000000..1da55f2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterFederationPolicy.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** + * This simple policy picks at uniform random among any of the currently active + * subclusters. + */ +public class UniformRandomRouterFederationPolicy + implements RouterFederationPolicy { + + Random rand; + + public UniformRandomRouterFederationPolicy() { + rand = new Random(System.currentTimeMillis()); + } + + //useful to make tests deterministic + public UniformRandomRouterFederationPolicy(long seed) { + rand = new Random(seed); + } + + @Override + public void reinitialize(FederationPolicyContext federationPolicyContext) + throws FederationPolicyInitializationException { + FederationPolicyContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + //nothing else to initialize, behavior does not depend + } + + /** + * Simply picks a random active subcluster to start the AM. + * + * @param appSubmissionContext the context for the app being submitted + * (ignored). + * @param activeSubclusters the list of subclusters currently active. + * + * @return a randomly chosen subcluster. + * + * @throws YarnException if there are no active subclusters. + */ + public FederationSubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext, + Map activeSubclusters) + throws YarnException { + if (activeSubclusters == null || activeSubclusters.size() < 1) { + throw new NoActiveSubclustersException( + "Zero active subclusters, cannot pick where to send job."); + } + List list = + new ArrayList<>(activeSubclusters.keySet()); + return list.get(rand.nextInt(list.size())); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRouterFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRouterFederationPolicy.java new file mode 100644 index 0000000..53756f1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRouterFederationPolicy.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; + +import java.util.Map; +import java.util.Random; + +/** + * Abstract class providing common validation of a reinitialized, for all + * policies that are "weight-based". + */ +public abstract class WeightedRouterFederationPolicy + implements RouterFederationPolicy { + + WeightedFederationPolicyInfo policyInfo = null; + Random rand = new Random(System.currentTimeMillis()); + + public WeightedRouterFederationPolicy() { + } + + @Override + public void reinitialize(FederationPolicyContext federationPolicyContext) + throws FederationPolicyInitializationException { + FederationPolicyContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + // perform consistency checks + WeightedFederationPolicyInfo newPolicyInfo = WeightedFederationPolicyInfo + .fromByteBuffer( + federationPolicyContext.getFederationPolicyConfiguration() + .getParams()); + + // if nothing has changed skip the rest of initialization + if (policyInfo != null && policyInfo.equals(newPolicyInfo)) { + return; + } + + Map newWeights = + newPolicyInfo.getRouterWeights(); + if (newWeights == null || newWeights.size() < 1) { + throw new FederationPolicyInitializationException( + "Weight vector cannot be null/empty."); + } + + policyInfo = newPolicyInfo; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterFederationPolicy.java new file mode 100644 index 0000000..0a84eba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterFederationPolicy.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0 + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.api.records.FederationPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; +import org.apache.hadoop.yarn.server.federation.policies.router.RouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterFederationPolicy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; + +/** + * Simple test class for the {@link UniformRandomRouterFederationPolicy}. + */ +public class TestUniformRandomRouterFederationPolicy { + List subClusters; + FederationSubClusterId sc1; + FederationSubClusterId sc2; + RouterFederationPolicy policy; + ApplicationSubmissionContext applicationSubmissionContext; + Map activeSubclusters; + + @Before + public void setUp() throws Exception { + policy = new UniformRandomRouterFederationPolicy(); + applicationSubmissionContext = mock(ApplicationSubmissionContext.class); + activeSubclusters = new HashMap<>(); + sc1 = FederationSubClusterId.newInstance("sc1"); + sc2 = FederationSubClusterId.newInstance("sc2"); + activeSubclusters.put(sc1, mock(FederationSubClusterInfo.class)); + activeSubclusters.put(sc2, mock(FederationSubClusterInfo.class)); + } + + @Test + public void testReinitilialize() throws YarnException { + FederationPolicyContext fpc = new FederationPolicyContext(); + ByteBuffer buf = mock(ByteBuffer.class); + fpc.setFederationPolicyConfiguration(FederationPolicyConfiguration + .newInstance(policy.getClass().getCanonicalName(), buf)); + policy.reinitialize(fpc); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad1() throws YarnException { + policy.reinitialize(null); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad2() throws YarnException { + FederationPolicyContext fpc = new FederationPolicyContext(); + policy.reinitialize(fpc); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad3() throws YarnException { + FederationPolicyContext fpc = new FederationPolicyContext(); + ByteBuffer buf = mock(ByteBuffer.class); + fpc.setFederationPolicyConfiguration( + FederationPolicyConfiguration.newInstance("WrongPolicyName", buf)); + policy.reinitialize(fpc); + } + + @Test + public void testOneSubclusterIsChosen() throws YarnException { + FederationSubClusterId chosen = policy + .getHomeSubcluster(applicationSubmissionContext, activeSubclusters); + Assert.assertTrue(chosen.equals(sc1) || chosen.equals(sc2)); + } + + @Test(expected = NoActiveSubclustersException.class) + public void testNoSubclusters() throws YarnException { + //empty the activeSubclusters map + Map activeSubclusters = + new HashMap<>(); + policy.getHomeSubcluster(applicationSubmissionContext, activeSubclusters); + } + + @Test(expected = NoActiveSubclustersException.class) + public void testNullInputs() throws YarnException { + // use null for activeSubclusters map + policy.getHomeSubcluster(applicationSubmissionContext, null); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterFederationPolicy.java new file mode 100644 index 0000000..a0cbefb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterFederationPolicy.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0 + *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.api.records.FederationPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterId; +import org.apache.hadoop.yarn.server.federation.api.records.FederationSubClusterInfo; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.FederationSubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.Mockito.mock; + +/** + * Simple test class for the {@link ProbabilisticRouterFederationPolicy}. + * Generate large number of randomized tests to check we are weighiting + * correctly even if clusters go inactive. + */ +public class TestWeightedRandomRouterFederationPolicy { + List subClusters; + FederationSubClusterId sc1; + FederationSubClusterId sc2; + RouterFederationPolicy policy; + ApplicationSubmissionContext applicationSubmissionContext; + Map activeSubclusters; + WeightedFederationPolicyInfo policyInfo; + Random rand = new Random(); + + @Before + public void setUp() throws Exception { + policy = new ProbabilisticRouterFederationPolicy(); + applicationSubmissionContext = mock(ApplicationSubmissionContext.class); + activeSubclusters = new HashMap<>(); + policyInfo = new WeightedFederationPolicyInfo(); + Map routerWeights = new HashMap<>(); + Map amrmWeights = new HashMap<>(); + + // simulate 20 subclusters with a 5% chance of being inactive + for (int i = 0; i < 20; i++) { + FederationSubClusterIdInfo sc = new FederationSubClusterIdInfo("sc" + i); + // with 5% omit a subcluster + if (rand.nextFloat() < 0.95f) { + activeSubclusters.put(sc.toId(), mock(FederationSubClusterInfo.class)); + } + float weight = rand.nextFloat(); + routerWeights.put(sc, weight); + amrmWeights.put(sc, weight); + } + policyInfo.setRouterWeights(routerWeights); + policyInfo.setAmrmWeights(amrmWeights); + + } + + @Test + public void testReinitilialize() throws YarnException { + FederationPolicyContext fpc = new FederationPolicyContext(); + ByteBuffer buf = WeightedFederationPolicyInfo.toByteBuffer(policyInfo); + fpc.setFederationPolicyConfiguration(FederationPolicyConfiguration + .newInstance(policy.getClass().getCanonicalName(), buf)); + policy.reinitialize(fpc); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad1() throws YarnException { + policy.reinitialize(null); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad2() throws YarnException { + FederationPolicyContext fpc = new FederationPolicyContext(); + policy.reinitialize(fpc); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad3() throws YarnException { + FederationPolicyContext fpc = new FederationPolicyContext(); + ByteBuffer buf = mock(ByteBuffer.class); + fpc.setFederationPolicyConfiguration( + FederationPolicyConfiguration.newInstance("WrongPolicyName", buf)); + policy.reinitialize(fpc); + } + + @Test + public void testClusterChosenWithRightProbability() throws YarnException { + + FederationPolicyContext fpc = new FederationPolicyContext(); + ByteBuffer buf = WeightedFederationPolicyInfo.toByteBuffer(policyInfo); + fpc.setFederationPolicyConfiguration(FederationPolicyConfiguration + .newInstance(policy.getClass().getCanonicalName(), buf)); + policy.reinitialize(fpc); + + FederationSubClusterId chosen = policy + .getHomeSubcluster(applicationSubmissionContext, activeSubclusters); + + Map counter = new HashMap<>(); + for (FederationSubClusterId id : policyInfo.getRouterWeights().keySet()) { + counter.put(id, new AtomicLong(0)); + } + + float numberOfDraws = 1000000; + + for (float i = 0; i < numberOfDraws; i++) { + FederationSubClusterId chosenId = policy + .getHomeSubcluster(applicationSubmissionContext, activeSubclusters); + long count = counter.get(chosenId).incrementAndGet(); + } + + float totalActiveWeight = 0; + for (FederationSubClusterId id : activeSubclusters.keySet()) { + totalActiveWeight += policyInfo.getRouterWeights().get(id); + } + + for (Map.Entry counterEntry : counter + .entrySet()) { + float expectedWeight = + policyInfo.getRouterWeights().get(counterEntry.getKey()) + / totalActiveWeight; + float actualWeight = counterEntry.getValue().get() / numberOfDraws; + + // make sure that the weights is respected among active subclusters + // and no jobs are routed to inactive subclusters. + if (activeSubclusters.containsKey(counterEntry.getKey())) { + Assert.assertTrue( + "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight + + " expected weight: " + expectedWeight, + (actualWeight / expectedWeight) < 1.1 + && (actualWeight / expectedWeight) > 0.9); + } else { + Assert.assertTrue( + "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight + + " expected weight: " + expectedWeight, actualWeight == 0); + + } + } + } + + @Test(expected = NoActiveSubclustersException.class) + public void testNoSubclusters() throws YarnException { + //empty the activeSubclusters map + Map activeSubclusters = + new HashMap<>(); + policy.getHomeSubcluster(applicationSubmissionContext, activeSubclusters); + } + + @Test(expected = NoActiveSubclustersException.class) + public void testNullInputs() throws YarnException { + // use null for activeSubclusters map + policy.getHomeSubcluster(applicationSubmissionContext, null); + } + +}