diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index ba43816..0d2c58b 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -78,7 +78,7 @@
   private ResourceManager rm;
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
-  private Configuration conf;
+  private SLSConfiguration conf;
   private Map<String, Integer> queueAppNumMap;
 
   // NM simulator
@@ -126,7 +126,7 @@ public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile,
     amClassMap = new HashMap<>();
 
     // runner configuration
-    conf = new Configuration(false);
+    conf = new SLSConfiguration(false);
     conf.addResource("sls-runner.xml");
     // runner
     int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
@@ -313,6 +313,7 @@ private void startAMFromSLSTraces(Resource containerResource,
             queueAppNumMap.get(queue) : 0;
         queueSize ++;
         queueAppNumMap.put(queue, queueSize);
+
         // tasks
         List tasks = (List) jsonJob.get("job.tasks");
         if (tasks == null || tasks.size() == 0) {
@@ -355,9 +356,9 @@ private void startAMFromSLSTraces(Resource containerResource,
         AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
                 amClassMap.get(amType), new Configuration());
         if (amSim != null) {
-          amSim.init(AM_ID++, heartbeatInterval, containerList, rm,
+          amSim.init(heartbeatInterval, containerList, rm,
                   this, jobStartTime, jobFinishTime, user, queue,
-                  isTracked, oldAppId);
+                  isTracked, oldAppId, getAMContainerResource(jsonJob));
           runner.schedule(amSim);
           maxRuntime = Math.max(maxRuntime, jobFinishTime);
           numTasks += containerList.size();
@@ -370,6 +371,21 @@ private void startAMFromSLSTraces(Resource containerResource,
     }
   }
 
+  private Resource getAMContainerResource(Map jsonJob) {
+    Resource amContainerResource = conf.getAMContainerResource();
+
+    if (jsonJob.containsKey("am.memory")) {
+      amContainerResource.setMemorySize(
+          Long.parseLong(jsonJob.get("am.memory").toString()));
+    }
+
+    if (jsonJob.containsKey("am.vcores")) {
+      amContainerResource.setVirtualCores(
+          Integer.parseInt(jsonJob.get("am.vcores").toString()));
+    }
+    return amContainerResource;
+  }
+
   /**
    * parse workload information from rumen trace files
    */
@@ -446,9 +462,9 @@ private void startAMFromRumenTraces(Resource containerResource,
         AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
                 amClassMap.get(jobType), conf);
         if (amSim != null) {
-          amSim.init(AM_ID ++, heartbeatInterval, containerList,
+          amSim.init(heartbeatInterval, containerList,
                   rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
-                  isTracked, oldJobId);
+                  isTracked, oldJobId, this.conf.getAMContainerResource());
           runner.schedule(amSim);
           maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
           numTasks += containerList.size();
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index a62f2b6..cf16ef0 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -114,17 +114,16 @@
   protected final Logger LOG = Logger.getLogger(AMSimulator.class);
 
   // resource for AM container
-  private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
-  private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
+  private Resource amContainerResource;
 
   public AMSimulator() {
     this.responseQueue = new LinkedBlockingQueue<>();
   }
 
-  public void init(int id, int heartbeatInterval,
+  public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
-      long traceStartTime, long traceFinishTime, String user, String queue,
-      boolean isTracked, String oldAppId) {
+      long traceStartTime, long traceFinishTime, String user, String queue,
+      boolean isTracked, String oldAppId, Resource amContainerResource) {
     super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval,
             heartbeatInterval);
     this.user = user;
@@ -136,6 +135,7 @@ public void init(int id, int heartbeatInterval,
     this.isTracked = isTracked;
     this.traceStartTimeMS = traceStartTime;
     this.traceFinishTimeMS = traceFinishTime;
+    this.amContainerResource = amContainerResource;
   }
 
   /**
@@ -281,16 +281,13 @@ private void submitApp()
     appSubContext.setPriority(Priority.newInstance(0));
     ContainerLaunchContext conLauContext =
         Records.newRecord(ContainerLaunchContext.class);
-    conLauContext.setApplicationACLs(
-        new HashMap<ApplicationAccessType, String>());
-    conLauContext.setCommands(new ArrayList<String>());
-    conLauContext.setEnvironment(new HashMap<String, String>());
-    conLauContext.setLocalResources(new HashMap<String, LocalResource>());
-    conLauContext.setServiceData(new HashMap<String, ByteBuffer>());
+    conLauContext.setApplicationACLs(new HashMap<>());
+    conLauContext.setCommands(new ArrayList<>());
+    conLauContext.setEnvironment(new HashMap<>());
+    conLauContext.setLocalResources(new HashMap<>());
+    conLauContext.setServiceData(new HashMap<>());
     appSubContext.setAMContainerSpec(conLauContext);
-    appSubContext.setResource(Resources
-        .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
-            MR_AM_CONTAINER_RESOURCE_VCORES));
+    appSubContext.setResource(amContainerResource);
     subAppRequest.setApplicationSubmissionContext(appSubContext);
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
     ugi.doAs(new PrivilegedExceptionAction<Object>() {
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index e726b09..d87b57a 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -27,7 +27,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.avro.Protocol;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -35,6 +34,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -42,7 +42,6 @@
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 
@@ -114,13 +113,13 @@ scheduled when all maps have finished (not support slow-start currently).
 
   public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
 
-  public void init(int id, int heartbeatInterval,
+  public void init(int heartbeatInterval,
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
-      long traceStartTime, long traceFinishTime, String user, String queue,
-      boolean isTracked, String oldAppId) {
-    super.init(id, heartbeatInterval, containerList, rm, se,
+      long traceStartTime, long traceFinishTime, String user, String queue,
+      boolean isTracked, String oldAppId, Resource amContainerResource) {
+    super.init(heartbeatInterval, containerList, rm, se,
             traceStartTime, traceFinishTime, user, queue,
-            isTracked, oldAppId);
+            isTracked, oldAppId, amContainerResource);
     amtype = "mapreduce";
 
     // get map/reduce tasks
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index 8fd5b3f..e9406b4 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -20,10 +20,12 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
 
 @Private
 @Unstable
-public class SLSConfiguration {
+public class SLSConfiguration extends Configuration {
   // sls
   public static final String PREFIX = "yarn.sls.";
 
   // runner
@@ -62,6 +64,14 @@
   public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
   public static final String AM_TYPE = AM_PREFIX + "type.";
 
+  public static final String AM_CONTAINER_MEMORY = AM_PREFIX +
+      "container.memory";
+  public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;
+
+  public static final String AM_CONTAINER_VCORES = AM_PREFIX +
+      "container.vcores";
+  public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
+
   // container
   public static final String CONTAINER_PREFIX = PREFIX + "container.";
   public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX
@@ -70,4 +80,14 @@
   public static final String CONTAINER_VCORES = CONTAINER_PREFIX
       + "vcores";
   public static final int CONTAINER_VCORES_DEFAULT = 1;
+
+  public SLSConfiguration(boolean loadDefaults) {
+    super(loadDefaults);
+  }
+
+  public Resource getAMContainerResource() {
+    return Resource.newInstance(
+        getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
+        getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT));
+  }
 }
diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index ca3d195..590a4ef 100644
--- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -133,8 +133,8 @@ public void testAMSimulator() throws Exception {
     String appId = "app1";
     String queue = "default";
     List<ContainerSimulator> containers = new ArrayList<>();
-    app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", queue,
-        true, appId);
+    app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue,
+        true, appId, new SLSConfiguration(false).getAMContainerResource());
     app.firstStep();
 
     verifySchedulerMetrics(appId);
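
Reviewer note (not part of the patch): the sketch below is a minimal, self-contained way to sanity-check the intended precedence once the patch is applied and hadoop-sls plus yarn-api are on the classpath. The configured defaults come from the new SLSConfiguration keys (built from AM_PREFIX, presumably resolving to yarn.sls.am.container.memory / yarn.sls.am.container.vcores), and a job in the SLS JSON trace may override them with optional am.memory / am.vcores fields, mirroring SLSRunner#getAMContainerResource. The class name AMResourceResolutionDemo and the hard-coded values are illustrative assumptions only.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;

public class AMResourceResolutionDemo {
  public static void main(String[] args) {
    // Cluster-wide defaults; in a real run these are loaded from sls-runner.xml.
    SLSConfiguration conf = new SLSConfiguration(false);
    conf.setLong(SLSConfiguration.AM_CONTAINER_MEMORY, 2048);
    conf.setInt(SLSConfiguration.AM_CONTAINER_VCORES, 2);

    // Stand-in for one parsed job entry from an SLS JSON trace.
    Map<String, Object> jsonJob = new HashMap<>();
    jsonJob.put("am.memory", "4096");   // per-job override; "am.vcores" omitted

    // Same precedence as the patch: start from the configured default,
    // then apply per-job overrides if the trace provides them.
    Resource amResource = conf.getAMContainerResource();
    if (jsonJob.containsKey("am.memory")) {
      amResource.setMemorySize(
          Long.parseLong(jsonJob.get("am.memory").toString()));
    }
    if (jsonJob.containsKey("am.vcores")) {
      amResource.setVirtualCores(
          Integer.parseInt(jsonJob.get("am.vcores").toString()));
    }

    // Expect 4096 MB (from the trace) and 2 vcores (from the config defaults).
    System.out.println("AM container resource: " + amResource);
  }
}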