diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 9d35d1b..4717bbd 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -247,7 +247,7 @@ private void startNM() throws YarnException, IOException { break; case SYNTH: stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); - nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(), + nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(), stjp.getNodesPerRack())); break; default: @@ -259,6 +259,10 @@ private void startNM() throws YarnException, IOException { nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile)); } + if (nodeSet.size() == 0) { + throw new YarnException("No node! Please configure nodes."); + } + // create NM simulators Random random = new Random(); Set rackSet = new HashSet(); @@ -348,7 +352,6 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException { private void createAMForJob(Map jsonJob) throws YarnException { long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString()); - long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString()); String user = (String) jsonJob.get("job.user"); if (user == null) { @@ -358,25 +361,53 @@ private void createAMForJob(Map jsonJob) throws YarnException { String queue = jsonJob.get("job.queue.name").toString(); increaseQueueAppNum(queue); - String oldAppId = jsonJob.get("job.id").toString(); + String oldAppId = (String)jsonJob.get("job.id"); + if (oldAppId == null) { + oldAppId = Integer.toString(AM_ID); + } - // tasks - List tasks = (List) jsonJob.get("job.tasks"); - if (tasks == null || tasks.size() == 0) { + List containers = new ArrayList<>(); + addTaskContainers(jsonJob, containers); + + if (containers.size() == 0) { throw new YarnException("No task for the job!"); } - List containerList = new ArrayList<>(); + String amType = (String)jsonJob.get("am.type"); + if (amType == null) { + amType = SLSUtils.DEFAULT_JOB_TYPE; + } + + runNewAM(amType, user, queue, oldAppId, jobStartTime, 0, containers, null); + } + + private void addTaskContainers(Map jsonJob, + List containers) throws YarnException { + List tasks = (List) jsonJob.get("job.tasks"); + if (tasks == null) { + return; + } + for (Object o : tasks) { Map jsonTask = (Map) o; - String hostname = jsonTask.get("container.host").toString(); - long taskStart = Long.parseLong(jsonTask.get("container.start.ms") - .toString()); - long taskFinish = Long.parseLong(jsonTask.get("container.end.ms") - .toString()); - long lifeTime = taskFinish - taskStart; - - // Set memory and vcores from job trace file + + String hostname = (String) jsonTask.get("container.host"); + + long duration = 0; + if (jsonTask.containsKey("duration.ms")) { + duration = Integer.parseInt(jsonTask.get("duration.ms").toString()); + } else if (jsonTask.containsKey("container.start.ms") && + jsonTask.containsKey("container.end.ms")) { + long taskStart = Long.parseLong(jsonTask.get("container.start.ms") + .toString()); + long taskFinish = Long.parseLong(jsonTask.get("container.end.ms") + .toString()); + duration = taskFinish - taskStart; + } + if (duration == 0) { + throw new YarnException("Duration of a task shouldn't be 0!"); + } + Resource res = getDefaultContainerResource(); if (jsonTask.containsKey("container.memory")) { int containerMemory = @@ -390,17 +421,27 @@ private void createAMForJob(Map jsonJob) throws YarnException { res.setVirtualCores(containerVCores); } - int priority = Integer.parseInt(jsonTask.get("container.priority") - .toString()); - String type = jsonTask.get("container.type").toString(); - containerList.add( - new ContainerSimulator(res, lifeTime, hostname, priority, type)); - } + int priority = 19; + if (jsonTask.containsKey("container.priority")) { + priority = Integer.parseInt(jsonTask.get("container.priority") + .toString()); + } - // create a new AM - String amType = jsonJob.get("am.type").toString(); - runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, - containerList, null); + String type = "map"; + if (jsonTask.containsKey("container.type")) { + type = jsonTask.get("container.type").toString(); + } + + int count = 1; + if (jsonTask.containsKey("count")) { + count = Integer.parseInt(jsonTask.get("count").toString()); + } + + for (int i = 0; i < count; i++) { + containers.add( + new ContainerSimulator(res, duration, hostname, priority, type)); + } + } } /** diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 45a3c07..7b39275 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -400,26 +400,28 @@ public void untrackApp() { Map nodeLocalRequestMap = new HashMap(); ResourceRequest anyRequest = null; for (ContainerSimulator cs : csList) { - String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname()); - // check rack local - String rackname = "/" + rackHostNames[0]; - if (rackLocalRequestMap.containsKey(rackname)) { - rackLocalRequestMap.get(rackname).setNumContainers( - rackLocalRequestMap.get(rackname).getNumContainers() + 1); - } else { - ResourceRequest request = createResourceRequest( - cs.getResource(), rackname, priority, 1); - rackLocalRequestMap.put(rackname, request); - } - // check node local - String hostname = rackHostNames[1]; - if (nodeLocalRequestMap.containsKey(hostname)) { - nodeLocalRequestMap.get(hostname).setNumContainers( - nodeLocalRequestMap.get(hostname).getNumContainers() + 1); - } else { - ResourceRequest request = createResourceRequest( - cs.getResource(), hostname, priority, 1); - nodeLocalRequestMap.put(hostname, request); + if (cs.getHostname() != null) { + String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname()); + // check rack local + String rackname = "/" + rackHostNames[0]; + if (rackLocalRequestMap.containsKey(rackname)) { + rackLocalRequestMap.get(rackname).setNumContainers( + rackLocalRequestMap.get(rackname).getNumContainers() + 1); + } else { + ResourceRequest request = + createResourceRequest(cs.getResource(), rackname, priority, 1); + rackLocalRequestMap.put(rackname, request); + } + // check node local + String hostname = rackHostNames[1]; + if (nodeLocalRequestMap.containsKey(hostname)) { + nodeLocalRequestMap.get(hostname).setNumContainers( + nodeLocalRequestMap.get(hostname).getNumContainers() + 1); + } else { + ResourceRequest request = + createResourceRequest(cs.getResource(), hostname, priority, 1); + nodeLocalRequestMap.put(hostname, request); + } } // any if (anyRequest == null) { diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index e27b36f..9c3846b 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -101,7 +101,7 @@ */ public static Set parseNodesFromSLSTrace(String jobTrace) throws IOException { - Set nodeSet = new HashSet(); + Set nodeSet = new HashSet<>(); JsonFactory jsonF = new JsonFactory(); ObjectMapper mapper = new ObjectMapper(); Reader input = @@ -110,11 +110,26 @@ Iterator i = mapper.readValues(jsonF.createParser(input), Map.class); while (i.hasNext()) { Map jsonE = i.next(); - List tasks = (List) jsonE.get("job.tasks"); - for (Object o : tasks) { - Map jsonTask = (Map) o; - String hostname = jsonTask.get("container.host").toString(); - nodeSet.add(hostname); + if (jsonE.containsKey("num_nodes")) { + int numNodes = Integer.parseInt(jsonE.get("num_nodes").toString()); + int nodesPerRack = numNodes; + if (jsonE.containsKey("nodes_per_rack")) { + nodesPerRack = Integer.parseInt( + jsonE.get("nodes_per_rack").toString()); + } + nodeSet.addAll(generateNodes(numNodes, nodesPerRack)); + } else { + List tasks = (List) jsonE.get("job.tasks"); + if (tasks == null) { + continue; + } + for (Object o : tasks) { + Map jsonTask = (Map) o; + String hostname = (String )jsonTask.get("container.host"); + if (hostname != null) { + nodeSet.add(hostname); + } + } } } } finally { @@ -150,9 +165,9 @@ return nodeSet; } - public static Set generateNodesFromSynth( + public static Set generateNodes( int numNodes, int nodesPerRack) { - Set nodeSet = new HashSet(); + Set nodeSet = new HashSet<>(); for (int i = 0; i < numNodes; i++) { nodeSet.add("/rack" + i % nodesPerRack + "/node" + i); }