diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPolicy.java new file mode 100644 index 0000000..778a4ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPolicy.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.AMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.RouterFederationPolicy; + +/** + * This class provides basic implementation for common methods of {@link + * FederationPolicy} + */ +public abstract class BaseFederationPolicy implements FederationPolicy { + + String queue; + String routerFederationPolicy; + String amrmProxyFederationPolicy; + + /** + * This default implementation validates the {@link FederationPolicyContext}, + * then checks whether it needs to reinstantiate the class (null or + * mismatching type), and reinitialize the policy. + * + * @param federationPolicyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return a valid and fully reinitalized {@link AMRMProxyFederationPolicy} + * instance + * + * @throws FederationPolicyInitializationException if the reinitalization is + * not valid, and ensure + * previous state is preserved + */ + public AMRMProxyFederationPolicy getAMRMPolicy( + FederationPolicyContext federationPolicyContext, + AMRMProxyFederationPolicy oldInstance) + throws FederationPolicyInitializationException { + FederationPolicyContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + if (amrmProxyFederationPolicy == null) { + throw new FederationPolicyInitializationException("The parameter " + + "amrmProxyFederationPolicy should be initialized in " + this + .getClass().getSimpleName() + " constructor."); + } + + if (oldInstance == null || !oldInstance.getClass() + .equals(amrmProxyFederationPolicy)) { + try { + Class c = Class.forName(amrmProxyFederationPolicy); + oldInstance = (AMRMProxyFederationPolicy) c.newInstance(); + } catch (ClassNotFoundException e) { + throw new FederationPolicyInitializationException(e); + } catch (InstantiationException e) { + throw new FederationPolicyInitializationException(e); + } catch (IllegalAccessException e) { + throw new FederationPolicyInitializationException(e); + } + } + federationPolicyContext.getFederationPolicyConfiguration() + .setType(oldInstance.getClass().getCanonicalName()); + oldInstance.reinitialize(federationPolicyContext); + return oldInstance; + } + + /** + * This default implementation validates the {@link FederationPolicyContext}, + * then checks whether it needs to reinstantiate the class (null or + * mismatching type), and reinitialize the policy. + * + * @param federationPolicyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return a valid and fully reinitalized {@link RouterFederationPolicy} + * instance + * + * @throws FederationPolicyInitializationException if the reinitalization is + * not valid, and ensure + * previous state is preserved + */ + + public RouterFederationPolicy getRouterPolicy( + FederationPolicyContext federationPolicyContext, + RouterFederationPolicy oldInstance) + throws FederationPolicyInitializationException { + + if (routerFederationPolicy == null) { + throw new FederationPolicyInitializationException("The parameter " + + "routerFederationPolicy should be initialized in " + this + .getClass().getSimpleName() + " constructor."); + } + + FederationPolicyContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + if (oldInstance == null || !oldInstance.getClass() + .equals(routerFederationPolicy)) { + try { + Class c = Class.forName(routerFederationPolicy); + oldInstance = (RouterFederationPolicy) c.newInstance(); + } catch (ClassNotFoundException e) { + throw new FederationPolicyInitializationException(e); + } catch (InstantiationException e) { + throw new FederationPolicyInitializationException(e); + } catch (IllegalAccessException e) { + throw new FederationPolicyInitializationException(e); + } + } + federationPolicyContext.getFederationPolicyConfiguration() + .setType(oldInstance.getClass().getCanonicalName()); + oldInstance.reinitialize(federationPolicyContext); + return oldInstance; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ProbabilisticMulticastFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ProbabilisticMulticastFederationPolicy.java new file mode 100644 index 0000000..b5d7902 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ProbabilisticMulticastFederationPolicy.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import java.nio.ByteBuffer; + +/** + * Policy that allows operator to configure "weights" for routing. This picks a + * {@link WeightedRandomRouterFederationPolicy} for the router and a {@link + * LocalityMulticastAMRMProxyFederationPolicy} for the amrmproxy as they are + * designed to work together. + */ +public class ProbabilisticMulticastFederationPolicy + extends BaseFederationPolicy { + + WeightedFederationPolicyInfo weightedFederationPolicyInfo; + + public ProbabilisticMulticastFederationPolicy(String queue) { + //this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + this.queue = queue; + routerFederationPolicy = + WeightedRandomRouterFederationPolicy.class.getCanonicalName(); + amrmProxyFederationPolicy = + LocalityMulticastAMRMProxyFederationPolicy.class.getCanonicalName(); + weightedFederationPolicyInfo = new WeightedFederationPolicyInfo(); + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = null; + buf = + WeightedFederationPolicyInfo.toByteBuffer(weightedFederationPolicyInfo); + + WeightedFederationPolicyInfo temp = + WeightedFederationPolicyInfo.fromByteBuffer(buf); + + if (!weightedFederationPolicyInfo.equals(temp)) { + throw new FederationPolicyInitializationException( + "Cannot successfully serialized and deserialize the configuration " + + "for this object."); + } + + return SubClusterPolicyConfiguration + .newInstance(queue, this.getClass().getCanonicalName(), buf); + } + + public WeightedFederationPolicyInfo getWeightedFederationPolicyInfo() { + return weightedFederationPolicyInfo; + } + + public void setWeightedFederationPolicyInfo( + WeightedFederationPolicyInfo weightedFederationPolicyInfo) { + this.weightedFederationPolicyInfo = weightedFederationPolicyInfo; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastFederationPolicy.java new file mode 100644 index 0000000..4fe8b6b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastFederationPolicy.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import java.nio.ByteBuffer; + +/** + * This class represent a simple implementation of a {@link FederationPolicy}. + * It combines the basic policies: {@link UniformRandomRouterFederationPolicy} + * and {@link BroadcastAMRMProxyFederationPolicy}, which are designed to work + * together and "spread" the load among sub-clusters uniformly. This simple + * policy might impose heavy load on the RMs as all request are broadcasted. + */ +public class UniformBroadcastFederationPolicy extends BaseFederationPolicy { + + public UniformBroadcastFederationPolicy(String queue) { + //this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + this.queue = queue; + routerFederationPolicy = + UniformRandomRouterFederationPolicy.class.getCanonicalName(); + amrmProxyFederationPolicy = + BroadcastAMRMProxyFederationPolicy.class.getCanonicalName(); + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = ByteBuffer.allocate(0); + return SubClusterPolicyConfiguration + .newInstance(queue, this.getClass().getCanonicalName(), buf); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/FederationSubClusterIdInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/FederationSubClusterIdInfo.java new file mode 100644 index 0000000..11e1aa0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/FederationSubClusterIdInfo.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.dao; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +@XmlRootElement(name = "federation-policy") +@XmlAccessorType(XmlAccessType.FIELD) +public class FederationSubClusterIdInfo { + + String id; + + public FederationSubClusterIdInfo() { + //JAXB needs this + } + + public FederationSubClusterIdInfo(String subClusterId) { + this.id = subClusterId; + } + + public FederationSubClusterIdInfo(SubClusterId subClusterId) { + this.id = subClusterId.getId(); + } + + public SubClusterId toId() { + return SubClusterId.newInstance(id); + } + + public boolean equals(Object other) { + if (other instanceof FederationSubClusterIdInfo) { + if (((FederationSubClusterIdInfo) other).id.equals(this.id)) { + return true; + } + } + return false; + } + + public int hashCode() { + return id.hashCode(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedFederationPolicyInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedFederationPolicyInfo.java new file mode 100644 index 0000000..60d4f1e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedFederationPolicyInfo.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.dao; + +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONMarshaller; +import com.sun.jersey.api.json.JSONUnmarshaller; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.StringReader; +import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +/** + * This is a DAO class for the configuration of parameteres for federation + * policies. This generalizes several possible configurations as two lists of + * {@link FederationSubClusterIdInfo} and corresponding weights as a + * {@link Float}. The interpretation of the weight is left to the logic in + * the policy. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +@XmlRootElement(name = "federation-policy") +@XmlAccessorType(XmlAccessType.FIELD) +public class WeightedFederationPolicyInfo { + + public Map routerWeights = new HashMap<>(); + public Map amrmWeights = new HashMap<>(); + public float headroomAlpha; + + public WeightedFederationPolicyInfo() { + //JAXB needs this + } + + public void setRouterWeights( + Map routerWeights) { + this.routerWeights = routerWeights; + } + + public void setAmrmWeights( + Map amrmWeights) { + this.amrmWeights = amrmWeights; + } + + public Map getRouterWeights() { + Map out = new TreeMap<>(); + for (Map.Entry entry : routerWeights + .entrySet()) { + out.put(entry.getKey().toId(), entry.getValue()); + } + return out; + } + + public Map getAmrmWeights() { + Map out = new TreeMap<>(); + for (Map.Entry entry : amrmWeights + .entrySet()) { + out.put(entry.getKey().toId(), entry.getValue()); + } + return out; + } + + public static WeightedFederationPolicyInfo fromByteBuffer(ByteBuffer bb) + throws FederationPolicyInitializationException { + try { + JSONJAXBContext jc = + new JSONJAXBContext(JSONConfiguration.mapped().build(), + WeightedFederationPolicyInfo.class); + JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller(); + + final byte[] bytes = new byte[bb.remaining()]; + bb.duplicate().get(bytes); + String params = new String(bytes); + + WeightedFederationPolicyInfo weightedFederationPolicyInfo = unmarshaller + .unmarshalFromJSON(new StringReader(params), + WeightedFederationPolicyInfo.class); + return weightedFederationPolicyInfo; + } catch (JAXBException j) { + throw new FederationPolicyInitializationException(j); + } + } + + public static ByteBuffer toByteBuffer(WeightedFederationPolicyInfo policyInfo) + throws FederationPolicyInitializationException { + try { + String s = toJSONString(policyInfo); + return ByteBuffer.wrap(s.getBytes()); + } catch (JAXBException j) { + throw new FederationPolicyInitializationException(j); + } + } + + private static String toJSONString(WeightedFederationPolicyInfo policyInfo) + throws JAXBException { + JSONJAXBContext jc = new JSONJAXBContext(JSONConfiguration.mapped().build(), + WeightedFederationPolicyInfo.class); + + JSONMarshaller marshaller = jc.createJSONMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + StringWriter sw = new StringWriter(256); + marshaller.marshallToJSON(policyInfo, sw); + return sw.toString(); + } + + public boolean equals(Object other) { + + if (!other.getClass().equals(this.getClass())) { + return false; + } + + WeightedFederationPolicyInfo otherPolicy = + (WeightedFederationPolicyInfo) other; + Map otherAMRMWeights = + otherPolicy.getAmrmWeights(); + Map otherRouterWeights = + otherPolicy.getRouterWeights(); + + return this.getAmrmWeights().equals(otherAMRMWeights) && this + .getRouterWeights().equals(otherRouterWeights); + } + + public float getHeadroomAlpha() { + return headroomAlpha; + } + + public void setHeadroomAlpha(float headroomAlpha) { + this.headroomAlpha = headroomAlpha; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/FederationPoliciesTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/FederationPoliciesTestUtil.java new file mode 100644 index 0000000..ebacda2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/FederationPoliciesTestUtil.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.util.Records; + +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +public class FederationPoliciesTestUtil { + private static final String FEDR_NODE_PREFIX = "fedr-test-node-"; + + protected static List createResourceRequests(int memory, + int vCores, int priority, int containers, boolean relaxLocality) + throws Exception { + String[] hosts = new String[5]; + for (int i = 0; i < hosts.length; i++) { + hosts[i] = FEDR_NODE_PREFIX + i; + } + return createResourceRequests(hosts, memory, vCores, priority, containers, + null, relaxLocality); + } + + protected static List createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers, + boolean relaxLocality) throws Exception { + return createResourceRequests(hosts, memory, vCores, priority, containers, + null, relaxLocality); + } + + public static List createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws Exception { + List reqs = new ArrayList(); + for (String host : hosts) { + ResourceRequest hostReq = + createResourceRequest(host, memory, vCores, priority, containers, + labelExpression, relaxLocality); + reqs.add(hostReq); + ResourceRequest rackReq = + createResourceRequest("/default-rack", memory, vCores, priority, + containers, labelExpression, relaxLocality); + reqs.add(rackReq); + } + + ResourceRequest offRackReq = + createResourceRequest(ResourceRequest.ANY, memory, vCores, priority, + containers, labelExpression, relaxLocality); + reqs.add(offRackReq); + return reqs; + } + + protected static ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers, + boolean relaxLocality) throws Exception { + return createResourceRequest(resource, memory, vCores, priority, containers, + null, relaxLocality); + } + + public static ResourceRequest createResourceRequest(long id, String resource, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws Exception { + ResourceRequest out = + createResourceRequest(resource, memory, vCores, priority, containers, + labelExpression, relaxLocality); + out.setAllocationRequestId(id); + return out; + } + + public static ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws Exception { + ResourceRequest req = Records.newRecord(ResourceRequest.class); + req.setResourceName(resource); + req.setNumContainers(containers); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(priority); + req.setPriority(pri); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(memory); + capability.setVirtualCores(vCores); + req.setCapability(capability); + if (labelExpression != null) { + req.setNodeLabelExpression(labelExpression); + } + req.setRelaxLocality(relaxLocality); + return req; + } + + protected static AllocateRequest createAllocateRequest() throws Exception { + String[] hosts = new String[] { "host1", "host2" }; + List resourceRequests = + createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); + List containersToBeRe = + new ArrayList<>(); + return AllocateRequest + .newInstance(1, 0.9f, resourceRequests, containersToBeRe, null); + } + + + public static SubClusterResolver initResolver() { + YarnConfiguration conf = new YarnConfiguration(); + SubClusterResolver resolver = + new DefaultSubClusterResolverImpl(); + + URL url = + Thread.currentThread().getContextClassLoader().getResource("nodes"); + if (url == null) { + throw new RuntimeException( + "Could not find 'nodes' dummy file in classpath"); + } + + conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath()); + resolver.setConf(conf); + resolver.load(); + return resolver; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastFederationPolicy.java new file mode 100644 index 0000000..81fb7ef --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastFederationPolicy.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.AMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.RouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.junit.Assert; +import org.junit.Test; + +/** + * Simple test of {@link UniformBroadcastFederationPolicy} + */ +public class TestUniformBroadcastFederationPolicy { + + @Test + public void testSerializeAndInstantiate() + throws FederationPolicyInitializationException { + + // configure a policy + UniformBroadcastFederationPolicy wfp = + new UniformBroadcastFederationPolicy("queue1"); + + // serializeConf it in a context + SubClusterPolicyConfiguration fpc = + wfp.serializeConf(); + FederationPolicyContext context = new FederationPolicyContext(); + context.setFederationPolicyConfiguration(fpc); + + // based on the "context" created instantiate new class and use it + UniformBroadcastFederationPolicy wfp2 = + new UniformBroadcastFederationPolicy("queue1"); + AMRMProxyFederationPolicy amrmProxyFederationPolicy = + wfp2.getAMRMPolicy(context, null); + + //needed only for tests (getARMRMPolicy change the "type" in conf) + fpc.setType(UniformBroadcastFederationPolicy.class.getCanonicalName()); + + RouterFederationPolicy routerFederationPolicy = + wfp2.getRouterPolicy(context, null); + + Assert.assertTrue( + amrmProxyFederationPolicy instanceof BroadcastAMRMProxyFederationPolicy); + Assert.assertTrue( + routerFederationPolicy instanceof UniformRandomRouterFederationPolicy); + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedFederationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedFederationPolicy.java new file mode 100644 index 0000000..4d80fff --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedFederationPolicy.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.AMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.FederationSubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedFederationPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.RouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRouterFederationPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +/** + * Simple test of {@link ProbabilisticMulticastFederationPolicy} + */ +public class TestWeightedFederationPolicy { + + @Test + public void testSerializeAndInstantiate() + throws FederationPolicyInitializationException { + + // configure a policy + ProbabilisticMulticastFederationPolicy + wfp = new ProbabilisticMulticastFederationPolicy("queue1"); + SubClusterId sc1 = SubClusterId.newInstance("sc1"); + SubClusterId sc2 = SubClusterId.newInstance("sc2"); + WeightedFederationPolicyInfo policyInfo = + new WeightedFederationPolicyInfo(); + policyInfo.routerWeights = new HashMap<>(); + policyInfo.routerWeights.put(new FederationSubClusterIdInfo(sc1), 0.2f); + policyInfo.routerWeights.put(new FederationSubClusterIdInfo(sc2), 0.8f); + policyInfo.amrmWeights = new HashMap<>(); + policyInfo.amrmWeights.put(new FederationSubClusterIdInfo(sc1), 0.2f); + policyInfo.amrmWeights.put(new FederationSubClusterIdInfo(sc2), 0.8f); + wfp.setWeightedFederationPolicyInfo(policyInfo); + + // serializeConf it in a context + SubClusterPolicyConfiguration fpc = + wfp.serializeConf(); + + FederationPolicyContext context = new FederationPolicyContext(); + context.setFederationPolicyConfiguration(fpc); + context.setFederationSubclusterResolver(FederationPoliciesTestUtil + .initResolver()); + + // based on the "context" created instantiate new class and use it + ProbabilisticMulticastFederationPolicy + wfp2 = new ProbabilisticMulticastFederationPolicy("queue1"); + AMRMProxyFederationPolicy amrmProxyFederationPolicy = + wfp2.getAMRMPolicy(context, null); + + //needed only for tests (getARMRMPolicy change the "type" in conf) + fpc.setType(ProbabilisticMulticastFederationPolicy.class.getCanonicalName()); + + RouterFederationPolicy routerFederationPolicy = + wfp2.getRouterPolicy(context, null); + + Assert.assertTrue( + amrmProxyFederationPolicy instanceof LocalityMulticastAMRMProxyFederationPolicy); + Assert.assertTrue( + routerFederationPolicy instanceof WeightedRandomRouterFederationPolicy); + } + + + @Test (expected = FederationPolicyInitializationException.class) + public void testSerializeAndInstantiateBad() + throws FederationPolicyInitializationException { + + // configure a policy + ProbabilisticMulticastFederationPolicy + wfp = new ProbabilisticMulticastFederationPolicy("queue1"); + SubClusterId sc1 = SubClusterId.newInstance("sc1"); + SubClusterId sc2 = SubClusterId.newInstance("sc2"); + WeightedFederationPolicyInfo policyInfo = + new WeightedFederationPolicyInfo(); + policyInfo.routerWeights = new HashMap<>(); + policyInfo.routerWeights.put(new FederationSubClusterIdInfo(sc1), 0.2f); + policyInfo.routerWeights.put(new FederationSubClusterIdInfo(sc2), 0.8f); + policyInfo.amrmWeights = new HashMap<>(); + policyInfo.amrmWeights.put(new FederationSubClusterIdInfo(sc1), 0.2f); + policyInfo.amrmWeights.put(new FederationSubClusterIdInfo(sc2), 0.8f); + wfp.setWeightedFederationPolicyInfo(policyInfo); + + // serializeConf it in a context + SubClusterPolicyConfiguration fpc = + wfp.serializeConf(); + fpc.setType("wrongType"); + FederationPolicyContext context = new FederationPolicyContext(); + context.setFederationPolicyConfiguration(fpc); + + // based on the "context" created instantiate new class and use it + ProbabilisticMulticastFederationPolicy + wfp2 = new ProbabilisticMulticastFederationPolicy("queue1"); + AMRMProxyFederationPolicy amrmProxyFederationPolicy = + wfp2.getAMRMPolicy(context, null); + RouterFederationPolicy routerFederationPolicy = + wfp2.getRouterPolicy(context, null); + + Assert.assertTrue( + amrmProxyFederationPolicy instanceof BroadcastAMRMProxyFederationPolicy); + Assert.assertTrue( + routerFederationPolicy instanceof WeightedRouterFederationPolicy); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/dao/TestWeightedFederationPolicyInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/dao/TestWeightedFederationPolicyInfo.java new file mode 100644 index 0000000..3fc8c83 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/dao/TestWeightedFederationPolicyInfo.java @@ -0,0 +1,30 @@ +package org.apache.hadoop.yarn.server.federation.policies.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.junit.Test; + +import javax.xml.bind.JAXBException; +import java.util.HashMap; + +/** + * Created by ccurino on 7/11/16. + */ +public class TestWeightedFederationPolicyInfo { + + @Test + public void testSerializeDeserialize() throws JAXBException { + + SubClusterId sc1 = SubClusterId.newInstance("sc1"); + SubClusterId sc2 = SubClusterId.newInstance("sc2"); + WeightedFederationPolicyInfo policyInfo = + new WeightedFederationPolicyInfo(); + policyInfo.routerWeights = new HashMap<>(); + policyInfo.routerWeights.put(new FederationSubClusterIdInfo(sc1), 0.2f); + policyInfo.routerWeights.put(new FederationSubClusterIdInfo(sc2), 0.8f); + policyInfo.amrmWeights = new HashMap<>(); + policyInfo.amrmWeights.put(new FederationSubClusterIdInfo(sc1), 0.2f); + policyInfo.amrmWeights.put(new FederationSubClusterIdInfo(sc2), 0.8f); + + //System.out.println(WeightedFederationPolicyInfo.toByteBuffer()toJSONString(policyInfo)); + } +}