diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index c792335..732c7f5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -52,12 +53,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; +import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; @@ -117,10 +120,10 @@ public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile, this.printSimulation = printsimulation; metricsOutputDir = outputDir; - nmMap = new HashMap(); - queueAppNumMap = new HashMap(); - amMap = new HashMap(); - amClassMap = new HashMap(); + nmMap = new HashMap<>(); + queueAppNumMap = new HashMap<>(); + amMap = new ConcurrentHashMap<>(); + amClassMap = new HashMap<>(); // runner configuration conf = new Configuration(false); @@ -177,7 +180,14 @@ private void startRM() throws IOException, ClassNotFoundException { } rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); - rm = new ResourceManager(); + + final SLSRunner se = this; + rm = new ResourceManager() { + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new MockAMLauncher(se, this.rmContext, amMap); + } + }; rm.init(rmConf); rm.start(); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index d61bf02..5e36750 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; @@ -107,11 +109,19 @@ // progress protected int totalContainers; protected int finishedContainers; + + // waiting for AM container + volatile boolean isAMContainerRunning = false; + volatile Container amContainer; protected final Logger LOG = Logger.getLogger(AMSimulator.class); - + + // resource for AM container + private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; + private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; + public AMSimulator() { - this.responseQueue = new LinkedBlockingQueue(); + this.responseQueue = new LinkedBlockingQueue<>(); } public void init(int id, int heartbeatInterval, @@ -142,23 +152,30 @@ public void firstStep() throws Exception { // submit application, waiting until ACCEPTED submitApp(); - // register application master - registerAM(); - // track app metrics trackApp(); } + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + this.amContainer = masterContainer; + this.appAttemptId = masterContainer.getId().getApplicationAttemptId(); + registerAM(); + isAMContainerRunning = true; + } + @Override public void middleStep() throws Exception { - // process responses in the queue - processResponseQueue(); - - // send out request - sendContainerRequest(); - - // check whether finish - checkStop(); + if (isAMContainerRunning) { + // process responses in the queue + processResponseQueue(); + + // send out request + sendContainerRequest(); + + // check whether finish + checkStop(); + } } @Override @@ -168,6 +185,13 @@ public void lastStep() throws Exception { if (isTracked) { untrackApp(); } + + if (null == appAttemptId) { + // If appAttemptId == null, AM is not launched from RM's perspective, so + // it's unnecessary to finish am as well + return; + } + // unregister application master final FinishApplicationMasterRequest finishAMRequest = recordFactory .newRecordInstance(FinishApplicationMasterRequest.class); @@ -256,7 +280,9 @@ private void submitApp() conLauContext.setLocalResources(new HashMap()); conLauContext.setServiceData(new HashMap()); appSubContext.setAMContainerSpec(conLauContext); - appSubContext.setUnmanagedAM(true); + appSubContext.setResource(Resources + .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, + MR_AM_CONTAINER_RESOURCE_VCORES)); subAppRequest.setApplicationSubmissionContext(appSubContext); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); ugi.doAs(new PrivilegedExceptionAction() { @@ -267,22 +293,6 @@ public Object run() throws YarnException, IOException { } }); LOG.info(MessageFormat.format("Submit a new application {0}", appId)); - - // waiting until application ACCEPTED - RMApp app = rm.getRMContext().getRMApps().get(appId); - while(app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(10); - } - - // Waiting until application attempt reach LAUNCHED - // "Unmanaged AM must register after AM attempt reaches LAUNCHED state" - this.appAttemptId = rm.getRMContext().getRMApps().get(appId) - .getCurrentAppAttempt().getAppAttemptId(); - RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId) - .getCurrentAppAttempt(); - while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) { - Thread.sleep(10); - } } private void registerAM() @@ -383,4 +393,12 @@ public long getDuration() { public int getNumTasks() { return totalContainers; } + + public ApplicationId getApplicationId() { + return appId; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return appAttemptId; + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index da267a1..e20e9e4 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; +import org.apache.avro.Protocol; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; @@ -66,7 +67,7 @@ scheduled when all maps have finished (not support slow-start currently). // pending maps private LinkedList pendingMaps = - new LinkedList(); + new LinkedList<>(); // pending failed maps private LinkedList pendingFailedMaps = @@ -107,14 +108,9 @@ scheduled when all maps have finished (not support slow-start currently). private int mapTotal = 0; private int reduceFinished = 0; private int reduceTotal = 0; - // waiting for AM container - private boolean isAMContainerRunning = false; - private Container amContainer; + // finished private boolean isFinished = false; - // resource for AM container - private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; - private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1; public final Logger LOG = Logger.getLogger(MRAMSimulator.class); @@ -131,83 +127,34 @@ public void init(int id, int heartbeatInterval, for (ContainerSimulator cs : containerList) { if (cs.getType().equals("map")) { cs.setPriority(PRIORITY_MAP); - pendingMaps.add(cs); + allMaps.add(cs); } else if (cs.getType().equals("reduce")) { cs.setPriority(PRIORITY_REDUCE); - pendingReduces.add(cs); + allReduces.add(cs); } } - allMaps.addAll(pendingMaps); - allReduces.addAll(pendingReduces); - mapTotal = pendingMaps.size(); - reduceTotal = pendingReduces.size(); + + LOG.info(MessageFormat + .format("Added new job with {0} mapper and {1} reducers", + allMaps.size(), allReduces.size())); + + mapTotal = allMaps.size(); + reduceTotal = allReduces.size(); totalContainers = mapTotal + reduceTotal; } @Override - public void firstStep() throws Exception { - super.firstStep(); - - requestAMContainer(); - } + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + super.notifyAMContainerLaunched(masterContainer); - /** - * send out request for AM container - */ - protected void requestAMContainer() - throws YarnException, IOException, InterruptedException { - List ask = new ArrayList(); - ResourceRequest amRequest = createResourceRequest( - BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, - MR_AM_CONTAINER_RESOURCE_VCORES), - ResourceRequest.ANY, 1, 1); - ask.add(amRequest); - LOG.debug(MessageFormat.format("Application {0} sends out allocate " + - "request for its AM", appId)); - final AllocateRequest request = this.createAllocateRequest(ask); - - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - Token token = rm.getRMContext().getRMApps() - .get(appAttemptId.getApplicationId()) - .getRMAppAttempt(appAttemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); - AllocateResponse response = ugi.doAs( - new PrivilegedExceptionAction() { - @Override - public AllocateResponse run() throws Exception { - return rm.getApplicationMasterService().allocate(request); - } - }); - if (response != null) { - responseQueue.put(response); - } + restart(); } @Override @SuppressWarnings("unchecked") protected void processResponseQueue() throws InterruptedException, YarnException, IOException { - // Check whether receive the am container - if (!isAMContainerRunning) { - if (!responseQueue.isEmpty()) { - AllocateResponse response = responseQueue.take(); - if (response != null - && !response.getAllocatedContainers().isEmpty()) { - // Get AM container - Container container = response.getAllocatedContainers().get(0); - se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, -1L); - // Start AM container - amContainer = container; - LOG.debug(MessageFormat.format("Application {0} starts its " + - "AM container ({1}).", appId, amContainer.getId())); - isAMContainerRunning = true; - } - } - return; - } - while (! responseQueue.isEmpty()) { AllocateResponse response = responseQueue.take(); @@ -228,7 +175,7 @@ protected void processResponseQueue() assignedReduces.remove(containerId); reduceFinished ++; finishedContainers ++; - } else { + } else if (amContainer.getId().equals(containerId)){ // am container released event isFinished = true; LOG.info(MessageFormat.format("Application {0} goes to " + @@ -244,10 +191,9 @@ protected void processResponseQueue() LOG.debug(MessageFormat.format("Application {0} has one " + "reducer killed ({1}).", appId, containerId)); pendingFailedReduces.add(assignedReduces.remove(containerId)); - } else { + } else if (amContainer.getId().equals(containerId)){ LOG.info(MessageFormat.format("Application {0}'s AM is " + - "going to be killed. Restarting...", appId)); - restart(); + "going to be killed. Waiting for rescheduling...", appId)); } } } @@ -255,11 +201,8 @@ protected void processResponseQueue() // check finished if (isAMContainerRunning && - (mapFinished == mapTotal) && - (reduceFinished == reduceTotal)) { - // to release the AM container - se.getNmMap().get(amContainer.getNodeId()) - .cleanupContainer(amContainer.getId()); + (mapFinished >= mapTotal) && + (reduceFinished >= reduceTotal)) { isAMContainerRunning = false; LOG.debug(MessageFormat.format("Application {0} sends out event " + "to clean up its AM container.", appId)); @@ -293,21 +236,31 @@ protected void processResponseQueue() */ private void restart() throws YarnException, IOException, InterruptedException { - // clear - finishedContainers = 0; + // clear isFinished = false; - mapFinished = 0; - reduceFinished = 0; pendingFailedMaps.clear(); pendingMaps.clear(); pendingReduces.clear(); pendingFailedReduces.clear(); - pendingMaps.addAll(allMaps); - pendingReduces.addAll(pendingReduces); - isAMContainerRunning = false; + + // Only add totalMaps - finishedMaps + int added = 0; + for (ContainerSimulator cs : allMaps) { + if (added >= mapTotal - mapFinished) { + break; + } + pendingMaps.add(cs); + } + + // And same, only add totalReduces - finishedReduces + added = 0; + for (ContainerSimulator cs : allReduces) { + if (added >= reduceTotal - reduceFinished) { + break; + } + pendingReduces.add(cs); + } amContainer = null; - // resent am container request - requestAMContainer(); } @Override @@ -319,44 +272,45 @@ protected void sendContainerRequest() // send out request List ask = null; - if (isAMContainerRunning) { - if (mapFinished != mapTotal) { - // map phase - if (! pendingMaps.isEmpty()) { - ask = packageRequests(pendingMaps, PRIORITY_MAP); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "request for {1} mappers.", appId, pendingMaps.size())); - scheduledMaps.addAll(pendingMaps); - pendingMaps.clear(); - } else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) { - ask = packageRequests(pendingFailedMaps, PRIORITY_MAP); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "requests for {1} failed mappers.", appId, - pendingFailedMaps.size())); - scheduledMaps.addAll(pendingFailedMaps); - pendingFailedMaps.clear(); - } - } else if (reduceFinished != reduceTotal) { - // reduce phase - if (! pendingReduces.isEmpty()) { - ask = packageRequests(pendingReduces, PRIORITY_REDUCE); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "requests for {1} reducers.", appId, pendingReduces.size())); - scheduledReduces.addAll(pendingReduces); - pendingReduces.clear(); - } else if (! pendingFailedReduces.isEmpty() - && scheduledReduces.isEmpty()) { - ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE); - LOG.debug(MessageFormat.format("Application {0} sends out " + - "request for {1} failed reducers.", appId, - pendingFailedReduces.size())); - scheduledReduces.addAll(pendingFailedReduces); - pendingFailedReduces.clear(); - } + if (mapFinished != mapTotal) { + // map phase + if (!pendingMaps.isEmpty()) { + ask = packageRequests(pendingMaps, PRIORITY_MAP); + LOG.debug(MessageFormat + .format("Application {0} sends out " + "request for {1} mappers.", + appId, pendingMaps.size())); + scheduledMaps.addAll(pendingMaps); + pendingMaps.clear(); + } else if (!pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) { + ask = packageRequests(pendingFailedMaps, PRIORITY_MAP); + LOG.debug(MessageFormat.format( + "Application {0} sends out " + "requests for {1} failed mappers.", + appId, pendingFailedMaps.size())); + scheduledMaps.addAll(pendingFailedMaps); + pendingFailedMaps.clear(); + } + } else if (reduceFinished != reduceTotal) { + // reduce phase + if (!pendingReduces.isEmpty()) { + ask = packageRequests(pendingReduces, PRIORITY_REDUCE); + LOG.debug(MessageFormat + .format("Application {0} sends out " + "requests for {1} reducers.", + appId, pendingReduces.size())); + scheduledReduces.addAll(pendingReduces); + pendingReduces.clear(); + } else if (!pendingFailedReduces.isEmpty() && scheduledReduces + .isEmpty()) { + ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE); + LOG.debug(MessageFormat.format( + "Application {0} sends out " + "request for {1} failed reducers.", + appId, pendingFailedReduces.size())); + scheduledReduces.addAll(pendingFailedReduces); + pendingFailedReduces.clear(); } } + if (ask == null) { - ask = new ArrayList(); + ask = new ArrayList<>(); } final AllocateRequest request = createAllocateRequest(ask); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java new file mode 100644 index 0000000..20cf3e5 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; + +import java.util.Map; + +public class MockAMLauncher extends ApplicationMasterLauncher + implements EventHandler { + private static final Log LOG = LogFactory.getLog( + MockAMLauncher.class); + + Map amMap; + SLSRunner se; + + public MockAMLauncher(SLSRunner se, RMContext rmContext, + Map amMap) { + super(rmContext); + this.amMap = amMap; + this.se = se; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // Do nothing + } + + @Override + protected void serviceStart() throws Exception { + // Do nothing + } + + private void setupAMRMToken(RMAppAttempt appAttempt) { + // Setup AMRMToken + Token amrmToken = + super.context.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.getAppAttemptId()); + ((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken); + } + + @Override + @SuppressWarnings("unchecked") + public void handle(AMLauncherEvent event) { + if (AMLauncherEventType.LAUNCH == event.getType()) { + ApplicationId appId = + event.getAppAttempt().getAppAttemptId().getApplicationId(); + + // find AMSimulator + for (AMSimulator ams : amMap.values()) { + if (ams.getApplicationId() != null && ams.getApplicationId().equals( + appId)) { + try { + Container amContainer = event.getAppAttempt().getMasterContainer(); + + setupAMRMToken(event.getAppAttempt()); + + // Notify RMAppAttempt to change state + super.context.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), + RMAppAttemptEventType.LAUNCHED)); + + ams.notifyAMContainerLaunched( + event.getAppAttempt().getMasterContainer()); + LOG.info("Notify AM launcher launched:" + amContainer.getId()); + + se.getNmMap().get(amContainer.getNodeId()) + .addNewContainer(amContainer, 100000000L); + + return; + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + } + + throw new YarnRuntimeException( + "Didn't find any AMSimulator for applicationId=" + appId); + } + } +}