diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 81d6a19527b..6c2651ac331 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -140,6 +140,7 @@
src/test/resources/nodes-with-resources.jsonsrc/test/resources/exit-invariants.txtsrc/test/resources/ongoing-invariants.txt
+ src/test/resources/sls_dag.json
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 3257915b90f..6ed28d90d8c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -563,10 +563,18 @@ private void createAMForJob(Map jsonJob) throws YarnException {
allocationId = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString());
}
+
+ long requestDelay = 0;
+ if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) {
+ requestDelay = Long.parseLong(
+ jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString());
+ }
+ requestDelay = Math.max(requestDelay, 0);
+
for (int i = 0; i < count; i++) {
containers.add(
new ContainerSimulator(res, duration, hostname, priority, type,
- executionType, allocationId));
+ executionType, allocationId, requestDelay));
}
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
new file mode 100644
index 00000000000..7e2d0a76294
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AMSimulator that simulates a DAG application: it requests each container
+ * only after that container's configured request delay has elapsed, and it
+ * finishes when all the tasks are completed.
+ *
+ * Vocabulary used:
+ * pending -> requests which are NOT yet sent to RM.
+ * scheduled -> requests which are sent to RM but not yet assigned.
+ * assigned -> requests which are assigned to a container.
+ * completed -> requests whose corresponding container has completed.
+ * Containers are requested based on the request delay.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DAGAMSimulator extends AMSimulator {
+
+ // Priority handed to packageRequests for every container request.
+ private static final int PRIORITY = 20;
+
+ // Containers whose requests have NOT yet been sent to the RM.
+ private List pendingContainers =
+ new LinkedList<>();
+
+ // Containers requested from the RM but not yet matched to an allocation.
+ private List scheduledContainers =
+ new LinkedList<>();
+
+ // Containers currently assigned, keyed by the allocated container's id.
+ private Map assignedContainers =
+ new HashMap<>();
+
+ // Containers that finished with a SUCCESS exit status.
+ private List completedContainers =
+ new LinkedList<>();
+
+ // Every container handed to init(); used to rebuild pending on restart.
+ private List allContainers =
+ new LinkedList<>();
+
+ // Set once the AM container is released or all containers have finished.
+ private boolean isFinished = false;
+
+ // Wall-clock time (ms) recorded in firstStep(); request delays are
+ // measured from this instant.
+ private long amStartTime;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DAGAMSimulator.class);
+
+ /**
+ * Initializes this simulator and registers the containers it has to run.
+ * All arguments are forwarded unchanged to the superclass {@code init};
+ * afterwards the AM type is set to "dag" and every container in
+ * {@code containerList} is recorded both in the full container list and
+ * in the pending list.
+ *
+ * @param containerList containers of this job; all of them start as pending
+ */
+ // NOTE(review): this looks like an override of the superclass init -
+ // consider adding @Override once the parent signature is confirmed.
+ @SuppressWarnings("checkstyle:parameternumber")
+ public void init(int heartbeatInterval,
+ List containerList, ResourceManager resourceManager,
+ SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
+ String simQueue, boolean tracked, String oldApp, long baseTimeMS,
+ Resource amResource, String nodeLabelExpr, Map params,
+ Map appIdAMSim) {
+ super.init(heartbeatInterval, containerList, resourceManager, slsRunnner,
+ startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS,
+ amResource, nodeLabelExpr, params, appIdAMSim);
+ super.amtype = "dag";
+
+ // Every container starts out as pending; nothing has been requested yet.
+ allContainers.addAll(containerList);
+ pendingContainers.addAll(containerList);
+ totalContainers = allContainers.size();
+
+ LOG.info("Added new job with {} containers", allContainers.size());
+ }
+
+ /**
+ * Runs the superclass first step and then records the current wall-clock
+ * time as the AM start time, the reference point for request delays.
+ */
+ @Override
+ public void firstStep() throws Exception {
+ super.firstStep();
+ // Taken after super.firstStep() returns, so delays count from that point.
+ amStartTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Clears the reservation request; all three arguments are ignored because
+ * the DAG AM does not support reservations.
+ */
+ @Override
+ public void initReservation(ReservationId reservationId,
+ long deadline, long now) {
+ // DAG AM doesn't support reservation
+ setReservationRequest(null);
+ }
+
+ /**
+ * Handles the launch notification of the AM container. A {@code null}
+ * container is ignored; otherwise the request bookkeeping is reset via
+ * {@code restart()} before the notification is passed to the superclass.
+ */
+ @Override
+ public synchronized void notifyAMContainerLaunched(Container masterContainer)
+     throws Exception {
+   if (masterContainer == null) {
+     // nothing was launched, so there is nothing to reset or forward
+     return;
+   }
+   restart();
+   super.notifyAMContainerLaunched(masterContainer);
+ }
+
+ /**
+ * Drains the response queue and updates the container bookkeeping.
+ * For each response it handles, in this order: completed containers
+ * (successful ones become completed, others go back to pending), the
+ * "everything finished" check, and newly allocated containers, which are
+ * matched to scheduled requests in FIFO order.
+ */
+ // NOTE(review): presumably overrides a superclass hook - consider adding
+ // @Override once confirmed.
+ protected void processResponseQueue() throws Exception {
+ while (!responseQueue.isEmpty()) {
+ AllocateResponse response = responseQueue.take();
+
+ // check completed containers
+ if (!response.getCompletedContainersStatuses().isEmpty()) {
+ for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+ ContainerId containerId = cs.getContainerId();
+ if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
+ if (assignedContainers.containsKey(containerId)) {
+ LOG.debug("Application {} has one container finished ({}).",
+ appId, containerId);
+ // assigned -> completed
+ ContainerSimulator containerSimulator =
+ assignedContainers.remove(containerId);
+ finishedContainers++;
+ completedContainers.add(containerSimulator);
+ } else if (amContainer.getId().equals(containerId)) {
+ // am container released event
+ isFinished = true;
+ LOG.info("Application {} goes to finish.", appId);
+ }
+ // lastStep() also clears all container lists of this simulator.
+ if (finishedContainers >= totalContainers) {
+ lastStep();
+ }
+ } else {
+ // container to be killed
+ if (assignedContainers.containsKey(containerId)) {
+ LOG.error("Application {} has one container killed ({}).", appId,
+ containerId);
+ // assigned -> pending, so the container is requested again later
+ pendingContainers.add(assignedContainers.remove(containerId));
+ } else if (amContainer.getId().equals(containerId)) {
+ LOG.error("Application {}'s AM is "
+ + "going to be killed. Waiting for rescheduling...", appId);
+ }
+ }
+ }
+ }
+
+ // check finished
+ if (isAMContainerRunning &&
+ (finishedContainers >= totalContainers)) {
+ isAMContainerRunning = false;
+ LOG.info("Application {} sends out event to clean up"
+ + " its AM container.", appId);
+ isFinished = true;
+ // stop here; responses still queued are not processed in this call
+ break;
+ }
+
+ // check allocated containers
+ for (Container container : response.getAllocatedContainers()) {
+ // an allocation with no outstanding scheduled request is ignored
+ if (!scheduledContainers.isEmpty()) {
+ // scheduled -> assigned, oldest scheduled request first
+ ContainerSimulator cs = scheduledContainers.remove(0);
+ LOG.debug("Application {} starts to launch a container ({}).",
+ appId, container.getId());
+ assignedContainers.put(container.getId(), cs);
+ // hand the container to the NM simulator of the allocated node
+ se.getNmMap().get(container.getNodeId())
+ .addNewContainer(container, cs.getLifeTime());
+ }
+ }
+ }
+ }
+
+ /**
+ * Sends one allocate call to the RM. Pending containers whose request
+ * delay has elapsed (measured from the AM start time) are packaged into
+ * the ask and moved from pending to scheduled; if none are due, an empty
+ * ask is sent. The reported progress is finishedContainers divided by
+ * totalContainers, and a non-null response is put on the response queue.
+ * Does nothing once the application is finished.
+ */
+ @Override
+ protected void sendContainerRequest() throws Exception {
+ if (isFinished) {
+ return;
+ }
+ // send out request
+ List ask = null;
+ // NOTE(review): other checks in this class use ">=" for the same
+ // condition; "!=" behaves the same as long as finished never exceeds total.
+ if (finishedContainers != totalContainers) {
+ if (!pendingContainers.isEmpty()) {
+ List toBeScheduled =
+ getToBeScheduledContainers(pendingContainers, amStartTime);
+ if (toBeScheduled.size() > 0) {
+ ask = packageRequests(toBeScheduled, PRIORITY);
+ LOG.info("Application {} sends out request for {} containers.",
+ appId, toBeScheduled.size());
+ // pending -> scheduled
+ scheduledContainers.addAll(toBeScheduled);
+ pendingContainers.removeAll(toBeScheduled);
+ toBeScheduled.clear();
+ }
+ }
+ }
+
+ // nothing is due yet: still send an allocate call with an empty ask
+ if (ask == null) {
+ ask = new ArrayList<>();
+ }
+
+ final AllocateRequest request = createAllocateRequest(ask);
+ // avoid dividing by zero for a job without containers
+ if (totalContainers == 0) {
+ request.setProgress(1.0f);
+ } else {
+ request.setProgress((float) finishedContainers / totalContainers);
+ }
+
+ // issue the allocate call as a remote user named after the application
+ // attempt, carrying the identifier decoded from the attempt's AMRM token
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ Token token = rm.getRMContext().getRMApps()
+ .get(appAttemptId.getApplicationId())
+ .getRMAppAttempt(appAttemptId).getAMRMToken();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+ AllocateResponse response = ugi.doAs(
+ (PrivilegedExceptionAction) () -> rm
+ .getApplicationMasterService().allocate(request));
+ if (response != null) {
+ responseQueue.put(response);
+ }
+ }
+
+ /**
+ * Selects the containers whose request delay has already elapsed.
+ * A container qualifies when {@code startTime + requestDelay} is not later
+ * than the current wall-clock time. The clock is read once per call so
+ * that every container is judged against the same instant. The input list
+ * is not modified and the result keeps the input order.
+ *
+ * @param containers candidate containers to examine
+ * @param startTime reference time (ms) from which request delays count
+ * @return a new list holding the containers that are due to be requested
+ */
+ @VisibleForTesting
+ public List<ContainerSimulator> getToBeScheduledContainers(
+     List<ContainerSimulator> containers, long startTime) {
+   // Snapshot the clock once instead of once per element, so a long list
+   // cannot be split by time advancing in the middle of the loop.
+   final long now = System.currentTimeMillis();
+   List<ContainerSimulator> toBeScheduled = new ArrayList<>();
+   for (ContainerSimulator cs : containers) {
+     // only request for the container if it is time to request
+     if (cs.getRequestDelay() + startTime <= now) {
+       toBeScheduled.add(cs);
+     }
+   }
+   return toBeScheduled;
+ }
+
+ /**
+ * Records the current wall-clock time as the end time once the
+ * application is finished; does nothing before that.
+ */
+ @Override
+ protected void checkStop() {
+   if (!isFinished) {
+     return;
+   }
+   super.setEndTime(System.currentTimeMillis());
+ }
+
+ /**
+ * Runs the superclass last step and then releases all container
+ * bookkeeping held by this simulator.
+ */
+ @Override
+ public void lastStep() throws Exception {
+   super.lastStep();
+
+   // drop every per-state collection; the order is irrelevant
+   completedContainers.clear();
+   assignedContainers.clear();
+   scheduledContainers.clear();
+   pendingContainers.clear();
+   allContainers.clear();
+ }
+
+ /**
+ * Resets the request bookkeeping when an AM container is launched, e.g.
+ * after the previous AM container was killed: clears the finished flag and
+ * rebuilds the pending list as all containers minus the completed ones.
+ */
+ private void restart() {
+ isFinished = false;
+ pendingContainers.clear();
+ pendingContainers.addAll(allContainers);
+ pendingContainers.removeAll(completedContainers);
+ // NOTE(review): scheduledContainers and assignedContainers are left
+ // untouched, so their entries are also back in the pending list -
+ // confirm this is intended.
+ amContainer = null;
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index fc6be73633f..34b89b656a3 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -125,4 +125,7 @@ public static Resource getAMContainerResource(Configuration conf) {
+ "execution.type";
public static final String TASK_ALLOCATION_ID = TASK_CONTAINER
+ "allocation.id";
+ public static final String TASK_REQUEST_DELAY = TASK_CONTAINER
+ + "request.delay";
+
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
index 06d81620f49..e83ee91d8e1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java
@@ -38,6 +38,8 @@
private long endTime;
// life time (ms)
private long lifeTime;
+ // time(ms) after which container would be requested by AM
+ private long requestDelay;
// host name
private String hostname;
// priority
@@ -63,21 +65,24 @@ public ContainerSimulator(Resource resource, long lifeTime,
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType) {
- this(resource, lifeTime, hostname, priority, type, executionType, -1);
+ this(resource, lifeTime, hostname, priority, type,
+ executionType, -1, 0);
}
/**
* invoked when AM schedules containers to allocate.
*/
+ @SuppressWarnings("checkstyle:parameternumber")
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType,
- long allocationId) {
+ long allocationId, long requestDelay) {
this.resource = resource;
this.lifeTime = lifeTime;
this.hostname = hostname;
this.priority = priority;
this.type = type;
this.executionType = executionType;
+ this.requestDelay = requestDelay;
this.allocationId = allocationId;
}
@@ -148,4 +153,8 @@ public ExecutionType getExecutionType() {
public long getAllocationId() {
return allocationId;
}
+
+ /**
+ * @return the time (ms) after which the AM requests this container
+ */
+ public long getRequestDelay() {
+ return requestDelay;
+ }
}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
new file mode 100644
index 00000000000..38ea134da5f
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.yarn.sls.appmaster.DAGAMSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link DAGAMSimulator}.
+ */
+public class TestDagAMSimulator {
+
+ /**
+ * Test to check whether containers are scheduled based on request delay.
+ * The start time is moved into the past step by step, so that more and
+ * more request delays count as elapsed without the test having to sleep.
+ * @throws Exception if the test fails unexpectedly
+ */
+ @Test
+ public void testGetToBeScheduledContainers() throws Exception {
+ DAGAMSimulator dagamSimulator = new DAGAMSimulator();
+ List containerSimulators = new ArrayList<>();
+
+ // containers are requested with 0, 1000, 1500 and 4000ms delay;
+ // the allocation id (1..4) identifies each container in the assertions.
+ containerSimulators.add(createContainerSim(1, 0));
+ containerSimulators.add(createContainerSim(2, 1000));
+ containerSimulators.add(createContainerSim(3, 1500));
+ containerSimulators.add(createContainerSim(4, 4000));
+
+ long startTime = System.currentTimeMillis();
+ List res = dagamSimulator.getToBeScheduledContainers(
+ containerSimulators, startTime);
+ // we should get only one container with request delay set to 0
+ assertEquals(1, res.size());
+ assertEquals(1, res.get(0).getAllocationId());
+
+ // pretend the AM started 1000ms ago
+ startTime -= 1000;
+ res = dagamSimulator.getToBeScheduledContainers(
+ containerSimulators, startTime);
+ // we should get containers with request delay <= 1000 (0 and 1000)
+ assertEquals(2, res.size());
+ assertEquals(1, res.get(0).getAllocationId());
+ assertEquals(2, res.get(1).getAllocationId());
+
+ // pretend the AM started 3000ms ago (1000 + 2000)
+ startTime -= 2000;
+ res = dagamSimulator.getToBeScheduledContainers(
+ containerSimulators, startTime);
+ // we should get containers with request delay <= 3000 (0, 1000 and 1500)
+ assertEquals(3, res.size());
+ assertEquals(1, res.get(0).getAllocationId());
+ assertEquals(2, res.get(1).getAllocationId());
+ assertEquals(3, res.get(2).getAllocationId());
+ }
+
+ /**
+ * Builds a container simulator for the test with a null resource, a
+ * lifetime of 1000, hostname "*", priority 1, type "Map" and a null
+ * execution type.
+ *
+ * @param allocationId allocation id, used by the test to tell containers apart
+ * @param requestDelay request delay (ms) of the container
+ * @return the new container simulator
+ */
+ private ContainerSimulator createContainerSim(long allocationId,
+ long requestDelay) {
+ return new ContainerSimulator(null, 1000, "*", 1, "Map",
+ null, allocationId, requestDelay);
+ }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java
new file mode 100644
index 00000000000..54158c0083c
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSDagAMSimulator.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSDagAMSimulator extends BaseSLSRunnerTest {
+
+ @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+ public static Collection