diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 1bb8b38ab0d..94acaed7073 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -557,10 +557,15 @@ private void createAMForJob(Map jsonJob) throws YarnException { executionType = ExecutionType.valueOf( jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString()); } + long allocationId = -1; + if (jsonTask.containsKey(SLSConfiguration.TASK_ALLOCATION_ID)) { + allocationId = Long.parseLong( + jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString()); + } for (int i = 0; i < count; i++) { containers.add( new ContainerSimulator(res, duration, hostname, priority, type, - executionType)); + executionType, allocationId)); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index ac83ab258fd..1330e4d2f2b 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -272,8 +272,8 @@ public Object run() throws Exception { } protected ResourceRequest createResourceRequest(Resource resource, - ExecutionType executionType, String host, int priority, int - numContainers) { + ExecutionType executionType, String host, int priority, long + allocationId, int numContainers) { ResourceRequest request = recordFactory .newRecordInstance(ResourceRequest.class); request.setCapability(resource); @@ -284,6 +284,7 @@ protected ResourceRequest createResourceRequest(Resource resource, Priority prio = recordFactory.newRecordInstance(Priority.class); prio.setPriority(priority); request.setPriority(prio); + request.setAllocationRequestId(allocationId); return request; } @@ -406,11 +407,22 @@ public void untrackApp() { protected List packageRequests( List csList, int priority) { // create requests - Map rackLocalRequestMap = new HashMap(); - Map nodeLocalRequestMap = new HashMap(); - ResourceRequest anyRequest = null; + Map> rackLocalRequests = + new HashMap<>(); + Map> nodeLocalRequests = + new HashMap<>(); + Map anyRequests = new HashMap<>(); for (ContainerSimulator cs : csList) { + long allocationId = cs.getAllocationId(); + ResourceRequest anyRequest = anyRequests.get(allocationId); if (cs.getHostname() != null) { + Map rackLocalRequestMap; + if (rackLocalRequests.containsKey(allocationId)) { + rackLocalRequestMap = rackLocalRequests.get(allocationId); + } else { + rackLocalRequestMap = new HashMap<>(); + rackLocalRequests.put(allocationId, rackLocalRequestMap); + } String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname()); // check rack local String rackname = "/" + rackHostNames[0]; @@ -419,34 +431,49 @@ public void untrackApp() { rackLocalRequestMap.get(rackname).getNumContainers() + 1); } else { ResourceRequest request = createResourceRequest(cs.getResource(), - cs.getExecutionType(), rackname, priority, 1); + cs.getExecutionType(), rackname, priority, + cs.getAllocationId(), 1); rackLocalRequestMap.put(rackname, request); } // check node local + Map nodeLocalRequestMap; + if (nodeLocalRequests.containsKey(allocationId)) { + nodeLocalRequestMap = nodeLocalRequests.get(allocationId); + } else { + nodeLocalRequestMap = new HashMap<>(); + nodeLocalRequests.put(allocationId, nodeLocalRequestMap); + } String hostname = rackHostNames[1]; if (nodeLocalRequestMap.containsKey(hostname)) { nodeLocalRequestMap.get(hostname).setNumContainers( nodeLocalRequestMap.get(hostname).getNumContainers() + 1); } else { ResourceRequest request = createResourceRequest(cs.getResource(), - cs.getExecutionType(), hostname, priority, 1); + cs.getExecutionType(), hostname, priority, + cs.getAllocationId(), 1); nodeLocalRequestMap.put(hostname, request); } } // any if (anyRequest == null) { anyRequest = createResourceRequest(cs.getResource(), - cs.getExecutionType(), ResourceRequest.ANY, priority, 1); + cs.getExecutionType(), ResourceRequest.ANY, priority, + cs.getAllocationId(), 1); + anyRequests.put(allocationId, anyRequest); } else { anyRequest.setNumContainers(anyRequest.getNumContainers() + 1); } } List ask = new ArrayList(); - ask.addAll(nodeLocalRequestMap.values()); - ask.addAll(rackLocalRequestMap.values()); - if (anyRequest != null) { - ask.add(anyRequest); + for (Map nodeLocalRequestMap : + nodeLocalRequests.values()) { + ask.addAll(nodeLocalRequestMap.values()); + } + for (Map rackLocalRequestMap : + rackLocalRequests.values()) { + ask.addAll(rackLocalRequestMap.values()); } + ask.addAll(anyRequests.values()); return ask; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java index 09f653f375f..fc6be73633f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java @@ -123,5 +123,6 @@ public static Resource getAMContainerResource(Configuration conf) { public static final String TASK_TYPE = TASK_CONTAINER + "type"; public static final String TASK_EXECUTION_TYPE = TASK_CONTAINER + "execution.type"; - + public static final String TASK_ALLOCATION_ID = TASK_CONTAINER + + "allocation.id"; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index 6a8430ef416..d1a6245dd3f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -257,7 +257,7 @@ public void addNewContainer(Container container, long lifeTimeMS) { // normal container ContainerSimulator cs = new ContainerSimulator(container.getId(), container.getResource(), lifeTimeMS + System.currentTimeMillis(), - lifeTimeMS); + lifeTimeMS, container.getAllocationRequestId()); containerQueue.add(cs); runningContainers.put(cs.getId(), cs); } else { diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java index 09498dacb11..06d81620f49 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java @@ -46,6 +46,8 @@ private String type; // execution type private ExecutionType executionType = ExecutionType.GUARANTEED; + // allocation id + private long allocationId; /** * invoked when AM schedules containers to allocate. @@ -61,23 +63,34 @@ public ContainerSimulator(Resource resource, long lifeTime, */ public ContainerSimulator(Resource resource, long lifeTime, String hostname, int priority, String type, ExecutionType executionType) { + this(resource, lifeTime, hostname, priority, type, executionType, -1); + } + + /** + * invoked when AM schedules containers to allocate. + */ + public ContainerSimulator(Resource resource, long lifeTime, + String hostname, int priority, String type, ExecutionType executionType, + long allocationId) { this.resource = resource; this.lifeTime = lifeTime; this.hostname = hostname; this.priority = priority; this.type = type; this.executionType = executionType; + this.allocationId = allocationId; } /** * invoke when NM schedules containers to run. */ public ContainerSimulator(ContainerId id, Resource resource, long endTime, - long lifeTime) { + long lifeTime, long allocationId) { this.id = id; this.resource = resource; this.endTime = endTime; this.lifeTime = lifeTime; + this.allocationId = allocationId; } public Resource getResource() { @@ -131,4 +144,8 @@ public void setPriority(int p) { public ExecutionType getExecutionType() { return executionType; } + + public long getAllocationId() { + return allocationId; + } } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index cef41d62443..844fe174f82 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -21,7 +21,10 @@ import java.util.HashMap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -31,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.scheduler.*; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -189,6 +193,51 @@ public void testAMSimulatorWithNodeLabels() throws Exception { } } + @Test + public void testPackageRequests() { + MockAMSimulator app = new MockAMSimulator(); + List containerSimulators = new ArrayList<>(); + Resource resource = Resources.createResource(1024); + int priority = 1; + ExecutionType execType = ExecutionType.GUARANTEED; + String type = "map"; + + ContainerSimulator s1 = new ContainerSimulator(resource, 100, + "/default-rack/h1", priority, type, execType); + ContainerSimulator s2 = new ContainerSimulator(resource, 100, + "/default-rack/h1", priority, type, execType); + ContainerSimulator s3 = new ContainerSimulator(resource, 100, + "/default-rack/h2", priority, type, execType); + + containerSimulators.add(s1); + containerSimulators.add(s2); + containerSimulators.add(s3); + + List res = app.packageRequests(containerSimulators, + priority); + + // total 4 resource requests: any -> 1, rack -> 1, node -> 2 + Assert.assertEquals(4, res.size()); + + containerSimulators.clear(); + s1 = new ContainerSimulator(resource, 100, + "/default-rack/h1", priority, type, execType, 1); + s2 = new ContainerSimulator(resource, 100, + "/default-rack/h1", priority, type, execType, 2); + s3 = new ContainerSimulator(resource, 100, + "/default-rack/h2", priority, type, execType, 1); + + containerSimulators.add(s1); + containerSimulators.add(s2); + containerSimulators.add(s3); + + res = app.packageRequests(containerSimulators, priority); + + // total 7 resource requests: any -> 2, rack -> 2, node -> 3 + Assert.assertEquals(7, res.size()); + } + + @After public void tearDown() { if (rm != null) { diff --git a/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json b/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json index a485831186a..3d46b5ea6ec 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json +++ b/hadoop-tools/hadoop-sls/src/test/resources/inputsls.json @@ -12,7 +12,8 @@ "container.end.ms": 23707, "container.priority": 20, "container.type": "map", - "container.execution.type": "GUARANTEED" + "container.execution.type": "GUARANTEED", + "container.allocation.id": 1 }, { "container.host": "/default-rack/node3", @@ -20,7 +21,8 @@ "container.end.ms": 21593, "container.priority": 20, "container.type": "map", - "container.execution.type": "GUARANTEED" + "container.execution.type": "GUARANTEED", + "container.allocation.id": 2 }, { "container.host": "/default-rack/node2", @@ -28,7 +30,8 @@ "container.end.ms": 86613, "container.priority": 20, "container.type": "map", - "container.execution.type": "GUARANTEED" + "container.execution.type": "GUARANTEED", + "container.allocation.id": 2 } ] }