diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
new file mode 100644
index 00000000000..d7b47bbccc6
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobTraceReader;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class AMRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
+  static int REMAINING_APPS = 0;
+
+  private final Configuration conf;
+  private int AM_ID;
+  private Map<String, AMSimulator> amMap;
+  private Map<ApplicationId, AMSimulator> appIdAMSim;
+  private Set<String> trackedApps;
+  private Map<String, Class> amClassMap;
+  private TraceType inputType;
+  private String[] inputTraces;
+  private SynthTraceJobProducer stjp;
+  private TaskRunner runner;
+  private SLSRunner slsRunner;
+  private int numAMs, numTasks;
+  private long maxRuntime;
+  private ResourceManager rm;
+
+  public AMRunner(TaskRunner runner, SLSRunner slsRunner,
+      SynthTraceJobProducer stjp) {
+    this.runner = runner;
+    this.slsRunner = slsRunner;
+    this.conf = slsRunner.getConf();
+    this.stjp = stjp;
+  }
+
+
+  public void init(Configuration conf) throws ClassNotFoundException {
+    amMap = new ConcurrentHashMap<>();
+    amClassMap = new HashMap<>();
+    appIdAMSim = new ConcurrentHashMap<>();
+    // <AMType, Class> map
+    for (Map.Entry<String, String> e : conf) {
+      String key = e.getKey().toString();
+      if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
+        String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
+        amClassMap.put(amType, Class.forName(conf.get(key)));
+      }
+    }
+  }
+
+  public void startAM() throws YarnException, IOException {
+    switch (inputType) {
+    case SLS:
+      for (String inputTrace : inputTraces) {
+        startAMFromSLSTrace(inputTrace);
+      }
+      break;
+    case RUMEN:
+      long baselineTimeMS = 0;
+      for (String inputTrace : inputTraces) {
+        startAMFromRumenTrace(inputTrace, baselineTimeMS);
+      }
+      break;
+    case SYNTH:
+      startAMFromSynthGenerator();
+      break;
+    default:
+      throw new YarnException("Input configuration not recognized, "
+          + "trace type should be SLS, RUMEN, or SYNTH");
+    }
+
+    numAMs = amMap.size();
+    REMAINING_APPS = numAMs;
+  }
+
+  /**
+   * Parse workload from an SLS trace file.
+   */
+  private void startAMFromSLSTrace(String inputTrace) throws IOException {
+    JsonFactory jsonF = new JsonFactory();
+    ObjectMapper mapper = new ObjectMapper();
+
+    try (Reader input = new InputStreamReader(
+        new FileInputStream(inputTrace), "UTF-8")) {
+      Iterator<Map> jobIter = mapper.readValues(
+          jsonF.createParser(input), Map.class);
+
+      while (jobIter.hasNext()) {
+        try {
+          Map jsonJob = jobIter.next();
+          AMDefinitionSLS amDef =
+              AMDefinitionFactory.createFromSlsTrace(jsonJob, slsRunner);
+          startAMs(amDef);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM: {}", e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * Parse workload information from synth-generator trace files.
+   */
+  private void startAMFromSynthGenerator() throws YarnException, IOException {
+    Configuration localConf = new Configuration();
+    localConf.set("fs.defaultFS", "file:///");
+
+    SynthJob job;
+    // we use stjp, a reference to the job producer instantiated during node
+    // creation
+    while ((job = (SynthJob) stjp.getNextJob()) != null) {
+      ReservationId reservationId = null;
+      if (job.hasDeadline()) {
+        reservationId = ReservationId
+            .newInstance(rm.getStartTime(), AM_ID);
+      }
+      AMDefinitionSynth amDef =
+          AMDefinitionFactory.createFromSynth(job, slsRunner);
+      startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
+    }
+  }
+
+  /**
+   * Parse workload from a rumen trace file.
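+   * Each LoggedJob in the trace is converted to an AMDefinition and the
+   * corresponding AM simulators are scheduled; jobs that fail to parse
+   * are logged and skipped.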
+   */
+  private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
+      throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", "file:///");
+    File fin = new File(inputTrace);
+
+    try (JobTraceReader reader = new JobTraceReader(
+        new Path(fin.getAbsolutePath()), conf)) {
+      LoggedJob job = reader.getNext();
+
+      while (job != null) {
+        try {
+          AMDefinitionRumen amDef =
+              AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
+                  slsRunner);
+          startAMs(amDef);
+        } catch (Exception e) {
+          LOG.error("Failed to create an AM", e);
+        }
+        job = reader.getNext();
+      }
+    }
+  }
+
+  private void startAMs(AMDefinition amDef) {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withDeadline(-1)
+          .withReservationId(null)
+          .withParams(null)
+          .build();
+      runNewAM(jobDef);
+    }
+  }
+
+  private void startAMs(AMDefinition amDef,
+      ReservationId reservationId,
+      Map<String, String> params, long deadline) {
+    for (int i = 0; i < amDef.getJobCount(); i++) {
+      JobDefinition jobDef = JobDefinition.Builder.create()
+          .withAmDefinition(amDef)
+          .withReservationId(reservationId)
+          .withParams(params)
+          .withDeadline(deadline)
+          .build();
+      runNewAM(jobDef);
+    }
+  }
+
+  private void runNewAM(JobDefinition jobDef) {
+    AMDefinition amDef = jobDef.getAmDefinition();
+    String oldJobId = amDef.getOldAppId();
+    AMSimulator amSim =
+        createAmSimulator(amDef.getAmType());
+
+    if (amSim != null) {
+      int heartbeatInterval = conf.getInt(
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
+          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+      boolean isTracked = trackedApps.contains(oldJobId);
+
+      if (oldJobId == null) {
+        oldJobId = Integer.toString(AM_ID);
+      }
+      AM_ID++;
+      amSim.init(amDef, rm, slsRunner, isTracked, runner.getStartTimeMS(),
+          heartbeatInterval, appIdAMSim);
+      if (jobDef.getReservationId() != null) {
+        // if we have a ReservationId, delegate reservation creation to
+        // AMSim (reservation shape is impl specific)
+        UTCClock clock = new UTCClock();
+        amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
+            clock.getTime());
+      }
+      runner.schedule(amSim);
+      maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
+      numTasks += amDef.getTaskContainers().size();
+      amMap.put(oldJobId, amSim);
+    }
+  }
+
+  private AMSimulator createAmSimulator(String jobType) {
+    return (AMSimulator) ReflectionUtils.newInstance(
+        amClassMap.get(jobType), new Configuration());
+  }
+
+  public AMSimulator getAMSimulator(ApplicationId appId) {
+    return appIdAMSim.get(appId);
+  }
+
+  public void setInputType(TraceType inputType) {
+    this.inputType = inputType;
+  }
+
+  public void setInputTraces(String[] inputTraces) {
+    this.inputTraces = inputTraces;
+  }
+
+  public void setResourceManager(ResourceManager rm) {
+    this.rm = rm;
+  }
+
+  public Set<String> getTrackedApps() {
+    return trackedApps;
+  }
+
+  public void setTrackedApps(Set<String> trackApps) {
+    this.trackedApps = trackApps;
+  }
+
+  public int getNumAMs() {
+    return numAMs;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  public long getMaxRuntime() {
+    return maxRuntime;
+  }
+
+  public Map<String, AMSimulator> getAmMap() {
+    return amMap;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
new file mode 100644
index 00000000000..add10e67905
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class NMRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(NMRunner.class);
+
+  // other simulation information
+  private int numNMs, numRacks;
+
+  // NM simulator
+  private Map<NodeId, NMSimulator> nmMap;
+  private Resource nodeManagerResource;
+  private String nodeFile;
+  private TaskRunner taskRunner;
+  private Configuration conf;
+  private ResourceManager rm;
+  private String tableMapping;
+  private int threadPoolSize;
+  private TraceType inputType;
+  private String[] inputTraces;
+  private SynthTraceJobProducer stjp;
+
+  public NMRunner(TaskRunner taskRunner, Configuration conf,
+      ResourceManager rm, String tableMapping, int threadPoolSize) {
+    this.taskRunner = taskRunner;
+    this.conf = conf;
+    this.rm = rm;
+    this.tableMapping = tableMapping;
+    this.threadPoolSize = threadPoolSize;
+    this.nmMap = new ConcurrentHashMap<>();
+    this.nodeManagerResource = getNodeManagerResourceFromConf();
+  }
+
+  public void startNM() throws YarnException, IOException,
+      InterruptedException {
+    // nm configuration
+    int heartbeatInterval = conf.getInt(
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    float resourceUtilizationRatio = conf.getFloat(
+        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
+        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
+    // nm information (fetch from topology file, or from sls/rumen json file)
+    Set<SLSRunner.NodeDetails> nodeSet = null;
+    if (nodeFile.isEmpty()) {
+      for (String inputTrace : inputTraces) {
+        switch (inputType) {
+        case SLS:
+          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
+          break;
+        case RUMEN:
+          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
+          break;
+        case SYNTH:
+          stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
+          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
+              stjp.getNumNodes()/stjp.getNodesPerRack());
+          break;
+        default:
+          throw new YarnException("Input configuration not recognized, "
+              + "trace type should be SLS, RUMEN, or SYNTH");
+        }
+      }
+    } else {
+      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
+          nodeManagerResource);
+    }
+
+    if (nodeSet == null || nodeSet.isEmpty()) {
+      throw new YarnException("No node! Please configure nodes.");
+    }
+
+    SLSUtils.generateNodeTableMapping(nodeSet, tableMapping);
+
+    // create NM simulators
+    Random random = new Random();
+    Set<String> rackSet = ConcurrentHashMap.newKeySet();
+    int effectiveThreadPoolSize = Math.max(threadPoolSize,
+        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
+    ExecutorService executorService = Executors.
+        newFixedThreadPool(effectiveThreadPoolSize);
+    for (SLSRunner.NodeDetails nodeDetails : nodeSet) {
+      executorService.submit(new Runnable() {
+        @Override public void run() {
+          try {
+            // we randomize the heartbeat start time from zero to one interval
+            NMSimulator nm = new NMSimulator();
+            Resource nmResource = nodeManagerResource;
+            String hostName = nodeDetails.getHostname();
+            if (nodeDetails.getNodeResource() != null) {
+              nmResource = nodeDetails.getNodeResource();
+            }
+            Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
+            nm.init(hostName, nmResource,
+                random.nextInt(heartbeatInterval),
+                heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
+            nmMap.put(nm.getNode().getNodeID(), nm);
+            taskRunner.schedule(nm);
+            rackSet.add(nm.getNode().getRackName());
+          } catch (IOException | YarnException e) {
+            LOG.error("Got an error while adding node", e);
+          }
+        }
+      });
+    }
+    executorService.shutdown();
+    executorService.awaitTermination(10, TimeUnit.MINUTES);
+    numRacks = rackSet.size();
+    numNMs = nmMap.size();
+  }
+
+  void waitForNodesRunning() throws InterruptedException {
+    long startTimeMS = System.currentTimeMillis();
+    while (true) {
+      int numRunningNodes = 0;
+      for (RMNode node : rm.getRMContext().getRMNodes().values()) {
+        if (node.getState() == NodeState.RUNNING) {
+          numRunningNodes++;
+        }
+      }
+      if (numRunningNodes == numNMs) {
+        break;
+      }
+      LOG.info("SLSRunner is waiting for all nodes RUNNING."
+          + " {} of {} NMs initialized.", numRunningNodes, numNMs);
+ + " {} of {} NMs initialized.", numRunningNodes, numNMs); + Thread.sleep(1000); + } + LOG.info("SLSRunner takes {} ms to launch all nodes.", + System.currentTimeMillis() - startTimeMS); + } + + private Resource getNodeManagerResourceFromConf() { + Resource resource = Resources.createResource(0); + ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); + for (ResourceInformation info : infors) { + long value; + if (info.getName().equals(ResourceInformation.MEMORY_URI)) { + value = conf.getInt(SLSConfiguration.NM_MEMORY_MB, + SLSConfiguration.NM_MEMORY_MB_DEFAULT); + } else if (info.getName().equals(ResourceInformation.VCORES_URI)) { + value = conf.getInt(SLSConfiguration.NM_VCORES, + SLSConfiguration.NM_VCORES_DEFAULT); + } else { + value = conf.getLong(SLSConfiguration.NM_PREFIX + + info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT); + } + + resource.setResourceValue(info.getName(), value); + } + + return resource; + } + + public void setNodeFile(String nodeFile) { + this.nodeFile = nodeFile; + } + + + public void setInputType(TraceType inputType) { + this.inputType = inputType; + } + + public void setInputTraces(String[] inputTraces) { + this.inputTraces = inputTraces; + } + + public int getNumNMs() { + return numNMs; + } + + public int getNumRacks() { + return numRacks; + } + + public Resource getNodeManagerResource() { + return nodeManagerResource; + } + + public Map getNmMap() { + return nmMap; + } + + public SynthTraceJobProducer getStjp() { + return stjp; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java new file mode 100644 index 00000000000..dbded4b306e --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
+import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RMRunner {
+  private ResourceManager rm;
+  private String metricsOutputDir;
+  private Configuration conf;
+  private SLSRunner slsRunner;
+  private String tableMapping;
+  private Map<String, Integer> queueAppNumMap;
+
+  public RMRunner(Configuration conf, SLSRunner slsRunner) {
+    this.conf = conf;
+    this.slsRunner = slsRunner;
+    this.queueAppNumMap = new HashMap<>();
+  }
+
+  public void startRM() throws ClassNotFoundException, YarnException {
+    Configuration rmConf = new YarnConfiguration(conf);
+    String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
+
+    if (Class.forName(schedulerClass) == CapacityScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSCapacityScheduler.class.getName());
+      rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+      rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+          ProportionalCapacityPreemptionPolicy.class.getName());
+    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSFairScheduler.class.getName());
+    } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
+      // TODO add support for FifoScheduler
+      throw new YarnException("Fifo Scheduler is not supported yet.");
+    }
+    rmConf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        TableMapping.class, DNSToSwitchMapping.class);
+    rmConf.set(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
+        tableMapping);
+    rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
+
+    rm = new ResourceManager() {
+      @Override
+      protected ApplicationMasterLauncher createAMLauncher() {
+        return new MockAMLauncher(slsRunner, this.rmContext);
+      }
+    };
+
+    // Across runs of parametrized tests, the JvmMetrics object is retained,
+    // but is not registered correctly
+    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
+    jvmMetrics.registerIfNeeded();
+
+    // Init and start the actual ResourceManager
+    rm.init(rmConf);
+    rm.start();
+  }
+
+  public void increaseQueueAppNum(String queue) throws YarnException {
+    SchedulerWrapper wrapper = (SchedulerWrapper) rm.getResourceScheduler();
+    String queueName = wrapper.getRealQueueName(queue);
+    Integer appNum = queueAppNumMap.get(queueName);
+    if (appNum == null) {
+      appNum = 1;
+    } else {
+      appNum = appNum + 1;
+    }
+
+    queueAppNumMap.put(queueName, appNum);
+    SchedulerMetrics metrics = wrapper.getSchedulerMetrics();
+    if (metrics != null) {
+      metrics.trackQueue(queueName);
+    }
+  }
+
+  public void setMetricsOutputDir(String metricsOutputDir) {
+    this.metricsOutputDir = metricsOutputDir;
+  }
+
+  public String getTableMapping() {
+    return tableMapping;
+  }
+
+  public void setTableMapping(String tableMapping) {
+    this.tableMapping = tableMapping;
+  }
+
+  public void stop() {
+    rm.stop();
+  }
+
+  public ResourceManager getRm() {
+    return rm;
+  }
+
+  public Map<String, Integer> getQueueAppNumMap() {
+    return queueAppNumMap;
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 83834e8f9c9..8c5a747cf8c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -17,28 +17,6 @@
  */
 package org.apache.hadoop.yarn.sls;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.security.Security;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -50,89 +28,57 @@
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.metrics2.source.JvmMetrics;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.TableMapping;
-import org.apache.hadoop.tools.rumen.JobTraceReader;
-import org.apache.hadoop.tools.rumen.LoggedJob;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
-import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
-import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
-import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
-import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
-import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.scheduler.Tracker;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
-import org.apache.hadoop.yarn.sls.utils.SLSUtils;
-import org.apache.hadoop.yarn.util.UTCClock;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 @Private
 @Unstable
 public class SLSRunner extends Configured implements Tool {
-  // RM, Runner
-  private ResourceManager rm;
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
-  private Map<String, Integer> queueAppNumMap;
-  private int poolSize;
-
-  // NM simulator
-  private Map<NodeId, NMSimulator> nmMap;
-  private Resource nodeManagerResource;
-  private String nodeFile;
-
-  // AM simulator
-  private int AM_ID;
-  private Map<String, AMSimulator> amMap;
-  private Map<ApplicationId, AMSimulator> appIdAMSim;
-  private Set<String> trackedApps;
-  private Map<String, Class> amClassMap;
-  private static int remainingApps = 0;
 
   // metrics
-  private String metricsOutputDir;
   private boolean printSimulation;
 
-  // other simulation information
-  private int numNMs, numRacks, numAMs, numTasks;
-  private long maxRuntime;
-  private String tableMapping;
-
   private final static Map<String, Object> simulateInfoMap =
-      new HashMap<String, Object>();
+      new HashMap<>();
 
   // logger
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
 
   private static boolean exitAtTheFinish = false;
+  private AMRunner amRunner;
+  private RMRunner rmRunner;
+  private NMRunner nmRunner;
+
+  private SynthTraceJobProducer stjp;
 
   /**
    * The type of trace in input.
@@ -145,19 +91,16 @@ public static final String NETWORK_NEGATIVE_CACHE_TTL =
       "networkaddress.cache.negative.ttl";
 
-  private TraceType inputType;
-  private SynthTraceJobProducer stjp;
-
   public static int getRemainingApps() {
-    return remainingApps;
+    return AMRunner.REMAINING_APPS;
   }
 
-  public SLSRunner() throws ClassNotFoundException {
+  public SLSRunner() throws ClassNotFoundException, YarnException {
     Configuration tempConf = new Configuration(false);
     init(tempConf);
   }
 
-  public SLSRunner(Configuration tempConf) throws ClassNotFoundException {
+  public SLSRunner(Configuration tempConf) throws ClassNotFoundException,
+      YarnException {
     init(tempConf);
   }
 
@@ -171,51 +114,33 @@ public void setConf(Configuration conf) {
     super.setConf(conf);
   }
 
-  private void init(Configuration tempConf) throws ClassNotFoundException {
-    nmMap = new ConcurrentHashMap<>();
-    queueAppNumMap = new HashMap<>();
-    amMap = new ConcurrentHashMap<>();
-    amClassMap = new HashMap<>();
-    appIdAMSim = new ConcurrentHashMap<>();
+  private void init(Configuration tempConf) throws ClassNotFoundException,
+      YarnException {
     // runner configuration
     setConf(tempConf);
-
-    // runner
-    poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
+
+    // task runner
+    int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     SLSRunner.runner.setQueueSize(poolSize);
-    // <AMType, Class> map
-    for (Map.Entry<String, String> e : tempConf) {
-      String key = e.getKey().toString();
-      if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
-        String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
-        amClassMap.put(amType, Class.forName(tempConf.get(key)));
-      }
-    }
-    nodeManagerResource = getNodeManagerResource();
+    rmRunner = new RMRunner(getConf(), this);
+    nmRunner = new NMRunner(runner, getConf(), rmRunner.getRm(),
+        rmRunner.getTableMapping(), poolSize);
+    stjp = getSynthJobTraceProducer();
+    amRunner = new AMRunner(runner, this, stjp);
+    amRunner.init(tempConf);
   }
 
-  private Resource getNodeManagerResource() {
-    Resource resource = Resources.createResource(0);
-    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
-    for (ResourceInformation info : infors) {
-      long value;
-      if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
-        value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
-            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
-      } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
-        value = getConf().getInt(SLSConfiguration.NM_VCORES,
-            SLSConfiguration.NM_VCORES_DEFAULT);
-      } else {
-        value = getConf().getLong(SLSConfiguration.NM_PREFIX +
-            info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT);
+  private SynthTraceJobProducer getSynthJobTraceProducer() throws YarnException {
+    // if we use the nodeFile, this may not have been initialized yet.
+    if (nmRunner.getStjp() != null) {
+      return nmRunner.getStjp();
+    } else {
+      try {
+        return new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+      } catch (IOException e) {
+        throw new YarnException("Failed to initialize SynthTraceJobProducer", e);
       }
-
-      resource.setResourceValue(info.getName(), value);
     }
-
-    return resource;
   }
 
   /**
@@ -225,17 +150,29 @@ private Resource getNodeManagerResource() {
     return Collections.unmodifiableMap(simulateInfoMap);
   }
 
+  /**
+   * This is invoked before start.
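+   * Stores the simulation parameters and distributes them to the AM, NM
+   * and RM runners.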
+   * @param inType trace type, one of SLS, RUMEN or SYNTH
+   * @param inTraces input trace files
+   * @param nodes file holding the node topology, may be empty
+   * @param metricsOutputDir output directory for the metrics
+   * @param trackApps application IDs to be tracked
+   * @param printsimulation whether to print the simulation summary
+   */
   public void setSimulationParams(TraceType inType, String[] inTraces,
-      String nodes, String outDir, Set<String> trackApps,
-      boolean printsimulation) throws IOException, ClassNotFoundException {
+      String nodes, String metricsOutputDir, Set<String> trackApps,
+      boolean printsimulation) {
 
-    this.inputType = inType;
     this.inputTraces = inTraces.clone();
-    this.nodeFile = nodes;
-    this.trackedApps = trackApps;
+    this.amRunner.setInputType(inType);
+    this.amRunner.setInputTraces(this.inputTraces);
+    this.amRunner.setTrackedApps(trackApps);
+    this.nmRunner.setNodeFile(nodes);
+    this.nmRunner.setInputType(inType);
+    this.nmRunner.setInputTraces(this.inputTraces);
     this.printSimulation = printsimulation;
-    metricsOutputDir = outDir;
-    tableMapping = outDir + "/tableMapping.csv";
+    this.rmRunner.setMetricsOutputDir(metricsOutputDir);
+    this.rmRunner.setTableMapping(metricsOutputDir + "/tableMapping.csv");
   }
 
   public void start() throws IOException, ClassNotFoundException, YarnException,
@@ -244,20 +181,24 @@ public void start() throws IOException, ClassNotFoundException, YarnException,
       InterruptedException {
 
     enableDNSCaching(getConf());
 
     // start resource manager
-    startRM();
+    rmRunner.startRM();
+    amRunner.setResourceManager(rmRunner.getRm());
+
     // start node managers
-    startNM();
+    nmRunner.startNM();
+
     // start application masters
-    startAM();
+    amRunner.startAM();
+
     // set queue & tracked apps information
-    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-        .setQueueSet(this.queueAppNumMap.keySet());
-    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-        .setTrackedAppSet(this.trackedApps);
+    SchedulerWrapper resourceScheduler =
+        (SchedulerWrapper) rmRunner.getRm().getResourceScheduler();
+    Tracker tracker = resourceScheduler.getTracker();
+    tracker.setQueueSet(rmRunner.getQueueAppNumMap().keySet());
+    tracker.setTrackedAppSet(amRunner.getTrackedApps());
     // print out simulation info
     printSimulationInfo();
     // blocked until all nodes RUNNING
-    waitForNodesRunning();
+    nmRunner.waitForNodesRunning();
     // starting the runner once everything is ready to go,
     runner.start();
   }
 
@@ -279,252 +220,6 @@ static void enableDNSCaching(Configuration conf) {
     }
   }
 
-  private void startRM() throws ClassNotFoundException, YarnException {
-    Configuration rmConf = new YarnConfiguration(getConf());
-    String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
-
-    if (Class.forName(schedulerClass) == CapacityScheduler.class) {
-      rmConf.set(YarnConfiguration.RM_SCHEDULER,
-          SLSCapacityScheduler.class.getName());
-      rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-      rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-          ProportionalCapacityPreemptionPolicy.class.getName());
-    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
-      rmConf.set(YarnConfiguration.RM_SCHEDULER,
-          SLSFairScheduler.class.getName());
-    } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
-      // TODO add support for FifoScheduler
-      throw new YarnException("Fifo Scheduler is not supported yet.");
-    }
-    rmConf.setClass(
-        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-        TableMapping.class, DNSToSwitchMapping.class);
-    rmConf.set(
-        CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
-        tableMapping);
-    rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
-
-    final SLSRunner se = this;
-    rm = new ResourceManager() {
-      @Override
-      protected ApplicationMasterLauncher createAMLauncher() {
-        return new MockAMLauncher(se, this.rmContext, appIdAMSim);
-      }
-    };
-
-    // Across runs of parametrized tests, the JvmMetrics objects is retained,
-    // but is not registered correctly
-    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
-    jvmMetrics.registerIfNeeded();
-
-    // Init and start the actual ResourceManager
-    rm.init(rmConf);
-    rm.start();
-  }
-
-  private void startNM() throws YarnException, IOException,
-      InterruptedException {
-    // nm configuration
-    int heartbeatInterval = getConf().getInt(
-        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
-        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    float resourceUtilizationRatio = getConf().getFloat(
-        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
-        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
-    // nm information (fetch from topology file, or from sls/rumen json file)
-    Set<NodeDetails> nodeSet = null;
-    if (nodeFile.isEmpty()) {
-      for (String inputTrace : inputTraces) {
-        switch (inputType) {
-        case SLS:
-          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
-          break;
-        case RUMEN:
-          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
-          break;
-        case SYNTH:
-          stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
-              stjp.getNumNodes()/stjp.getNodesPerRack());
-          break;
-        default:
-          throw new YarnException("Input configuration not recognized, "
-              + "trace type should be SLS, RUMEN, or SYNTH");
-        }
-      }
-    } else {
-      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
-          nodeManagerResource);
-    }
-
-    if (nodeSet == null || nodeSet.isEmpty()) {
-      throw new YarnException("No node! Please configure nodes.");
-    }
-
-    SLSUtils.generateNodeTableMapping(nodeSet, tableMapping);
-
-    // create NM simulators
-    Random random = new Random();
-    Set<String> rackSet = ConcurrentHashMap.newKeySet();
-    int threadPoolSize = Math.max(poolSize,
-        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
-    ExecutorService executorService = Executors.
-        newFixedThreadPool(threadPoolSize);
-    for (NodeDetails nodeDetails : nodeSet) {
-      executorService.submit(new Runnable() {
-        @Override public void run() {
-          try {
-            // we randomize the heartbeat start time from zero to 1 interval
-            NMSimulator nm = new NMSimulator();
-            Resource nmResource = nodeManagerResource;
-            String hostName = nodeDetails.getHostname();
-            if (nodeDetails.getNodeResource() != null) {
-              nmResource = nodeDetails.getNodeResource();
-            }
-            Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
-            nm.init(hostName, nmResource,
-                random.nextInt(heartbeatInterval),
-                heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
-            nmMap.put(nm.getNode().getNodeID(), nm);
-            runner.schedule(nm);
-            rackSet.add(nm.getNode().getRackName());
-          } catch (IOException | YarnException e) {
-            LOG.error("Got an error while adding node", e);
-          }
-        }
-      });
-    }
-    executorService.shutdown();
-    executorService.awaitTermination(10, TimeUnit.MINUTES);
-    numRacks = rackSet.size();
-    numNMs = nmMap.size();
-  }
-
-  private void waitForNodesRunning() throws InterruptedException {
-    long startTimeMS = System.currentTimeMillis();
-    while (true) {
-      int numRunningNodes = 0;
-      for (RMNode node : rm.getRMContext().getRMNodes().values()) {
-        if (node.getState() == NodeState.RUNNING) {
-          numRunningNodes++;
-        }
-      }
-      if (numRunningNodes == numNMs) {
-        break;
-      }
-      LOG.info("SLSRunner is waiting for all nodes RUNNING."
- + " {} of {} NMs initialized.", numRunningNodes, numNMs); - Thread.sleep(1000); - } - LOG.info("SLSRunner takes {} ms to launch all nodes.", - System.currentTimeMillis() - startTimeMS); - } - - @SuppressWarnings("unchecked") - private void startAM() throws YarnException, IOException { - switch (inputType) { - case SLS: - for (String inputTrace : inputTraces) { - startAMFromSLSTrace(inputTrace); - } - break; - case RUMEN: - long baselineTimeMS = 0; - for (String inputTrace : inputTraces) { - startAMFromRumenTrace(inputTrace, baselineTimeMS); - } - break; - case SYNTH: - startAMFromSynthGenerator(); - break; - default: - throw new YarnException("Input configuration not recognized, " - + "trace type should be SLS, RUMEN, or SYNTH"); - } - - numAMs = amMap.size(); - remainingApps = numAMs; - } - - /** - * Parse workload from a SLS trace file. - */ - @SuppressWarnings("unchecked") - private void startAMFromSLSTrace(String inputTrace) throws IOException { - JsonFactory jsonF = new JsonFactory(); - ObjectMapper mapper = new ObjectMapper(); - - try (Reader input = new InputStreamReader( - new FileInputStream(inputTrace), "UTF-8")) { - Iterator jobIter = mapper.readValues( - jsonF.createParser(input), Map.class); - - while (jobIter.hasNext()) { - try { - Map jsonJob = jobIter.next(); - AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace( - jsonJob, this); - startAMs(amDef); - } catch (Exception e) { - LOG.error("Failed to create an AM: {}", e.getMessage()); - } - } - } - } - - private void startAMs(AMDefinition amDef) { - for (int i = 0; i < amDef.getJobCount(); i++) { - JobDefinition jobDef = JobDefinition.Builder.create() - .withAmDefinition(amDef) - .withDeadline(-1) - .withReservationId(null) - .withParams(null) - .build(); - runNewAM(jobDef); - } - } - - private void startAMs(AMDefinition amDef, ReservationId reservationId, - Map params, long deadline) { - for (int i = 0; i < amDef.getJobCount(); i++) { - JobDefinition jobDef = JobDefinition.Builder.create() - .withAmDefinition(amDef) - .withReservationId(reservationId) - .withParams(params) - .withDeadline(deadline) - .build(); - runNewAM(jobDef); - } - } - - /** - * Parse workload from a rumen trace file. - */ - @SuppressWarnings("unchecked") - private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) - throws IOException { - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", "file:///"); - File fin = new File(inputTrace); - - try (JobTraceReader reader = new JobTraceReader( - new Path(fin.getAbsolutePath()), conf)) { - LoggedJob job = reader.getNext(); - - while (job != null) { - try { - AMDefinitionRumen amDef = - AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS, - this); - startAMs(amDef); - } catch (Exception e) { - LOG.error("Failed to create an AM", e); - } - job = reader.getNext(); - } - } - } - Resource getDefaultContainerResource() { int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); @@ -533,92 +228,23 @@ Resource getDefaultContainerResource() { return Resources.createResource(containerMemory, containerVCores); } - /** - * parse workload information from synth-generator trace files. - */ - @SuppressWarnings("unchecked") - private void startAMFromSynthGenerator() throws YarnException, IOException { - Configuration localConf = new Configuration(); - localConf.set("fs.defaultFS", "file:///"); - // if we use the nodeFile this could have been not initialized yet. 
-    if (stjp == null) {
-      stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-    }
-
-    SynthJob job;
-    // we use stjp, a reference to the job producer instantiated during node
-    // creation
-    while ((job = (SynthJob) stjp.getNextJob()) != null) {
-      ReservationId reservationId = null;
-      if (job.hasDeadline()) {
-        reservationId = ReservationId
-            .newInstance(rm.getStartTime(), AM_ID);
-      }
-      AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this);
-      startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
-    }
-  }
-
-  void increaseQueueAppNum(String queue) throws YarnException {
-    SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
-    String queueName = wrapper.getRealQueueName(queue);
-    Integer appNum = queueAppNumMap.get(queueName);
-    if (appNum == null) {
-      appNum = 1;
-    } else {
-      appNum = appNum + 1;
-    }
-
-    queueAppNumMap.put(queueName, appNum);
-    SchedulerMetrics metrics = wrapper.getSchedulerMetrics();
-    if (metrics != null) {
-      metrics.trackQueue(queueName);
-    }
-  }
-
-  private AMSimulator createAmSimulator(String jobType) {
-    return (AMSimulator) ReflectionUtils.newInstance(
-        amClassMap.get(jobType), new Configuration());
-  }
-
-  private void runNewAM(JobDefinition jobDef) {
-    AMDefinition amDef = jobDef.getAmDefinition();
-    String oldJobId = amDef.getOldAppId();
-    AMSimulator amSim =
-        createAmSimulator(amDef.getAmType());
-
-    if (amSim != null) {
-      int heartbeatInterval = getConf().getInt(
-          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
-          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-      boolean isTracked = trackedApps.contains(oldJobId);
-
-      if (oldJobId == null) {
-        oldJobId = Integer.toString(AM_ID);
-      }
-      AM_ID++;
-      amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(),
-          heartbeatInterval, appIdAMSim);
-      if (jobDef.getReservationId() != null) {
-        // if we have a ReservationId, delegate reservation creation to
-        // AMSim (reservation shape is impl specific)
-        UTCClock clock = new UTCClock();
-        amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
-            clock.getTime());
-      }
-      runner.schedule(amSim);
-      maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
-      numTasks += amDef.getTaskContainers().size();
-      amMap.put(oldJobId, amSim);
-    }
+  public void increaseQueueAppNum(String queue) throws YarnException {
+    rmRunner.increaseQueueAppNum(queue);
   }
 
   private void printSimulationInfo() {
+    final int numAMs = amRunner.getNumAMs();
+    final int numTasks = amRunner.getNumTasks();
+    final long maxRuntime = amRunner.getMaxRuntime();
+    Map<String, AMSimulator> amMap = amRunner.getAmMap();
+    Map<String, Integer> queueAppNumMap = rmRunner.getQueueAppNumMap();
+
     if (printSimulation) {
       // node
       LOG.info("------------------------------------");
       LOG.info("# nodes = {}, # racks = {}, capacity " +
               "of each node {}.",
-          numNMs, numRacks, nodeManagerResource);
+          nmRunner.getNumNMs(), nmRunner.getNumRacks(), nmRunner.getNodeManagerResource());
       LOG.info("------------------------------------");
       // job
       LOG.info("# applications = {}, # total " +
@@ -642,12 +268,12 @@ private void printSimulationInfo() {
       LOG.info("------------------------------------");
     }
     // package these information in the simulateInfoMap used by other places
-    simulateInfoMap.put("Number of racks", numRacks);
-    simulateInfoMap.put("Number of nodes", numNMs);
+    simulateInfoMap.put("Number of racks", nmRunner.getNumRacks());
+    simulateInfoMap.put("Number of nodes", nmRunner.getNumNMs());
     simulateInfoMap.put("Node memory (MB)",
-        nodeManagerResource.getResourceValue(ResourceInformation.MEMORY_URI));
+        nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.MEMORY_URI));
     simulateInfoMap.put("Node VCores",
-        nodeManagerResource.getResourceValue(ResourceInformation.VCORES_URI));
+        nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.VCORES_URI));
     simulateInfoMap.put("Number of applications", numAMs);
     simulateInfoMap.put("Number of tasks", numTasks);
     simulateInfoMap.put("Average tasks per applicaion",
@@ -660,11 +286,14 @@
   }
 
   public Map<NodeId, NMSimulator> getNmMap() {
-    return nmMap;
+    return nmRunner.getNmMap();
   }
 
   public static void decreaseRemainingApps() {
-    remainingApps--;
+    AMRunner.REMAINING_APPS--;
+
+    if (AMRunner.REMAINING_APPS == 0) {
+      exitSLSRunner();
+    }
   }
 
   public static void exitSLSRunner() {
@@ -675,7 +304,7 @@ public static void exitSLSRunner() {
   }
 
   public void stop() throws InterruptedException {
-    rm.stop();
+    rmRunner.stop();
     runner.stop();
  }
 
@@ -729,7 +358,7 @@ public int run(final String[] argv) throws IOException, InterruptedException,
       throw new YarnException("Cannot create output directory");
     }
 
-    Set<String> trackedJobSet = new HashSet<String>();
+    Set<String> trackedJobSet = new HashSet<>();
     if (cmd.hasOption("trackjobs")) {
       String trackjobs = cmd.getOptionValue("trackjobs");
       String jobIds[] = trackjobs.split(",");
@@ -850,11 +479,11 @@ public int hashCode() {
     }
   }
 
-  public ResourceManager getRm() {
-    return rm;
-  }
-
   public SynthTraceJobProducer getStjp() {
     return stjp;
   }
+
+  public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
+    return amRunner.getAMSimulator(appId);
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index d28407669cb..e46dea521c5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -44,15 +44,11 @@
   private static final Logger LOG = LoggerFactory.getLogger(
       MockAMLauncher.class);
 
-  private Map<ApplicationId, AMSimulator> appIdAMSim;
+  private SLSRunner slsRunner;
 
-  SLSRunner se;
-
-  public MockAMLauncher(SLSRunner se, RMContext rmContext,
-      Map<ApplicationId, AMSimulator> appIdAMSim) {
+  public MockAMLauncher(SLSRunner slsRunner, RMContext rmContext) {
     super(rmContext);
-    this.appIdAMSim = appIdAMSim;
-    this.se = se;
+    this.slsRunner = slsRunner;
   }
 
   @Override
@@ -79,12 +75,11 @@ private void setupAMRMToken(RMAppAttempt appAttempt) {
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public void handle(AMLauncherEvent event) {
     ApplicationId appId =
         event.getAppAttempt().getAppAttemptId().getApplicationId();
     // find AMSimulator
-    AMSimulator ams = appIdAMSim.get(appId);
+    AMSimulator ams = slsRunner.getAMSimulatorByAppId(appId);
     if (ams == null) {
       throw new YarnRuntimeException(
           "Didn't find any AMSimulator for applicationId=" + appId);
@@ -103,7 +98,7 @@ public void handle(AMLauncherEvent event) {
             event.getAppAttempt().getMasterContainer());
         LOG.info("Notify AM launcher launched:" + amContainer.getId());
 
-        se.getNmMap().get(amContainer.getNodeId())
+        slsRunner.getNmMap().get(amContainer.getNodeId())
             .addNewContainer(amContainer, -1, appId);
         ams.getRanNodes().add(amContainer.getNodeId());
         return;
@@ -111,7 +106,7 @@ public void handle(AMLauncherEvent event) {
         throw new YarnRuntimeException(e);
       }
     case CLEANUP:
-      se.getNmMap().get(amContainer.getNodeId())
+      slsRunner.getNmMap().get(amContainer.getNodeId())
          .cleanupContainer(amContainer.getId());
       break;
     default:
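
For orientation, the call flow after this split, as an illustrative sketch only (the methods are the ones introduced above; the trace file, metrics directory, and the direct construction are hypothetical placeholders, not part of the patch):

    // SLSRunner now only orchestrates; setSimulationParams() fans the settings
    // out to AMRunner, NMRunner and RMRunner, and start() brings them up in
    // order: rmRunner.startRM() -> nmRunner.startNM() -> amRunner.startAM().
    SLSRunner sls = new SLSRunner(new Configuration(false));
    sls.setSimulationParams(TraceType.SLS,
        new String[] {"/tmp/sls-jobs.json"}, // hypothetical SLS trace file
        "",                                  // empty: derive nodes from the trace
        "/tmp/sls-metrics",                  // hypothetical metrics output dir
        Collections.emptySet(),              // no tracked applications
        true);                               // print the simulation summary
    sls.start();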