diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index 283f89e..85db08a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -128,7 +128,7 @@ private SubClusterId homeSubcluster; @Override - public void reinitialize( + public synchronized void reinitialize( FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { @@ -143,10 +143,8 @@ public void reinitialize( Map newWeightsConverted = new HashMap<>(); boolean allInactive = true; WeightedPolicyInfo policy = getPolicyInfo(); - if (policy.getAMRMPolicyWeights() == null - || policy.getAMRMPolicyWeights().size() == 0) { - allInactive = false; - } else { + if (policy.getAMRMPolicyWeights() != null + && policy.getAMRMPolicyWeights().size() > 0) { for (Map.Entry e : policy.getAMRMPolicyWeights() .entrySet()) { if (e.getValue() > 0) { @@ -186,14 +184,15 @@ public void reinitialize( } @Override - public void notifyOfResponse(SubClusterId subClusterId, + public synchronized void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) throws YarnException { // stateless policy does not care about responses except tracking headroom headroom.put(subClusterId, response.getAvailableResources()); } @Override - public Map> splitResourceRequests( + public synchronized + Map> splitResourceRequests( List resourceRequests) throws YarnException { // object used to accumulate statistics about the answer, initialize with @@ -347,7 +346,7 @@ private void splitIndividualAny(ResourceRequest originalResourceRequest, originalResourceRequest.getExecutionTypeRequest()); out.setAllocationRequestId(allocationId); out.setNumContainers((int) Math.ceil(numContainer)); - if (out.isAnyLocation(out.getResourceName())) { + if (ResourceRequest.isAnyLocation(out.getResourceName())) { allocationBookkeeper.addAnyRR(targetId, out); } else { allocationBookkeeper.addRackRR(targetId, out); @@ -475,10 +474,12 @@ private void reinitialize( * on a per-allocation-id and per-subcluster bases. */ private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) { - Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + Preconditions + .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { - countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>()); + countContainersPerRM.put(rr.getAllocationRequestId(), + new HashMap()); } if (!countContainersPerRM.get(rr.getAllocationRequestId()) .containsKey(targetId)) { @@ -497,7 +498,8 @@ private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) { * Add a rack-local request to the final asnwer. */ public void addRackRR(SubClusterId targetId, ResourceRequest rr) { - Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + Preconditions + .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); internalAddToAnswer(targetId, rr); } @@ -505,7 +507,8 @@ public void addRackRR(SubClusterId targetId, ResourceRequest rr) { * Add an ANY request to the final answer. */ private void addAnyRR(SubClusterId targetId, ResourceRequest rr) { - Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName())); + Preconditions + .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName())); internalAddToAnswer(targetId, rr); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java index 3cf73b6..3d42e1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java @@ -90,7 +90,7 @@ protected static void serializeAndDeserializePolicyManager( wfp2.getAMRMPolicy(context, null); // needed only for tests (getARMRMPolicy change the "type" in conf) - fpc.setType(wfp.getClass().getCanonicalName()); + //fpc.setType(wfp.getClass().getCanonicalName()); FederationRouterPolicy federationRouterPolicy = wfp2.getRouterPolicy(context, null);