Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 2.4.0
- Fix Version/s: None
- Component/s: None
- Environment: RHEL 6.1, YARN 2.4
Description
YARN appears to be ignoring host-level ContainerRequests.
I am creating a container request with code that pretty closely mirrors the DistributedShell code:
protected def requestContainers(memMb: Int, cpuCores: Int, containers: Int) {
  info("Requesting %d container(s) with %dmb of memory" format (containers, memMb))
  val capability = Records.newRecord(classOf[Resource])
  val priority = Records.newRecord(classOf[Priority])
  priority.setPriority(0)
  capability.setMemory(memMb)
  capability.setVirtualCores(cpuCores)
  // Specifying a host in the String[] host parameter here seems to do nothing.
  // Setting relaxLocality to false also doesn't help.
  (0 until containers).foreach(idx =>
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority)))
}
When I run this code with a specific host in the ContainerRequest, YARN does not honor the request. Instead, it puts the container on an arbitrary host. This appears to be true for both the FifoScheduler and the CapacityScheduler.
Currently, we are running the CapacityScheduler with the following settings:
<configuration>
  <property>
    <name>yarn.scheduler.capacity.maximum-applications</name>
    <value>10000</value>
    <description>
      Maximum number of applications that can be pending and running.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.1</value>
    <description>
      Maximum percent of resources in the cluster which can be used to run
      application masters, i.e. controls the number of concurrent running
      applications.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    <description>
      The ResourceCalculator implementation to be used to compare Resources
      in the scheduler. The default, DefaultResourceCalculator, only uses
      Memory, while DominantResourceCalculator uses dominant-resource to
      compare multi-dimensional resources such as Memory, CPU etc.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default</value>
    <description>
      The queues at this level (root is the root queue).
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>100</value>
    <description>Samza queue target capacity.</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
    <value>1</value>
    <description>
      Default queue user limit, a percentage from 0.0 to 1.0.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>100</value>
    <description>
      The maximum capacity of the default queue.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.state</name>
    <value>RUNNING</value>
    <description>
      The state of the default queue. State can be one of RUNNING or STOPPED.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
    <value>*</value>
    <description>
      The ACL of who can submit jobs to the default queue.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
    <value>*</value>
    <description>
      The ACL of who can administer jobs on the default queue.
    </description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.node-locality-delay</name>
    <value>40</value>
    <description>
      Number of missed scheduling opportunities after which the
      CapacityScheduler attempts to schedule rack-local containers.
      Typically this should be set to the number of nodes in the cluster;
      by default it is set to approximately the number of nodes in one
      rack, which is 40.
    </description>
  </property>
</configuration>
Digging into the code a bit (props to jghoman for finding this), we have a theory as to why this is happening. It looks like RMContainerRequestor.addContainerReq adds three resource requests per container request: data-local, rack-local, and any:
protected void addContainerReq(ContainerRequest req) {
  // Create resource requests
  for (String host : req.hosts) {
    // Data-local
    if (!isNodeBlacklisted(host)) {
      addResourceRequest(req.priority, host, req.capability);
    }
  }

  // Nothing Rack-local for now
  for (String rack : req.racks) {
    addResourceRequest(req.priority, rack, req.capability);
  }

  // Off-switch
  addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
}
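The fan-out above can be sketched with a small, self-contained model. This is not YARN code: `expand`, the hostname `eat1-app`, and the rack name `/rack1` are illustrative stand-ins, and the "asks" are reduced to bare resource names to show only the insertion behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class AddContainerReqSketch {
    static final String ANY = "*";

    // Simplified stand-in for addContainerReq: one container request fans
    // out into data-local, rack-local, and off-switch (ANY) resource
    // requests, in that insertion order.
    static List<String> expand(String[] hosts, String[] racks) {
        List<String> asks = new ArrayList<>();
        for (String host : hosts) {
            asks.add(host);   // data-local request
        }
        for (String rack : racks) {
            asks.add(rack);   // rack-local request
        }
        asks.add(ANY);        // off-switch request
        return asks;
    }

    public static void main(String[] args) {
        // Hostname and rack name here are made up for illustration.
        System.out.println(expand(new String[]{"eat1-app"}, new String[]{"/rack1"}));
        // prints [eat1-app, /rack1, *]
    }
}
```

Note that even though the host-specific request is added first, the insertion order is not what the scheduler ends up seeing, as the TreeSet below shows.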
The addResourceRequest method, in turn, calls addResourceRequestToAsk, which in turn calls ask.add(remoteRequest):
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
  // because objects inside the resource map can be deleted, ask can end up
  // containing an object that matches the new resource object but with a
  // different numContainers. So existing values must be replaced explicitly
  if (ask.contains(remoteRequest)) {
    ask.remove(remoteRequest);
  }
  ask.add(remoteRequest);
}
The problem is that the "ask" variable is a TreeSet:
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
    new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
The ResourceRequestComparator sorts the TreeSet according to:
public int compare(ResourceRequest r1, ResourceRequest r2) {
  // Compare priority, host and capability
  int ret = r1.getPriority().compareTo(r2.getPriority());
  if (ret == 0) {
    String h1 = r1.getResourceName();
    String h2 = r2.getResourceName();
    ret = h1.compareTo(h2);
  }
  if (ret == 0) {
    ret = r1.getCapability().compareTo(r2.getCapability());
  }
  return ret;
}
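The comparator's effect can be reproduced without any YARN classes. In this sketch, `Ask` is an illustrative stand-in for ResourceRequest (capability is omitted, assuming all requests ask for the same resources), and the host and rack names are made up:

```java
import java.util.Comparator;
import java.util.TreeSet;

public class AskOrderingSketch {
    // Illustrative stand-in for ResourceRequest: priority plus resource name.
    static class Ask {
        final int priority;
        final String resourceName;
        Ask(int priority, String resourceName) {
            this.priority = priority;
            this.resourceName = resourceName;
        }
    }

    // Mirrors ResourceRequestComparator: priority first, then resource name.
    static final Comparator<Ask> CMP = (a, b) -> {
        int ret = Integer.compare(a.priority, b.priority);
        return ret != 0 ? ret : a.resourceName.compareTo(b.resourceName);
    };

    public static void main(String[] args) {
        TreeSet<Ask> ask = new TreeSet<>(CMP);
        // Insertion order matches addContainerReq: host, rack, then ANY.
        ask.add(new Ask(0, "eat1-app"));  // data-local
        ask.add(new Ask(0, "/rack1"));    // rack-local
        ask.add(new Ask(0, "*"));         // off-switch
        // '*' (0x2A) sorts before '/' (0x2F) and before any letter, so the
        // ANY request comes first regardless of insertion order.
        System.out.println(ask.first().resourceName);  // prints "*"
    }
}
```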
The first thing to note is that all of our resource requests have the same priority, so the TreeSet is effectively sorted by resource name (host/rack). The resource names added by addContainerReq are the host, the rack, and ANY, which is denoted "*" (see above). The problem is that "*" sorts lexicographically before any hostname or rack name (ASCII 0x2A is smaller than any letter or digit), so the TreeSet always orders the "*" request first, even if the host request was added first in addContainerReq:
> import java.util.TreeSet
> val set = new TreeSet[String]
set: java.util.TreeSet[String] = []
> set.add("eat1-app")
> set
res3: java.util.TreeSet[String] = [eat1-app]
> set.add("*")
> set
res5: java.util.TreeSet[String] = [*, eat1-app]
Given that ordering, it seems to me that anything that iterates over the "ask" TreeSet (including the allocate requests sent to the ResourceManager) will see the most general resource request first, rather than the most specific one.