diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 07f132c..886c108 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -368,6 +368,14 @@ public long getAllocationRequestId() { public void setAllocationRequestId(long allocationRequestID) { throw new UnsupportedOperationException(); } + + @Public + @Evolving + public abstract String getPlacementStrategy(); + + @Public + @Evolving + public abstract void setPlacementStrategy(String placementStrategy); @Override public int hashCode() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 6c337cf..bd94408 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -304,6 +304,7 @@ message ResourceRequestProto { optional string node_label_expression = 6; optional ExecutionTypeRequestProto execution_type_request = 7; optional int64 allocation_request_id = 8 [default = -1]; + optional string placement_strategy = 9; } message ExecutionTypeRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b9949e1..1ed5d2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -81,6 +81,8 @@ import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -301,6 +303,8 @@ private int yarnShellIdCounter = 1; + private String placementStrategy = null; + @VisibleForTesting protected final Set launchedContainers = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -402,6 +406,7 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("placement_strategy", true, "Set placement strategy"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -539,6 +544,9 @@ public boolean init(String[] args) throws ParseException, IOException { requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + placementStrategy = cliParser.getOptionValue("placement_strategy"); + LOG.info("placement-strategy:" + placementStrategy); + containerRetryPolicy = ContainerRetryPolicy.values()[ Integer.parseInt(cliParser.getOptionValue( "container_retry_policy", "0"))]; @@ -1187,8 +1195,9 @@ private ContainerRequest setupContainerAskForRM() { Resource capability = Resource.newInstance(containerMemory, containerVirtualCores); - ContainerRequest request = new ContainerRequest(capability, null, null, - pri); + ContainerRequest request = new ContainerRequest(capability, null, null, pri, + true, null, ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), + placementStrategy); LOG.info("Requested container ask: " + request.toString()); return request; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index eedb501..8e6babe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -204,6 +204,8 @@ public static final String SCRIPT_PATH = "ExecScript"; + private String placementStrategy; + /** * @param args Command line arguments */ @@ -316,6 +318,8 @@ public Client(Configuration conf) throws Exception { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("placement_strategy", true, + "Placement Strategy for task containers"); } /** @@ -475,6 +479,9 @@ public boolean init(String[] args) throws ParseException { containerRetryOptions.add("--container_retry_interval " + cliParser.getOptionValue("container_retry_interval")); } + if (cliParser.hasOption("placement_strategy")) { + placementStrategy = cliParser.getOptionValue("placement_strategy"); + } if (cliParser.hasOption("flow_name")) { flowName = cliParser.getOptionValue("flow_name"); @@ -713,6 +720,10 @@ public boolean run() throws IOException, YarnException { vargs.addAll(containerRetryOptions); + if (placementStrategy != null) { + vargs.add("--placement_strategy \"" + placementStrategy + "\""); + } + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 7acaf11..2b892db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.client.api; @@ -46,10 +46,9 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; -@InterfaceAudience.Public -@InterfaceStability.Stable -public abstract class AMRMClient extends - AbstractService { +@InterfaceAudience.Public @InterfaceStability.Stable +public abstract class AMRMClient + extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClient.class); private TimelineClient timelineClient; @@ -71,8 +70,7 @@ private NMTokenCache nmTokenCache; - @Private - protected AMRMClient(String name) { + @Private protected AMRMClient(String name) { super(name); nmTokenCache = NMTokenCache.getSingleton(); } @@ -81,7 +79,7 @@ protected AMRMClient(String name) { * Object to represent a single container request for resources. Scheduler * documentation should be consulted for the specifics of how the parameters * are honored. - * + * * By default, YARN schedulers try to allocate containers at the requested * locations but they may relax the constraints in order to expedite meeting * allocations limits. They first relax the constraint to the same rack as the @@ -102,7 +100,7 @@ protected AMRMClient(String name) { * To re-enable locality relaxation at a given priority, all pending requests * with locality relaxation disabled must be first removed. Then they can be * added back with locality relaxation enabled. - * + * * All getters return immutable values. */ public static class ContainerRequest { @@ -113,11 +111,12 @@ protected AMRMClient(String name) { final boolean relaxLocality; final String nodeLabelsExpression; final ExecutionTypeRequest executionTypeRequest; - + final String placementStrategy; + /** * Instantiates a {@link ContainerRequest} with the given constraints and * locality relaxation enabled. - * + * * @param capability * The {@link Resource} to be requested for each container. * @param nodes @@ -130,14 +129,14 @@ protected AMRMClient(String name) { * The priority at which to request the containers. Higher * priorities have lower numerical values. */ - public ContainerRequest(Resource capability, String[] nodes, - String[] racks, Priority priority) { + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority) { this(capability, nodes, racks, priority, true, null); } - + /** * Instantiates a {@link ContainerRequest} with the given constraints. - * + * * @param capability * The {@link Resource} to be requested for each container. * @param nodes @@ -153,8 +152,8 @@ public ContainerRequest(Resource capability, String[] nodes, * If true, containers for this request may be assigned on hosts * and racks other than the ones explicitly requested. */ - public ContainerRequest(Resource capability, String[] nodes, - String[] racks, Priority priority, boolean relaxLocality) { + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority, boolean relaxLocality) { this(capability, nodes, racks, priority, relaxLocality, null); } @@ -183,12 +182,12 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality, String nodeLabelsExpression) { this(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression, - ExecutionTypeRequest.newInstance()); + ExecutionTypeRequest.newInstance(), null); } - + /** * Instantiates a {@link ContainerRequest} with the given constraints. - * + * * @param capability * The {@link Resource} to be requested for each container. * @param nodes @@ -211,18 +210,18 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality, String nodeLabelsExpression, - ExecutionTypeRequest executionTypeRequest) { + ExecutionTypeRequest executionTypeRequest, String placementStrategy) { // Validate request Preconditions.checkArgument(capability != null, - "The Resource to be requested for each container " + - "should not be null "); + "The Resource to be requested for each container " + + "should not be null "); Preconditions.checkArgument(priority != null, "The priority at which to request containers should not be null "); Preconditions.checkArgument( - !(!relaxLocality && (racks == null || racks.length == 0) - && (nodes == null || nodes.length == 0)), - "Can't turn off locality relaxation on a " + - "request with no location constraints"); + !(!relaxLocality && (racks == null || racks.length == 0) && ( + nodes == null || nodes.length == 0)), + "Can't turn off locality relaxation on a " + + "request with no location constraints"); this.capability = capability; this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); @@ -230,28 +229,29 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, this.relaxLocality = relaxLocality; this.nodeLabelsExpression = nodeLabelsExpression; this.executionTypeRequest = executionTypeRequest; + this.placementStrategy = placementStrategy; } - + public Resource getCapability() { return capability; } - + public List getNodes() { return nodes; } - + public List getRacks() { return racks; } - + public Priority getPriority() { return priority; } - + public boolean getRelaxLocality() { return relaxLocality; } - + public String getNodeLabelExpression() { return nodeLabelsExpression; } @@ -260,16 +260,21 @@ public ExecutionTypeRequest getExecutionTypeRequest() { return executionTypeRequest; } + public String getPlacementStrategy() { + return placementStrategy; + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); sb.append("Priority[").append(priority).append("]"); sb.append("ExecutionTypeRequest[").append(executionTypeRequest) .append("]"); + sb.append("PlacementStrategy[").append(placementStrategy).append("]"); return sb.toString(); } } - + /** * Register the application master. This must be called before any * other interaction @@ -280,12 +285,10 @@ public String toString() { * @throws YarnException * @throws IOException */ - public abstract RegisterApplicationMasterResponse - registerApplicationMaster(String appHostName, - int appHostPort, - String appTrackingUrl) - throws YarnException, IOException; - + public abstract RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl) + throws YarnException, IOException; + /** * Request additional containers and receive new container allocations. * Requests made via addContainerRequest are sent to the @@ -295,22 +298,22 @@ public String toString() { * must be made periodically. The call may not always return any new * allocations of containers. App should not make concurrent allocate * requests. May cause request loss. - * + * *

* Note : If the user has not removed container requests that have already * been satisfied, then the re-register may end up sending the entire * container requests to the RM (including matched requests). Which would mean * the RM could end up giving it a lot of new allocated containers. *

- * + * * @param progressIndicator Indicates progress made by the master * @return the response of the allocate request * @throws YarnException * @throws IOException */ - public abstract AllocateResponse allocate(float progressIndicator) - throws YarnException, IOException; - + public abstract AllocateResponse allocate(float progressIndicator) + throws YarnException, IOException; + /** * Unregister the application master. This must be called in the end. * @param appStatus Success/Failure status of the master @@ -319,11 +322,10 @@ public abstract AllocateResponse allocate(float progressIndicator) * @throws YarnException * @throws IOException */ - public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus, - String appMessage, - String appTrackingUrl) - throws YarnException, IOException; - + public abstract void unregisterApplicationMaster( + FinalApplicationStatus appStatus, String appMessage, + String appTrackingUrl) throws YarnException, IOException; + /** * Request containers for resources before calling allocate * @param req Resource request @@ -355,8 +357,8 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu * allocation or resource change * @param capability The target resource capability of the container */ - public abstract void requestContainerResourceChange( - Container container, Resource capability); + public abstract void requestContainerResourceChange(Container container, + Resource capability); /** * Release containers assigned by the Resource Manager. If the app cannot use @@ -366,14 +368,14 @@ public abstract void requestContainerResourceChange( * @param containerId */ public abstract void releaseAssignedContainer(ContainerId containerId); - + /** * Get the currently available resources in the cluster. * A valid value is available after a call to allocate has been made * @return Currently available resources */ public abstract Resource getAvailableResources(); - + /** * Get the current number of nodes in the cluster. * A valid values is available after a call to allocate has been made @@ -421,10 +423,10 @@ public abstract void requestContainerResourceChange( throw new UnsupportedOperationException("The sub-class extending" + " AMRMClient is expected to implement this !!"); } - + /** * Update application's blacklist with addition or removal resources. - * + * * @param blacklistAdditions list of resources which should be added to the * application blacklist * @param blacklistRemovals list of resources which should be removed from the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 8af0c78..ee89aaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -195,6 +195,8 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, * @param req Resource request */ public void addContainerRequest(T req) { + //DEBUG + System.out.println("XXX " + req.toString()); client.addContainerRequest(req); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4145944..203e0c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -263,10 +263,12 @@ public AllocateResponse allocate(float progressIndicator) for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across - askList.add(ResourceRequest.newInstance(r.getPriority(), + ResourceRequest tmp = ResourceRequest.newInstance(r.getPriority(), r.getResourceName(), r.getCapability(), r.getNumContainers(), r.getRelaxLocality(), r.getNodeLabelExpression(), - r.getExecutionTypeRequest())); + r.getExecutionTypeRequest()); + tmp.setPlacementStrategy(r.getPlacementStrategy()); + askList.add(tmp); } List increaseList = new ArrayList<>(); List decreaseList = new ArrayList<>(); @@ -482,6 +484,9 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, @Override public synchronized void addContainerRequest(T req) { + //DEBUG + System.out.println(req.toString()); + Preconditions.checkArgument(req != null, "Resource request can not be null."); Set dedupedRacks = new HashSet(); @@ -520,13 +525,14 @@ public synchronized void addContainerRequest(T req) { for (String node : dedupedNodes) { addResourceRequest(req.getPriority(), node, req.getExecutionTypeRequest(), req.getCapability(), req, true, - req.getNodeLabelExpression()); + req.getNodeLabelExpression(), req.getPlacementStrategy()); } } for (String rack : dedupedRacks) { - addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), - req.getCapability(), req, true, req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), rack, + req.getExecutionTypeRequest(), req.getCapability(), req, true, + req.getNodeLabelExpression(), req.getPlacementStrategy()); } // Ensure node requests are accompanied by requests for @@ -534,12 +540,13 @@ public synchronized void addContainerRequest(T req) { for (String rack : inferredRacks) { addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), req.getCapability(), req, req.getRelaxLocality(), - req.getNodeLabelExpression()); + req.getNodeLabelExpression(), req.getPlacementStrategy()); } // Off-switch addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getExecutionTypeRequest(), req.getCapability(), req, - req.getRelaxLocality(), req.getNodeLabelExpression()); + req.getRelaxLocality(), req.getNodeLabelExpression(), + req.getPlacementStrategy()); } @Override @@ -741,20 +748,24 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { private void addResourceRequest(Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, Resource capability, T req, - boolean relaxLocality, String labelExpression) { + boolean relaxLocality, String labelExpression, String placementStrategy) { @SuppressWarnings("unchecked") ResourceRequestInfo resourceRequestInfo = remoteRequestsTable - .addResourceRequest(priority, resourceName, - execTypeReq, capability, req, relaxLocality, labelExpression); + .addResourceRequest(priority, resourceName, execTypeReq, capability, + req, relaxLocality, labelExpression, placementStrategy); // Note this down for next interaction with ResourceManager addResourceRequestToAsk(resourceRequestInfo.remoteRequest); + //DEBUG + System.err.println("placement_strategy=" + resourceRequestInfo.remoteRequest + .getPlacementStrategy()); + if (LOG.isDebugEnabled()) { LOG.debug("addResourceRequest:" + " applicationId=" + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + resourceRequestInfo.remoteRequest.getNumContainers() + + resourceRequestInfo.remoteRequest.getNumContainers() + " #asks=" + ask.size()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java index 853a512..ec12b1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java @@ -266,7 +266,7 @@ ResourceRequestInfo remove(Priority priority, String resourceName, @SuppressWarnings("unchecked") ResourceRequestInfo addResourceRequest(Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, Resource capability, T req, - boolean relaxLocality, String labelExpression) { + boolean relaxLocality, String labelExpression, String placementStrategy) { ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, execTypeReq.getExecutionType(), capability); if (resourceRequestInfo == null) { @@ -277,6 +277,7 @@ ResourceRequestInfo addResourceRequest(Priority priority, String resourceName, resourceRequestInfo); } resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq); + resourceRequestInfo.remoteRequest.setPlacementStrategy(placementStrategy); resourceRequestInfo.remoteRequest.setNumContainers( resourceRequestInfo.remoteRequest.getNumContainers() + 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 57cdbfb..6123894 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -365,26 +365,26 @@ public void testAMRMClientMatchingFitExecType() new ContainerRequest(capability2, nodes, racks, priority); ContainerRequest storedOpportContainer1 = new ContainerRequest(capability1, nodes, racks, priority, true, null, - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), null); ContainerRequest storedOpportContainer2 = new ContainerRequest(capability2, nodes, racks, priority, true, null, - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), null); ContainerRequest storedOpportContainer3 = new ContainerRequest(capability3, nodes, racks, priority, true, null, - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), null); ContainerRequest storedOpportContainer4 = new ContainerRequest(capability4, nodes, racks, priority, true, null, - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), null); ContainerRequest storedOpportContainer5 = new ContainerRequest(capability5, nodes, racks, priority, true, null, - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), null); ContainerRequest storedOpportContainer6 = new ContainerRequest(capability6, nodes, racks, priority, true, null, - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), null); ContainerRequest storedOpportContainer7 = new ContainerRequest(capability7, nodes, racks, priority2, false, null, - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), null); amClient.addContainerRequest(storedGuarContainer1); amClient.addContainerRequest(storedGuarContainer2); amClient.addContainerRequest(storedOpportContainer1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index 2db33c1..8aa2688 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -63,7 +63,7 @@ public void testOpportunisticAndGuaranteedRequests() { new ContainerRequest(capability, new String[] {"host1", "host2"}, new String[] {"/rack2"}, Priority.newInstance(1), true, null, ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)); + ExecutionType.OPPORTUNISTIC, true), null); client.addContainerRequest(request2); verifyResourceRequest(client, request, "host1", true, ExecutionType.OPPORTUNISTIC); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index 71321e3..d538873 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -366,12 +366,12 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, null, null, priority2, true, null, ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ExecutionType.OPPORTUNISTIC, true), null)); amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, priority2, true, null, ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ExecutionType.OPPORTUNISTIC, true), null)); amClient.removeContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); @@ -381,7 +381,7 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, null, null, priority2, true, null, ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ExecutionType.OPPORTUNISTIC, true), null)); int containersRequestedNode = amClient.remoteRequestsTable.get(priority, node, ExecutionType.GUARANTEED, capability).remoteRequest @@ -457,7 +457,7 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, true, null, ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ExecutionType.OPPORTUNISTIC, true), null)); assertEquals(4, amClient.ask.size()); // test RPC exception handling @@ -469,7 +469,7 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, true, null, ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ExecutionType.OPPORTUNISTIC, true), null)); final AMRMClient amc = amClient; ApplicationMasterProtocol realRM = amClient.rmClient; @@ -490,7 +490,7 @@ public AllocateResponse answer(InvocationOnMock invocation) new AMRMClient.ContainerRequest(capability, null, null, priority2, true, null, ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); + ExecutionType.OPPORTUNISTIC, true), null)); throw new Exception(); } }); @@ -569,7 +569,7 @@ public void testAMOpportunistic() throws Exception { ExecutionTypeRequest execTypeRequest = ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true); ContainerRequest containerRequest = new AMRMClient.ContainerRequest( - capability, nodes, racks, priority, true, null, execTypeRequest); + capability, nodes, racks, priority, true, null, execTypeRequest, null); amClient.addContainerRequest(containerRequest); // Wait until the container is allocated diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java index 9890296..92e3d1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java @@ -250,4 +250,23 @@ public void setNodeLabelExpression(String nodeLabelExpression) { } builder.setNodeLabelExpression(nodeLabelExpression); } + + @Override + public String getPlacementStrategy() { + ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasPlacementStrategy()) { + return null; + } + return (p.getPlacementStrategy()); + } + + @Override + public void setPlacementStrategy(String placementStrategy) { + maybeInitBuilder(); + if (placementStrategy == null) { + builder.clearPlacementStrategy(); + return; + } + builder.setPlacementStrategy(placementStrategy); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index c677345..0b2effd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.RMPlacementStrategy; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -82,9 +83,11 @@ final Set schedulerKeys = new TreeSet<>(); final Map> resourceRequestMap = new ConcurrentHashMap<>(); - final Map>> containerIncreaseRequestMap = - new ConcurrentHashMap<>(); + final Map>> + containerIncreaseRequestMap = new ConcurrentHashMap<>(); + private Map placementStrategyMap = + new HashMap<>(); + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -368,6 +371,15 @@ public synchronized boolean updateResourceRequests( // Update pendingResources updatePendingResources(lastRequest, request, queue.getMetrics()); + + // Update placement strategy + RMPlacementStrategy strategy = RMPlacementStrategy.fromString( + request.getPlacementStrategy()); + + //DEBUG + System.out.println( + "request.placement_strategy:" + request.getPlacementStrategy()); + placementStrategyMap.put(schedulerKey, strategy); } } return anyResourcesUpdated; @@ -839,4 +851,10 @@ public ResourceRequest cloneResourceRequest(ResourceRequest request) { request.getRelaxLocality(), request.getNodeLabelExpression()); return newRequest; } + + public synchronized RMPlacementStrategy getPlacementStrategy( + SchedulerRequestKey schedulerKey) { + RMPlacementStrategy s = placementStrategyMap.get(schedulerKey); + return s == null ? RMPlacementStrategy.fromString(null) : s; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/RMPlacementStrategy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/RMPlacementStrategy.java new file mode 100644 index 0000000..7d16df6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/RMPlacementStrategy.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.commons.collections.MapUtils; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import java.util.HashMap; +import java.util.IllegalFormatException; +import java.util.Map; + +public class RMPlacementStrategy { + private Operator op; + private PlacementSetType setType; + private Map targets; + + private static final RMPlacementStrategy DEFAULT_RM_PLACEMENT_STRATEGY = + new RMPlacementStrategy() { + @Override + public Operator getOp() { + return Operator.NO; + } + + @Override + public PlacementSetType getPlacementSetType() { + return PlacementSetType.HOST; + } + + @Override + public Map getTargets() { + return MapUtils.EMPTY_MAP; + } + }; + + public enum Operator { + NO, + AFFINITY, + ANTI_AFFINITY, + } + + public enum PlacementSetType { + HOST, + // RACK, + // PARTITION + } + + public enum TargetType { + APPLICATION, + PRIORITY, + } + + /** + * Input string format like: + * + * "op=AFFINITY, placement-set-type=HOST, \ + * targets=[application=application_12345_1;priority=1]" + */ + public static RMPlacementStrategy fromString(String string) + throws IllegalFormatException { + if (null == string || string.isEmpty()) { + return DEFAULT_RM_PLACEMENT_STRATEGY; + } + + Operator op = Operator.NO; + PlacementSetType setType = PlacementSetType.HOST; + Map targets = MapUtils.EMPTY_MAP; + + string = string.replaceAll("\\s+",""); + for (String part : string.split(",")) { + if (part.contains("=")) { + String key = part.substring(0, part.indexOf('=')).toLowerCase(); + String value = part.substring(part.indexOf('=') + 1); + + if (key.equals("op")) { + op = Operator.valueOf(value.toUpperCase()); + } else if (key.equals("placement-set-type")) { + setType = PlacementSetType.valueOf(value.toUpperCase()); + } else if (key.equals("targets")) { + targets = new HashMap<>(); + + value = value.substring(value.indexOf('[') + 1, value.indexOf(']')) + .toLowerCase(); + if (value.isEmpty()) { + continue; + } + for (String t : value.split(";")) { + String tk = t.substring(0, t.indexOf('=')).toLowerCase(); + String tv = t.substring(t.indexOf('=') + 1); + if (tk.equals("application")) { + targets.put(TargetType.APPLICATION, + ConverterUtils.toApplicationId(tv)); + } else if (tk.equals("priority")) { + targets.put(TargetType.PRIORITY, + Priority.newInstance(Integer.valueOf(Integer.parseInt(tv)))); + } else { + throw new HadoopIllegalArgumentException( + "Unknown target:" + string); + } + } + } else { + throw new HadoopIllegalArgumentException("Unknown key:" + string); + } + } else { + throw new HadoopIllegalArgumentException( + "Each part should contains '=':" + string); + } + } + + return new RMPlacementStrategy(op, setType, targets); + } + + public RMPlacementStrategy() { + + } + + public RMPlacementStrategy(Operator op, PlacementSetType type, + Map targets) { + this.op = op; + this.setType = type; + this.targets = targets; + } + + public Operator getOp() { + return op; + } + + public PlacementSetType getPlacementSetType() { + return setType; + } + + public Map getTargets() { + return targets; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/AffinityOrAntiAffinityScorer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/AffinityOrAntiAffinityScorer.java new file mode 100644 index 0000000..b1189c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/AffinityOrAntiAffinityScorer.java @@ -0,0 +1,82 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.RMPlacementStrategy; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class AffinityOrAntiAffinityScorer + extends AbstractSchedulerNodesScorer { + AffinityOrAntiAffinityScorer(SchedulerApplicationAttempt attempt, + SchedulerRequestKey schedulerKey) { + super(attempt, schedulerKey); + } + + private boolean canUse(RMPlacementStrategy ps, N node) { + boolean contains = false; + + ApplicationId targetApplicationId = (ApplicationId) ps.getTargets().get( + RMPlacementStrategy.TargetType.APPLICATION); + if (null == targetApplicationId) { + targetApplicationId = attempt.getApplicationId(); + } + Priority targetPriority = (Priority) ps.getTargets().get( + RMPlacementStrategy.TargetType.PRIORITY); + + for (RMContainer c : node.getCopiedListOfRunningContainers()) { + if (contains) { + break; + } + + if (c.getApplicationAttemptId().getApplicationId().equals( + targetApplicationId)) { + if (targetPriority != null) { + if (targetPriority.equals(c.getAllocatedPriority()) || targetPriority + .equals(c.getReservedSchedulerKey())) { + contains = true; + break; + } + } else { + contains = true; + break; + } + } + } + + if (ps.getOp() == RMPlacementStrategy.Operator.ANTI_AFFINITY) { + return !contains; + } else if (ps.getOp() == RMPlacementStrategy.Operator.AFFINITY) { + return contains; + } + + return false; + } + + @Override + public Iterator scorePlacementSet(PlacementSet candidates) { + RMPlacementStrategy strategy = + attempt.getAppSchedulingInfo().getPlacementStrategy(schedulerKey); + if (null == strategy + || strategy.getOp() == RMPlacementStrategy.Operator.NO) { + return IteratorUtils.singletonIterator(candidates.getNextAvailable()); + } + + List usableNodes = new ArrayList<>(); + for (N n : candidates.getAllSchedulableNodes().values()) { + if (canUse(strategy, n)) { + usableNodes.add(n); + } + } + + return usableNodes.iterator(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java index 41f222c..6348352 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerCache.java @@ -1,11 +1,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.scorer; import org.apache.commons.collections.map.LRUMap; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.RMPlacementStrategy; import java.util.Map; @@ -21,6 +21,17 @@ private static SchedulerNodesScorerType getSchedulerNodesScorerType( Map requests = attempt.getResourceRequests( schedulerKey); + // Check placement strategy + RMPlacementStrategy ps = + attempt.getAppSchedulingInfo().getPlacementStrategy(schedulerKey); + + //DEBUG + System.out.println("Placement Strategy:" + ps.getOp()); + + if (RMPlacementStrategy.Operator.NO != ps.getOp()) { + return SchedulerNodesScorerType.AFFINITY_OR_ANTIAFFNITY; + } + // Simplest rule to determine with nodes scorer will be used: // When requested #resourceName > 0, use locality, otherwise use DO_NOT_CARE if (requests != null && requests.size() > 1) { @@ -58,6 +69,9 @@ private static SchedulerNodesScorerType getSchedulerNodesScorerType( case ANY: scorer = new DoNotCareNodesScorer<>(); break; + case AFFINITY_OR_ANTIAFFNITY: + scorer = new AffinityOrAntiAffinityScorer(attempt, schedulerKey); + break; default: return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java index db9c6fc..495438d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/scorer/SchedulerNodesScorerType.java @@ -3,4 +3,5 @@ public enum SchedulerNodesScorerType { ANY, // Any nodes is fine LOCALITY, // Locality-based + AFFINITY_OR_ANTIAFFNITY, // affinity or anti-affinity } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestPlacementStrategyAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestPlacementStrategyAllocation.java new file mode 100644 index 0000000..6664416 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestPlacementStrategyAllocation.java @@ -0,0 +1,222 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestPlacementStrategyAllocation { + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULE_GLOBALLY_ENABLE, + true); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + private List initNMs(int nNM, MockRM rm) throws Exception { + List nms = new ArrayList<>(); + for (int i = 1; i < nNM; i++) { + MockNM nm = rm.registerNode("h-" + i + ":1234", 8000); + nms.add(nm); + } + return nms; + } + + @Test(timeout = 300000) + public void testAffinityAllocationBetweenApps() throws Exception { + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + MockRM rm = new MockRM(conf); + rm.start(); + List nms = initNMs(100, rm); + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm.submitApp(200, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nms.get(0)); + + am1.allocate("*", 1024, 80, new ArrayList<>(), ""); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Do node heartbeats 2 times for each node + for (int i = 0; i < 2; i++) { + for (MockNM nm : nms) { + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + } + + // We should have 81 containers allocated for am1 + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(81, schedulerApp1.getLiveContainers().size()); + + // Now we submit an app2 + RMApp app2 = rm.submitApp(200, "app", "user", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nms.get(0)); + + ResourceRequest rr = ResourceRequest.newInstance(Priority.newInstance(0), "*", + Resources.createResource(1024), 100); + rr.setPlacementStrategy(String + .format("op=%s,targets=[application=%s]", "AFFINITY", + app1.getApplicationId().toString())); + + am2.allocate(Arrays.asList(rr), null); + + // Do node heartbeats 2 times for each node + for (int i = 0; i < 2; i++) { + for (MockNM nm : nms) { + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + } + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + Assert.assertEquals(101, schedulerApp2.getLiveContainers().size()); + + // Check affinity allocation, if a node has container from app2, the node + // should also have container from app1 + for (MockNM nm : nms) { + SchedulerNode sn = cs.getNode(nm.getNodeId()); + + boolean hasApp2Container = false; + for (RMContainer c : sn.getCopiedListOfRunningContainers()) { + if (!c.isAMContainer() && c.getApplicationAttemptId().equals( + am2.getApplicationAttemptId())) { + hasApp2Container = true; + break; + } + } + + boolean hasApp1Container = false; + if (hasApp2Container) { + for (RMContainer c : sn.getCopiedListOfRunningContainers()) { + if (c.getApplicationAttemptId().equals(am1.getApplicationAttemptId())) { + hasApp1Container = true; + break; + } + } + + Assert.assertTrue(hasApp1Container); + } + } + + rm.close(); + } + + + @Test(timeout = 300000) + public void testAntiAffinityAllocationBetweenApps() throws Exception { + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + MockRM rm = new MockRM(conf); + rm.start(); + List nms = initNMs(100, rm); + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm.submitApp(200, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nms.get(0)); + + am1.allocate("*", 1024, 80, new ArrayList<>(), ""); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Do node heartbeats 2 times for each node + for (int i = 0; i < 2; i++) { + for (MockNM nm : nms) { + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + } + + // We should have 81 containers allocated for am1 + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(81, schedulerApp1.getLiveContainers().size()); + + // Now we submit an app2 + RMApp app2 = rm.submitApp(200, "app", "user", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nms.get(0)); + + ResourceRequest rr = ResourceRequest.newInstance(Priority.newInstance(0), "*", + Resources.createResource(1024), 50); + rr.setPlacementStrategy(String + .format("op=%s,targets=[application=%s]", "ANTI_AFFINITY", + app1.getApplicationId().toString())); + + am2.allocate(Arrays.asList(rr), null); + + // Do node heartbeats 10 times for each node + for (int i = 0; i < 10; i++) { + for (MockNM nm : nms) { + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + } + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + Assert.assertEquals(51, schedulerApp2.getLiveContainers().size()); + + // Check anti affinity allocation, if a node has container from app2, the node + // should not have container from app1 + for (MockNM nm : nms) { + SchedulerNode sn = cs.getNode(nm.getNodeId()); + + boolean hasApp2Container = false; + for (RMContainer c : sn.getCopiedListOfRunningContainers()) { + if (!c.isAMContainer() && c.getApplicationAttemptId().equals( + am2.getApplicationAttemptId())) { + hasApp2Container = true; + break; + } + } + + boolean hasApp1Container = false; + if (hasApp2Container) { + for (RMContainer c : sn.getCopiedListOfRunningContainers()) { + if (c.getApplicationAttemptId().equals(am1.getApplicationAttemptId())) { + hasApp1Container = true; + break; + } + } + + Assert.assertFalse(hasApp1Container); + } + } + + rm.close(); + } +}