diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 9d35d1b..7cfa358 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -130,7 +130,7 @@ private SynthTraceJobProducer stjp; public SLSRunner() throws ClassNotFoundException { - Configuration tempConf = new Configuration(false); + Configuration tempConf = new SLSConfiguration(false); init(tempConf); } @@ -197,7 +197,7 @@ public void start() throws IOException, ClassNotFoundException, YarnException, } private void startRM() throws ClassNotFoundException, YarnException { - Configuration rmConf = new YarnConfiguration(getConf()); + Configuration rmConf = new YarnConfiguration(); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); if (Class.forName(schedulerClass) == CapacityScheduler.class) { @@ -400,7 +400,7 @@ private void createAMForJob(Map jsonJob) throws YarnException { // create a new AM String amType = jsonJob.get("am.type").toString(); runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, - containerList, null); + containerList, null, getAMContainerResource(jsonJob)); } /** @@ -484,7 +484,8 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs) // Only supports the default job type currently runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, null); + jobStartTimeMS, jobFinishTimeMS, containerList, null, + getAMContainerResource(null)); } private Resource getDefaultContainerResource() { @@ -602,7 +603,8 @@ private void startAMFromSynthGenerator() throws YarnException, IOException { } runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, rr); + jobStartTimeMS, jobFinishTimeMS, containerList, rr, + getAMContainerResource(null)); } } finally { stjp.close(); @@ -610,6 +612,26 @@ private void startAMFromSynthGenerator() throws YarnException, IOException { } + private Resource getAMContainerResource(Map jsonJob) { + SLSConfiguration slsConfiguration = (SLSConfiguration)getConf(); + Resource amContainerResource = slsConfiguration.getAMContainerResource(); + + if (jsonJob == null) { + return amContainerResource; + } + + if (jsonJob.containsKey("am.memory")) { + amContainerResource.setMemorySize( + Long.parseLong(jsonJob.get("am.memory").toString())); + } + + if (jsonJob.containsKey("am.vcores")) { + amContainerResource.setVirtualCores( + Integer.parseInt(jsonJob.get("am.vcores").toString())); + } + return amContainerResource; + } + private void increaseQueueAppNum(String queue) throws YarnException { SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); String queueName = wrapper.getRealQueueName(queue); @@ -626,7 +648,7 @@ private void increaseQueueAppNum(String queue) throws YarnException { private void runNewAM(String jobType, String user, String jobQueue, String oldJobId, long jobStartTimeMS, long jobFinishTimeMS, List containerList, - ReservationSubmissionRequest rr) { + ReservationSubmissionRequest rr, Resource amContainerResource) { AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( amClassMap.get(jobType), new Configuration()); @@ -636,9 +658,11 @@ private void runNewAM(String jobType, String user, SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); boolean isTracked = trackedApps.contains(oldJobId); - amSim.init(AM_ID++, heartbeatInterval, containerList, - rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, - isTracked, oldJobId, rr, runner.getStartTimeMS()); + AM_ID++; + + amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, + jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr, + runner.getStartTimeMS(), amContainerResource); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); @@ -800,7 +824,8 @@ public int run(final String[] argv) throws IOException, InterruptedException, } public static void main(String[] argv) throws Exception { - ToolRunner.run(new Configuration(), new SLSRunner(), argv); + SLSRunner slsRunner = new SLSRunner(); + ToolRunner.run(slsRunner.getConf(), slsRunner, argv); } static void printUsage() { diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 45a3c07..a93446d 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; -import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.text.MessageFormat; import java.util.ArrayList; @@ -36,18 +35,13 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords - .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; - -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -55,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -67,14 +60,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.log4j.Logger; - import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.log4j.Logger; @Private @Unstable @@ -117,9 +108,7 @@ protected final Logger LOG = Logger.getLogger(AMSimulator.class); - // resource for AM container - private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; - private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; + private Resource amContainerResource; private ReservationSubmissionRequest reservationRequest; @@ -128,11 +117,12 @@ public AMSimulator() { } @SuppressWarnings("checkstyle:parameternumber") - public void init(int id, int heartbeatInterval, + public void init(int heartbeatInterval, List containerList, ResourceManager resourceManager, SLSRunner slsRunnner, long startTime, long finishTime, String simUser, String simQueue, boolean tracked, String oldApp, - ReservationSubmissionRequest rr, long baseTimeMS) { + ReservationSubmissionRequest rr, long baseTimeMS, + Resource amContainerResource) { super.init(startTime, startTime + 1000000L * heartbeatInterval, heartbeatInterval); this.user = simUser; @@ -145,6 +135,7 @@ public void init(int id, int heartbeatInterval, this.traceStartTimeMS = startTime; this.traceFinishTimeMS = finishTime; this.reservationRequest = rr; + this.amContainerResource = amContainerResource; } /** @@ -319,16 +310,13 @@ private void submitApp(ReservationId reservationId) appSubContext.setPriority(Priority.newInstance(0)); ContainerLaunchContext conLauContext = Records.newRecord(ContainerLaunchContext.class); - conLauContext.setApplicationACLs( - new HashMap()); - conLauContext.setCommands(new ArrayList()); - conLauContext.setEnvironment(new HashMap()); - conLauContext.setLocalResources(new HashMap()); - conLauContext.setServiceData(new HashMap()); + conLauContext.setApplicationACLs(new HashMap<>()); + conLauContext.setCommands(new ArrayList<>()); + conLauContext.setEnvironment(new HashMap<>()); + conLauContext.setLocalResources(new HashMap<>()); + conLauContext.setServiceData(new HashMap<>()); appSubContext.setAMContainerSpec(conLauContext); - appSubContext.setResource(Resources - .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, - MR_AM_CONTAINER_RESOURCE_VCORES)); + appSubContext.setResource(amContainerResource); if(reservationId != null) { appSubContext.setReservationID(reservationId); diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index de6d19d..8e39967 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -114,14 +115,14 @@ scheduled when all maps have finished (not support slow-start currently). public final Logger LOG = Logger.getLogger(MRAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int id, int heartbeatInterval, + public void init(int heartbeatInterval, List containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, boolean isTracked, String oldAppId, ReservationSubmissionRequest rr, - long baselineStartTimeMS) { - super.init(id, heartbeatInterval, containerList, rm, se, - traceStartTime, traceFinishTime, user, queue, - isTracked, oldAppId, rr, baselineStartTimeMS); + long baselineStartTimeMS, Resource amContainerResource) { + super.init(heartbeatInterval, containerList, rm, se, + traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, + rr, baselineStartTimeMS, amContainerResource); amtype = "mapreduce"; // get map/reduce tasks diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java index 8fd5b3f..47a0cd9 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java @@ -20,10 +20,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; @Private @Unstable -public class SLSConfiguration { +public class SLSConfiguration extends Configuration { // sls public static final String PREFIX = "yarn.sls."; // runner @@ -62,6 +64,14 @@ public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000; public static final String AM_TYPE = AM_PREFIX + "type."; + public static final String AM_CONTAINER_MEMORY = AM_PREFIX + + ".container.memory"; + public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024; + + public static final String AM_CONTAINER_VCORES = AM_PREFIX + + ".container.vcores"; + public static final int AM_CONTAINER_VCORES_DEFAULT = 1; + // container public static final String CONTAINER_PREFIX = PREFIX + "container."; public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX @@ -70,4 +80,13 @@ public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores"; public static final int CONTAINER_VCORES_DEFAULT = 1; + public SLSConfiguration(boolean loadDefaults) { + super(loadDefaults); + } + + public Resource getAMContainerResource() { + return Resource.newInstance( + getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT), + getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT)); + } } diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java index b2bc8d5..0d60ea4 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java @@ -20,6 +20,7 @@ import net.jcip.annotations.NotThreadSafe; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -76,7 +77,7 @@ @Test(timeout = 60000) @SuppressWarnings("all") public void testSimulatorRunning() throws Exception { - Configuration conf = new Configuration(false); + Configuration conf = new SLSConfiguration(false); long timeTillShutdownInsec = 20L; runSLS(conf, timeTillShutdownInsec); } diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index 56aa219..0064ace 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -133,8 +133,8 @@ public void testAMSimulator() throws Exception { String appId = "app1"; String queue = "default"; List containers = new ArrayList<>(); - app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", queue, - true, appId, null, 0); + app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, + appId, null, 0, new SLSConfiguration(false).getAMContainerResource()); app.firstStep(); verifySchedulerMetrics(appId);