commit 3e509e9604424da9a3a2456f1cf21edb8a83fbf9 Author: Wangda Tan Date: Wed Mar 30 14:48:01 2016 -0700 YARN-4900 diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 5e36750..3b3c6cd 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -345,7 +345,7 @@ public void untrackApp() { for (ContainerSimulator cs : csList) { String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname()); // check rack local - String rackname = rackHostNames[0]; + String rackname = "/" + rackHostNames[0]; if (rackLocalRequestMap.containsKey(rackname)) { rackLocalRequestMap.get(rackname).setNumContainers( rackLocalRequestMap.get(rackname).getNumContainers() + 1); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index e20e9e4..0d75eae 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -64,7 +64,7 @@ scheduled when all maps have finished (not support slow-start currently). private static final int PRIORITY_REDUCE = 10; private static final int PRIORITY_MAP = 20; - + // pending maps private LinkedList pendingMaps = new LinkedList<>(); @@ -263,6 +263,13 @@ private void restart() amContainer = null; } + private List mergeLists(List left, List right) { + List list = new ArrayList<>(); + list.addAll(left); + list.addAll(right); + return list; + } + @Override protected void sendContainerRequest() throws YarnException, IOException, InterruptedException { @@ -275,14 +282,16 @@ protected void sendContainerRequest() if (mapFinished != mapTotal) { // map phase if (!pendingMaps.isEmpty()) { - ask = packageRequests(pendingMaps, PRIORITY_MAP); + ask = packageRequests(mergeLists(pendingMaps, scheduledMaps), + PRIORITY_MAP); LOG.debug(MessageFormat .format("Application {0} sends out " + "request for {1} mappers.", appId, pendingMaps.size())); scheduledMaps.addAll(pendingMaps); pendingMaps.clear(); - } else if (!pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) { - ask = packageRequests(pendingFailedMaps, PRIORITY_MAP); + } else if (!pendingFailedMaps.isEmpty()) { + ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps), + PRIORITY_MAP); LOG.debug(MessageFormat.format( "Application {0} sends out " + "requests for {1} failed mappers.", appId, pendingFailedMaps.size())); @@ -292,15 +301,16 @@ protected void sendContainerRequest() } else if (reduceFinished != reduceTotal) { // reduce phase if (!pendingReduces.isEmpty()) { - ask = packageRequests(pendingReduces, PRIORITY_REDUCE); + ask = packageRequests(mergeLists(pendingReduces, scheduledReduces), + PRIORITY_REDUCE); LOG.debug(MessageFormat .format("Application {0} sends out " + "requests for {1} reducers.", appId, pendingReduces.size())); scheduledReduces.addAll(pendingReduces); pendingReduces.clear(); - } else if (!pendingFailedReduces.isEmpty() && scheduledReduces - .isEmpty()) { - ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE); + } else if (!pendingFailedReduces.isEmpty()) { + ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces), + PRIORITY_REDUCE); LOG.debug(MessageFormat.format( "Application {0} sends out " + "request for {1} failed reducers.", appId, pendingFailedReduces.size()));