diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index ad4310f3648..ea1f3a19878 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -444,7 +444,7 @@ private void createAMForJob(Map jsonJob) throws YarnException { for (int i = 0; i < jobCount; i++) { runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, - getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob)); + getTaskContainers(jsonJob), getAMContainerResource(jsonJob)); } } @@ -607,7 +607,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs) // Only supports the default job type currently runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, null, + jobStartTimeMS, jobFinishTimeMS, containerList, getAMContainerResource(null)); } @@ -632,107 +632,66 @@ private void startAMFromSynthGenerator() throws YarnException, IOException { UTCClock clock = new UTCClock(); long now = clock.getTime(); - try { - - // if we use the nodeFile this could have been not initialized yet. - if (stjp == null) { - stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); - } - - SynthJob job = null; - // we use stjp, a reference to the job producer instantiated during node - // creation - while ((job = (SynthJob) stjp.getNextJob()) != null) { - // only support MapReduce currently - String user = job.getUser(); - String jobQueue = job.getQueueName(); - String oldJobId = job.getJobID().toString(); - long jobStartTimeMS = job.getSubmissionTime(); - - // CARLO: Finish time is only used for logging, omit for now - long jobFinishTimeMS = -1L; - - if (baselineTimeMS == 0) { - baselineTimeMS = jobStartTimeMS; - } - jobStartTimeMS -= baselineTimeMS; - jobFinishTimeMS -= baselineTimeMS; - if (jobStartTimeMS < 0) { - LOG.warn("Warning: reset job {} start time to 0.", oldJobId); - jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; - jobStartTimeMS = 0; - } - - increaseQueueAppNum(jobQueue); - - List containerList = - new ArrayList(); - ArrayList keyAsArray = new ArrayList(nmMap.keySet()); - Random rand = new Random(stjp.getSeed()); - - Resource maxMapRes = Resource.newInstance(0, 0); - long maxMapDur = 0; - // map tasks - for (int i = 0; i < job.getNumberMaps(); i++) { - TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0); - RMNode node = nmMap - .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); - String hostname = "/" + node.getRackName() + "/" + node.getHostName(); - long containerLifeTime = tai.getRuntime(); - Resource containerResource = - Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), - (int) tai.getTaskInfo().getTaskVCores()); - containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map")); - maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource); - maxMapDur = - containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur; - - } + // if we use the nodeFile this could have been not initialized yet. 
+ if (stjp == null) { + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + } - Resource maxRedRes = Resource.newInstance(0, 0); - long maxRedDur = 0; - // reduce tasks - for (int i = 0; i < job.getNumberReduces(); i++) { - TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0); - RMNode node = nmMap - .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); - String hostname = "/" + node.getRackName() + "/" + node.getHostName(); - long containerLifeTime = tai.getRuntime(); - Resource containerResource = - Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), - (int) tai.getTaskInfo().getTaskVCores()); - containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); - maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource); - maxRedDur = - containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur; + SynthJob job = null; + // we use stjp, a reference to the job producer instantiated during node + // creation + while ((job = (SynthJob) stjp.getNextJob()) != null) { + // only support MapReduce currently + String user = job.getUser(); + String jobQueue = job.getQueueName(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmissionTime(); - } + // CARLO: Finish time is only used for logging, omit for now + long jobFinishTimeMS = jobStartTimeMS + job.getDuration(); - // generating reservations for the jobs that require them + if (baselineTimeMS == 0) { + baselineTimeMS = jobStartTimeMS; + } + jobStartTimeMS -= baselineTimeMS; + jobFinishTimeMS -= baselineTimeMS; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job {} start time to 0.", oldJobId); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } - ReservationSubmissionRequest rr = null; - if (job.hasDeadline()) { - ReservationId reservationId = - ReservationId.newInstance(this.rm.getStartTime(), AM_ID); + increaseQueueAppNum(jobQueue); + + List containerList = + new ArrayList(); + ArrayList keyAsArray = new ArrayList(nmMap.keySet()); + Random rand = new Random(stjp.getSeed()); + + for(SynthJob.SynthTask task : job.getTasks()){ + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = task.getTime(); + Resource containerResource = + Resource.newInstance((int) task.getMemory(), (int) task.getVcores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, task.getPriority(), task.getType())); + } - rr = ReservationClientUtil.createMRReservation(reservationId, - "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur, - maxRedRes, job.getNumberReduces(), maxRedDur, - now + jobStartTimeMS, now + job.getDeadline(), - job.getQueueName()); - } + ReservationId reservationId = null; - runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, rr, - getAMContainerResource(null)); + if(job.hasDeadline()){ + reservationId = ReservationId + .newInstance(this.rm.getStartTime(), AM_ID); } - } finally { - stjp.close(); - } + runNewAM(job.getType(), user, jobQueue, oldJobId, + jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, + job.getDeadline(), getAMContainerResource(null), + job.getParams()); + } } private Resource getAMContainerResource(Map jsonJob) { @@ -772,7 +731,17 @@ private void increaseQueueAppNum(String queue) 
throws YarnException { private void runNewAM(String jobType, String user, String jobQueue, String oldJobId, long jobStartTimeMS, long jobFinishTimeMS, List<ContainerSimulator> containerList, - ReservationSubmissionRequest rr, Resource amContainerResource) { + Resource amContainerResource) { + runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, + jobFinishTimeMS, containerList, null, -1, + amContainerResource, null); + } + + private void runNewAM(String jobType, String user, + String jobQueue, String oldJobId, long jobStartTimeMS, + long jobFinishTimeMS, List<ContainerSimulator> containerList, + ReservationId reservationId, long deadline, Resource amContainerResource, + Map<String, String> params) { AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( amClassMap.get(jobType), new Configuration()); @@ -787,10 +756,15 @@ private void runNewAM(String jobType, String user, oldJobId = Integer.toString(AM_ID); } AM_ID++; - amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, - jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr, - runner.getStartTimeMS(), amContainerResource); + amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, + jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, + runner.getStartTimeMS(), amContainerResource, params); + if (reservationId != null) { + // if we have a ReservationId, delegate reservation creation to the + // AM, since the reservation shape is implementation-specific + UTCClock clock = new UTCClock(); + amSim.initReservation(reservationId, deadline, clock.getTime()); + } runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size();
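A note for reviewers, stepping back from the hunks above: the old single runNewAM(...) threaded a fully built ReservationSubmissionRequest through every call, while the new overload pair passes only a ReservationId plus deadline and lets each AM build (or skip) its own reservation. A minimal sketch of the resulting calling convention, assuming the SLS types are in scope; the helper name and all concrete values are invented, not part of the patch:

```java
// Sketch only: how a synthetic job is scheduled under the new contract.
void scheduleSyntheticAM(SynthJob job, List<ContainerSimulator> containers,
    ResourceManager rm, SLSRunner se, TaskRunner runner) throws Exception {
  AMSimulator amSim = new MRAMSimulator(); // SLSRunner uses ReflectionUtils
  // init() no longer carries reservation data; it only sets up the job.
  amSim.init(1000 /* heartbeat ms */, containers, rm, se,
      0, job.getDuration(), job.getUser(), job.getQueueName(),
      false /* tracked */, job.getJobID().toString(),
      runner.getStartTimeMS(), Resource.newInstance(1024, 1),
      job.getParams());
  if (job.hasDeadline()) {
    // The AM decides the reservation shape: MRAMSimulator derives it from
    // its map/reduce containers, StreamAMSimulator opts out entirely.
    ReservationId rid = ReservationId.newInstance(rm.getStartTime(), 1);
    amSim.initReservation(rid, job.getDeadline(), new UTCClock().getTime());
  }
  runner.schedule(amSim);
}
```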
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 72698ea10a5..8ee4a1fb3b1 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -85,7 +85,7 @@ protected final BlockingQueue<AllocateResponse> responseQueue; private int responseId = 0; // user name - protected String user; + protected String user; // queue name protected String queue; // am type @@ -105,12 +105,12 @@ // waiting for AM container volatile boolean isAMContainerRunning = false; volatile Container amContainer; - + private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class); private Resource amContainerResource; - private ReservationSubmissionRequest reservationRequest; + protected ReservationSubmissionRequest reservationRequest; public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); @@ -120,9 +120,8 @@ public AMSimulator() { public void init(int heartbeatInterval, List<ContainerSimulator> containerList, ResourceManager resourceManager, SLSRunner slsRunnner, long startTime, long finishTime, String simUser, - String simQueue, boolean tracked, String oldApp, - ReservationSubmissionRequest rr, long baseTimeMS, - Resource amContainerResource) { + String simQueue, boolean tracked, String oldApp, long baseTimeMS, + Resource amContainerResource, Map<String, String> params) { super.init(startTime, startTime + 1000000L * heartbeatInterval, heartbeatInterval); this.user = simUser; @@ -134,7 +133,6 @@ public void init(int heartbeatInterval, this.baselineTimeMS = baseTimeMS; this.traceStartTimeMS = startTime; this.traceFinishTimeMS = finishTime; - this.reservationRequest = rr; this.amContainerResource = amContainerResource; } @@ -256,7 +254,7 @@ public Object run() throws Exception { simulateStartTimeMS, simulateFinishTimeMS); } } - + protected ResourceRequest createResourceRequest( Resource resource, String host, int priority, int numContainers) { ResourceRequest request = recordFactory @@ -269,7 +267,7 @@ protected ResourceRequest createResourceRequest( request.setPriority(prio); return request; } - + protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask, List<ContainerId> toRelease) { AllocateRequest allocateRequest = @@ -279,36 +277,39 @@ protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask, allocateRequest.setReleaseList(toRelease); return allocateRequest; } - + protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask) { return createAllocateRequest(ask, new ArrayList<ContainerId>()); } protected abstract void processResponseQueue() throws Exception; - + protected abstract void sendContainerRequest() throws Exception; - + + public abstract void initReservation( + ReservationId reservationId, long deadline, long now); + protected abstract void checkStop(); - + private void submitApp(ReservationId reservationId) throws YarnException, InterruptedException, IOException { // ask for new application GetNewApplicationRequest newAppRequest = Records.newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse newAppResponse = + GetNewApplicationResponse newAppResponse = rm.getClientRMService().getNewApplication(newAppRequest); appId = newAppResponse.getApplicationId(); - + // submit the application final SubmitApplicationRequest subAppRequest = Records.newRecord(SubmitApplicationRequest.class); - ApplicationSubmissionContext appSubContext = + ApplicationSubmissionContext appSubContext = Records.newRecord(ApplicationSubmissionContext.class); appSubContext.setApplicationId(appId); appSubContext.setMaxAppAttempts(1); appSubContext.setQueue(queue); appSubContext.setPriority(Priority.newInstance(0)); - ContainerLaunchContext conLauContext = + ContainerLaunchContext conLauContext = Records.newRecord(ContainerLaunchContext.class); conLauContext.setApplicationACLs(new HashMap<>()); conLauContext.setCommands(new ArrayList<>()); @@ -379,7 +380,7 @@ public void untrackApp() { } } } - + protected List<ResourceRequest> packageRequests( List<ContainerSimulator> csList, int priority) { // create requests diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 21bf05402b0..68e8a706d36 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.ReservationClientUtil; import
org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; import org.slf4j.Logger; @@ -51,51 +55,51 @@ @Unstable public class MRAMSimulator extends AMSimulator { /* - Vocabulary Used: + Vocabulary Used: pending -> requests which are NOT yet sent to RM scheduled -> requests which are sent to RM but not yet assigned assigned -> requests which are assigned to a container completed -> request corresponding to which container has completed - + Maps are scheduled as soon as their requests are received. Reduces are scheduled when all maps have finished (not support slow-start currently). */ - + private static final int PRIORITY_REDUCE = 10; private static final int PRIORITY_MAP = 20; // pending maps private LinkedList pendingMaps = new LinkedList<>(); - + // pending failed maps private LinkedList pendingFailedMaps = new LinkedList(); - + // scheduled maps private LinkedList scheduledMaps = new LinkedList(); - + // assigned maps private Map assignedMaps = new HashMap(); - + // reduces which are not yet scheduled private LinkedList pendingReduces = new LinkedList(); - + // pending failed reduces private LinkedList pendingFailedReduces = new LinkedList(); - + // scheduled reduces private LinkedList scheduledReduces = new LinkedList(); - + // assigned reduces private Map assignedReduces = new HashMap(); - + // all maps & reduces private LinkedList allMaps = new LinkedList(); @@ -117,14 +121,14 @@ scheduled when all maps have finished (not support slow-start currently). @SuppressWarnings("checkstyle:parameternumber") public void init(int heartbeatInterval, List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId, ReservationSubmissionRequest rr, - long baselineStartTimeMS, Resource amContainerResource) { + long traceStartTime, long traceFinishTime, String user, String queue, + boolean isTracked, String oldAppId, long baselineStartTimeMS, + Resource amContainerResource, Map params) { super.init(heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, - rr, baselineStartTimeMS, amContainerResource); + baselineStartTimeMS, amContainerResource, params); amtype = "mapreduce"; - + // get map/reduce tasks for (ContainerSimulator cs : containerList) { if (cs.getType().equals("map")) { @@ -202,7 +206,7 @@ protected void processResponseQueue() throws Exception { } } } - + // check finished if (isAMContainerRunning && (mapFinished >= mapTotal) && @@ -234,7 +238,7 @@ protected void processResponseQueue() throws Exception { } } } - + /** * restart running because of the am container killed */ @@ -322,7 +326,7 @@ protected void sendContainerRequest() if (ask == null) { ask = new ArrayList<>(); } - + final AllocateRequest request = createAllocateRequest(ask); if (totalContainers == 0) { request.setProgress(1.0f); @@ -348,6 +352,36 @@ public AllocateResponse run() throws Exception { } } + @Override + public void initReservation( + ReservationId reservationId, long deadline, long now){ + + Comparator maxResource = + new Comparator() { + @Override + public int compare(ContainerSimulator o1, ContainerSimulator o2) { + return o1.getResource().compareTo(o2.getResource()); + } + }; + Comparator maxDuration = + new Comparator() { + @Override + public int compare(ContainerSimulator o1, ContainerSimulator o2) { + return (int)(o1.getLifeTime() - o2.getLifeTime()); + } + }; + + Resource mapRes = Collections.max(allMaps, 
maxResource).getResource(); + long mapDur = Collections.max(allMaps, maxDuration).getLifeTime(); + Resource redRes = Collections.max(allReduces, maxResource).getResource(); + long redDur = Collections.max(allReduces, maxDuration).getLifeTime(); + + this.reservationRequest = ReservationClientUtil.createMRReservation( + reservationId, "reservation_" + reservationId.getId(), + mapRes, allMaps.size(), mapDur, redRes, allReduces.size(), redDur, + now + traceStartTimeMS, now + deadline, queue); + } + @Override protected void checkStop() { if (isFinished) { diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java new file mode 100644 index 00000000000..7db33820aba --- /dev/null +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls.appmaster; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * AMSimulator that simulates streaming services - it keeps tasks + * running and resubmits them whenever they fail or complete. It finishes + * when the specified duration expires. 
+ */ + +@Private +@Unstable +public class StreamAMSimulator extends AMSimulator { + /* + Vocabulary Used: + pending -> requests which are NOT yet sent to RM + scheduled -> requests which are sent to RM but not yet assigned + assigned -> requests which are assigned to a container + completed -> request corresponding to which container has completed + + streams are constantly scheduled. If a streaming job is killed, we + restart it. + */ + + private static final int PRIORITY_MAP = 20; + + // pending streams + private LinkedList<ContainerSimulator> pendingStreams = + new LinkedList<>(); + + // scheduled streams + private LinkedList<ContainerSimulator> scheduledStreams = + new LinkedList<>(); + + // assigned streams + private Map<ContainerId, ContainerSimulator> assignedStreams = + new HashMap<>(); + + // all streams + private LinkedList<ContainerSimulator> allStreams = + new LinkedList<>(); + + // finished + private boolean isFinished = false; + private long duration = 0; + + private static final Logger LOG = + LoggerFactory.getLogger(StreamAMSimulator.class); + + @SuppressWarnings("checkstyle:parameternumber") + public void init(int heartbeatInterval, + List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, + long traceStartTime, long traceFinishTime, String user, String queue, + boolean isTracked, String oldAppId, long baselineStartTimeMS, + Resource amContainerResource, Map<String, String> params) { + super.init(heartbeatInterval, containerList, rm, se, traceStartTime, + traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, + amContainerResource, params); + amtype = "stream"; + + allStreams.addAll(containerList); + + duration = traceFinishTime - traceStartTime; + + LOG.info("Added new job with {} streams, running for {}", + allStreams.size(), duration); + } + + @Override + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + if (null != masterContainer) { + restart(); + super.notifyAMContainerLaunched(masterContainer); + } + } + + @Override + @SuppressWarnings("unchecked") + protected void processResponseQueue() throws Exception { + while (!responseQueue.isEmpty()) { + AllocateResponse response = responseQueue.take(); + + // check completed containers + if (!response.getCompletedContainersStatuses().isEmpty()) { + for (ContainerStatus cs : response.getCompletedContainersStatuses()) { + ContainerId containerId = cs.getContainerId(); + if (assignedStreams.containsKey(containerId)) { + // One of our containers completed. Regardless of reason, + // we want to maintain our streaming process + LOG.debug("Application {} has one streamer finished ({}).", appId, + containerId); + pendingStreams.add(assignedStreams.remove(containerId)); + } else if (amContainer.getId().equals(containerId)) { + // Our am container completed + if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) { + // am container released event (am container completed on success) + isAMContainerRunning = false; + isFinished = true; + LOG.info("Application {} goes to finish.", appId); + } else { + // am container killed - wait for reallocation + LOG.info("Application {}'s AM is " + + "going to be killed. 
Waiting for rescheduling...", appId); + isAMContainerRunning = false; + } + } + } + } + + // check finished + if (isAMContainerRunning && + (System.currentTimeMillis() - simulateStartTimeMS >= duration)) { + LOG.debug("Application {} sends out event to clean up" + + " its AM container.", appId); + isAMContainerRunning = false; + isFinished = true; + break; + } + + // check allocated containers + for (Container container : response.getAllocatedContainers()) { + if (!scheduledStreams.isEmpty()) { + ContainerSimulator cs = scheduledStreams.remove(); + LOG.debug("Application {} starts to launch a stream ({}).", appId, + container.getId()); + assignedStreams.put(container.getId(), cs); + se.getNmMap().get(container.getNodeId()).addNewContainer(container, cs.getLifeTime()); + } + } + } + } + + /** + * restart running because of the am container killed + */ + private void restart() + throws YarnException, IOException, InterruptedException { + // clear + isFinished = false; + pendingStreams.clear(); + pendingStreams.addAll(allStreams); + + amContainer = null; + } + + private List mergeLists(List left, List right) { + List list = new ArrayList<>(); + list.addAll(left); + list.addAll(right); + return list; + } + + @Override + protected void sendContainerRequest() + throws YarnException, IOException, InterruptedException { + + // send out request + List ask = new ArrayList<>(); + List release = new ArrayList<>(); + if (!isFinished) { + if (!pendingStreams.isEmpty()) { + ask = packageRequests(mergeLists(pendingStreams, scheduledStreams), + PRIORITY_MAP); + LOG.debug("Application {} sends out request for {} streams.", + appId, pendingStreams.size()); + scheduledStreams.addAll(pendingStreams); + pendingStreams.clear(); + } + } + + if(isFinished){ + release.addAll(assignedStreams.keySet()); + ask.clear(); + } + + final AllocateRequest request = createAllocateRequest(ask, release); + if (totalContainers == 0) { + request.setProgress(1.0f); + } else { + request.setProgress((float) finishedContainers / totalContainers); + } + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + Token token = rm.getRMContext().getRMApps() + .get(appAttemptId.getApplicationId()) + .getRMAppAttempt(appAttemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + AllocateResponse response = ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return rm.getApplicationMasterService().allocate(request); + } + }); + if (response != null) { + responseQueue.put(response); + } + } + + @Override + public void initReservation( + ReservationId reservationId, long deadline, long now){ + // Streaming AM currently doesn't do reservations + this.reservationRequest = null; + } + + @Override + protected void checkStop() { + if (isFinished) { + super.setEndTime(System.currentTimeMillis()); + } + } + + @Override + public void lastStep() throws Exception { + super.lastStep(); + + // clear data structures + allStreams.clear(); + assignedStreams.clear(); + pendingStreams.clear(); + scheduledStreams.clear(); + responseQueue.clear(); + } +} diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java index 3ed81e1f2cd..cd632eabe21 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java +++ 
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java @@ -19,19 +19,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.math3.distribution.LogNormalDistribution; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TaskStatus.State; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.tools.rumen.*; -import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -41,11 +35,14 @@ /** * Generates random task data for a synthetic job. */ -public class SynthJob implements JobStory { +public class SynthJob { @SuppressWarnings("StaticVariableName") private static Log LOG = LogFactory.getLog(SynthJob.class); + private static final long MIN_MEMORY = 1024; + private static final long MIN_VCORES = 1; + private final Configuration conf; private final int id; @@ -53,75 +50,41 @@ private static final AtomicInteger sequence = new AtomicInteger(0); private final String name; private final String queueName; - private final SynthJobClass jobClass; + private final SynthTraceJobProducer.JobDefinition jobDef; + + private String type; // job timing private final long submitTime; private final long duration; private final long deadline; - private final int numMapTasks; - private final int numRedTasks; - private final long mapMaxMemory; - private final long reduceMaxMemory; - private final long mapMaxVcores; - private final long reduceMaxVcores; - private final long[] mapRuntime; - private final float[] reduceRuntime; - private long totMapRuntime; - private long totRedRuntime; + private Map params; + + private long totalSlotTime = 0; + + // task information + private List tasks = new ArrayList<>(); + - public SynthJob(JDKRandomGenerator rand, Configuration conf, - SynthJobClass jobClass, long actualSubmissionTime) { + protected SynthJob(JDKRandomGenerator rand, Configuration conf, + SynthTraceJobProducer.JobDefinition jobDef, + String queue, long actualSubmissionTime) { this.conf = conf; - this.jobClass = jobClass; - - this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS); - this.numMapTasks = jobClass.getMtasks(); - this.numRedTasks = jobClass.getRtasks(); - - // sample memory distributions, correct for sub-minAlloc sizes - long tempMapMaxMemory = jobClass.getMapMaxMemory(); - this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB - ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory; - long tempReduceMaxMemory = jobClass.getReduceMaxMemory(); - this.reduceMaxMemory = - tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB - ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory; - - // sample vcores distributions, correct for sub-minAlloc sizes - long tempMapMaxVCores = jobClass.getMapMaxVcores(); - this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES - ? 
MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores; - long tempReduceMaxVcores = jobClass.getReduceMaxVcores(); - this.reduceMaxVcores = - tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES - ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores; - - if (numMapTasks > 0) { - conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory); - conf.set(MRJobConfig.MAP_JAVA_OPTS, - "-Xmx" + (this.mapMaxMemory - 100) + "m"); - } + this.jobDef = jobDef; - if (numRedTasks > 0) { - conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory); - conf.set(MRJobConfig.REDUCE_JAVA_OPTS, - "-Xmx" + (this.reduceMaxMemory - 100) + "m"); - } + this.queueName = queue; - boolean hasDeadline = - (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation); + this.duration = MILLISECONDS.convert(jobDef.duration.getInt(), + SECONDS); - LogNormalDistribution deadlineFactor = - SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg, - jobClass.jobClass.deadline_factor_stddev); + boolean hasDeadline = + (rand.nextDouble() <= jobDef.reservation.getDouble()); - double deadlineFactorSample = - (deadlineFactor != null) ? deadlineFactor.sample() : -1; + double deadlineFactorSample = jobDef.deadline_factor.getDouble(); - this.queueName = jobClass.workload.getQueueName(); + this.type = jobDef.type; this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS); @@ -129,6 +92,8 @@ public SynthJob(JDKRandomGenerator rand, Configuration conf, hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS) + (long) Math.ceil(deadlineFactorSample * duration) : -1; + this.params = jobDef.params; + conf.set(QUEUE_NAME, queueName); // name and initialize job randomness @@ -136,141 +101,87 @@ public SynthJob(JDKRandomGenerator rand, Configuration conf, rand.setSeed(seed); id = sequence.getAndIncrement(); - name = String.format(jobClass.getClassName() + "_%06d", id); + name = String.format(jobDef.class_name + "_%06d", id); LOG.debug(name + " (" + seed + ")"); LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime + " deadline:" + deadline + " duration:" + duration + " deadline-submission: " + (deadline - submitTime)); - // generate map and reduce runtimes - mapRuntime = new long[numMapTasks]; - for (int i = 0; i < numMapTasks; i++) { - mapRuntime[i] = jobClass.getMapTimeSample(); - totMapRuntime += mapRuntime[i]; - } - reduceRuntime = new float[numRedTasks]; - for (int i = 0; i < numRedTasks; i++) { - reduceRuntime[i] = jobClass.getReduceTimeSample(); - totRedRuntime += (long) Math.ceil(reduceRuntime[i]); + // Expand tasks + for(SynthTraceJobProducer.TaskDefinition task : jobDef.tasks){ + int num = task.count.getInt(); + String type = task.type; + long memory = task.max_memory.getLong(); + memory = memory < MIN_MEMORY ? MIN_MEMORY: memory; + long vcores = task.max_vcores.getLong(); + vcores = vcores < MIN_VCORES ? 
MIN_VCORES : vcores; + int priority = task.priority; + + for(int i = 0; i < num; ++i){ + long time = task.time.getLong(); + totalSlotTime += time; + tasks.add(new SynthTask(type, time, memory, vcores, + priority)); + } } + + } + + public String getType(){ + return type; + } + + public List getTasks(){ + return tasks; } public boolean hasDeadline() { return deadline > 0; } - @Override public String getName() { return name; } - @Override public String getUser() { - return jobClass.getUserName(); + return jobDef.user_name; } - @Override public JobID getJobID() { return new JobID("job_mock_" + name, id); } - @Override - public Values getOutcome() { - return Values.SUCCESS; - } - - @Override public long getSubmissionTime() { return submitTime; } - @Override - public int getNumberMaps() { - return numMapTasks; - } - - @Override - public int getNumberReduces() { - return numRedTasks; - } - - @Override - public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { - switch (taskType) { - case MAP: - return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores); - case REDUCE: - return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores); - default: - throw new IllegalArgumentException("Not interested"); - } - } - - @Override - public InputSplit[] getInputSplits() { - throw new UnsupportedOperationException(); - } - - @Override - public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber, - int taskAttemptNumber) { - switch (taskType) { - case MAP: - return new MapTaskAttemptInfo(State.SUCCEEDED, - getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null); - - case REDUCE: - // We assume uniform split between pull/sort/reduce - // aligned with naive progress reporting assumptions - return new ReduceTaskAttemptInfo(State.SUCCEEDED, - getTaskInfo(taskType, taskNumber), - (long) Math.round((reduceRuntime[taskNumber] / 3)), - (long) Math.round((reduceRuntime[taskNumber] / 3)), - (long) Math.round((reduceRuntime[taskNumber] / 3)), null); - - default: - break; - } - throw new UnsupportedOperationException(); - } - - @Override - public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber, - int taskAttemptNumber, int locality) { - throw new UnsupportedOperationException(); - } - - @Override - public org.apache.hadoop.mapred.JobConf getJobConf() { - return new JobConf(conf); - } - - @Override public String getQueueName() { return queueName; } @Override public String toString() { - return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId() - + "\n" + " jobClass=" - + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n" - + " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name - + ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n" - + " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n" - + " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks - + ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory=" - + mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n" - + " queueName=" + queueName + "\n" + "]"; - } - - public SynthJobClass getJobClass() { - return jobClass; + String res = "\nSynthJob [" + jobDef.class_name + "]: \n" + + "\tname: " + getName() + "\n" + + "\ttype: " + getType() + "\n" + + "\tid: " + id + "\n" + + "\tqueue: " + getQueueName() + "\n" + + "\tsubmission: " + getSubmissionTime() + "\n" + + "\tduration: " + getDuration() + "\n" + + "\tdeadline: " + getDeadline() + "\n"; + int taskno = 0; + for(SynthJob.SynthTask t : getTasks()){ + res += 
String.format("\t\ttask[%1$2s" + + "]\ttype: %2$-10s\ttime: %3$3s\tmemory: %4$4s\tvcores: %5$2s\n", + taskno, t.getType(), t.getTime(), t.getMemory(), t.getVcores()); + taskno++; + } + return res; } public long getTotalSlotTime() { - return totMapRuntime + totRedRuntime; + return totalSlotTime; } public long getDuration() { @@ -281,26 +192,77 @@ public long getDeadline() { return deadline; } + public Map getParams() { + return params; + } + @Override public boolean equals(Object other) { if (!(other instanceof SynthJob)) { return false; } SynthJob o = (SynthJob) other; - return Arrays.equals(mapRuntime, o.mapRuntime) - && Arrays.equals(reduceRuntime, o.reduceRuntime) - && submitTime == o.submitTime && numMapTasks == o.numMapTasks - && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory - && reduceMaxMemory == o.reduceMaxMemory - && mapMaxVcores == o.mapMaxVcores - && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName) - && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime - && totRedRuntime == o.totRedRuntime; + return tasks.equals( o.tasks) + && submitTime == o.submitTime + && type.equals(o.type) + && queueName.equals(o.queueName) + && jobDef.class_name.equals(o.jobDef.class_name); } @Override public int hashCode() { - // could have a bad distr; investigate if a relevant use case exists - return jobClass.hashCode() * (int) submitTime; + return jobDef.class_name.hashCode() + * (int) submitTime * (int) duration; + } + + public static class SynthTask{ + private String type; + private long time; + private long max_memory; + private long max_vcores; + private int priority; + + private SynthTask(String type, long time, long max_memory, long max_vcores, + int priority){ + this.type = type; + this.time = time; + this.max_memory = max_memory; + this.max_vcores = max_vcores; + this.priority = priority; + } + + public String getType(){ + return type; + } + + public long getTime(){ + return time; + } + + public long getMemory(){ + return max_memory; + } + + public long getVcores(){ + return max_vcores; + } + + public int getPriority(){ + return priority; + } + + @Override + public boolean equals(Object other){ + if (!(other instanceof SynthTask)) { + return false; + } + SynthTask o = (SynthTask) other; + return type.equals(o.type) + && time == o.time + && max_memory == o.max_memory + && max_vcores == o.max_vcores + && priority == o.priority; + } + } } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java deleted file mode 100644 index 439698f8a45..00000000000 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.sls.synthetic; - -import org.apache.commons.math3.distribution.AbstractRealDistribution; -import org.apache.commons.math3.distribution.LogNormalDistribution; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.tools.rumen.JobStory; -import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass; -import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; - -/** - * This is a class that represent a class of Jobs. It is used to generate an - * individual job, by picking random durations, task counts, container size, - * etc. - */ -public class SynthJobClass { - - private final JDKRandomGenerator rand; - private final LogNormalDistribution dur; - private final LogNormalDistribution mapRuntime; - private final LogNormalDistribution redRuntime; - private final LogNormalDistribution mtasks; - private final LogNormalDistribution rtasks; - private final LogNormalDistribution mapMem; - private final LogNormalDistribution redMem; - private final LogNormalDistribution mapVcores; - private final LogNormalDistribution redVcores; - - private final Trace trace; - @SuppressWarnings("VisibilityModifier") - protected final SynthWorkload workload; - @SuppressWarnings("VisibilityModifier") - protected final JobClass jobClass; - - public SynthJobClass(JDKRandomGenerator rand, Trace trace, - SynthWorkload workload, int classId) { - - this.trace = trace; - this.workload = workload; - this.rand = new JDKRandomGenerator(); - this.rand.setSeed(rand.nextLong()); - jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId); - - this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg, - jobClass.dur_stddev); - this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg, - jobClass.mtime_stddev); - this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg, - jobClass.rtime_stddev); - this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg, - jobClass.mtasks_stddev); - this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg, - jobClass.rtasks_stddev); - - this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg, - jobClass.map_max_memory_stddev); - this.redMem = SynthUtils.getLogNormalDist(rand, - jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev); - this.mapVcores = SynthUtils.getLogNormalDist(rand, - jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev); - this.redVcores = SynthUtils.getLogNormalDist(rand, - jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev); - } - - public JobStory getJobStory(Configuration conf, long actualSubmissionTime) { - return new SynthJob(rand, conf, this, actualSubmissionTime); - } - - @Override - public String toString() { - return "SynthJobClass [workload=" + workload.getName() + ", class=" - + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur=" - + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime=" - + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0) - + ", redRuntime=" - + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0) - + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0) - + ", rtasks=" + ((rtasks != null) ? 
rtasks.getNumericalMean() : 0) - + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n"; - - } - - public double getClassWeight() { - return jobClass.class_weight; - } - - public long getDur() { - return genLongSample(dur); - } - - public int getMtasks() { - return genIntSample(mtasks); - } - - public int getRtasks() { - return genIntSample(rtasks); - } - - public long getMapMaxMemory() { - return genLongSample(mapMem); - } - - public long getReduceMaxMemory() { - return genLongSample(redMem); - } - - public long getMapMaxVcores() { - return genLongSample(mapVcores); - } - - public long getReduceMaxVcores() { - return genLongSample(redVcores); - } - - public SynthWorkload getWorkload() { - return workload; - } - - public int genIntSample(AbstractRealDistribution dist) { - if (dist == null) { - return 0; - } - double baseSample = dist.sample(); - if (baseSample < 0) { - baseSample = 0; - } - return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample)); - } - - public long genLongSample(AbstractRealDistribution dist) { - return dist != null ? (long) Math.ceil(dist.sample()) : 0; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof SynthJobClass)) { - return false; - } - SynthJobClass o = (SynthJobClass) other; - return workload.equals(o.workload); - } - - @Override - public int hashCode() { - return workload.hashCode() * workload.getId(); - } - - public String getClassName() { - return jobClass.class_name; - } - - public long getMapTimeSample() { - return genLongSample(mapRuntime); - } - - public long getReduceTimeSample() { - return genLongSample(redRuntime); - } - - public String getUserName() { - return jobClass.user_name; - } -}
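With SynthJobClass gone, the per-class avg/stddev knobs it sampled (dur_avg, mtasks_avg, map_max_memory_avg, chance_of_reservation, and so on) survive only as the "old JSON fields for backwards compatibility" that validateJobDef() converts in the producer below. A sketch of what one job class looks like in the new task-based format, parsed with the same codehaus Jackson ObjectMapper the producer uses; every value here is invented for illustration and exception handling is elided:

```java
// Sketch only: one job class in the new task-based trace format.
static SynthTraceJobProducer.JobDefinition parseExampleJobDef()
    throws IOException {
  String json = "{"
      + "\"class_name\": \"class_1\", \"class_weight\": 1.0,"
      + "\"type\": \"mapreduce\","                 // AM type to launch
      + "\"duration\": {\"val\": 60, \"std\": 5}," // Sample in DIST mode
      + "\"reservation\": {\"val\": 0.5},"         // Sample in CONST mode
      + "\"deadline_factor\": {\"val\": 2.0},"
      + "\"tasks\": ["
      + "  {\"type\": \"map\", \"priority\": 20,"
      + "   \"count\": {\"val\": 5, \"std\": 1},"
      + "   \"time\": {\"val\": 10},"
      + "   \"max_memory\": {\"val\": 1024},"
      + "   \"max_vcores\": {\"val\": 1}}"
      + "]}";
  return new ObjectMapper().readValue(json,
      SynthTraceJobProducer.JobDefinition.class);
}
```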
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java index c89e4e26a5b..f91f761e550 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java @@ -19,14 +19,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.distribution.AbstractRealDistribution; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.tools.rumen.JobStory; -import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; import javax.xml.bind.annotation.XmlRootElement; @@ -39,12 +41,12 @@ /** * This is a JobStoryProducer that operates from distribution of different - * workloads. The .json input file is used to determine how many jobs, which + * workloads. The .json input file is used to determine how many jobs, which * size, number of maps/reducers and their duration, as well as the temporal * distribution of submissions. For each parameter we control avg and stdev, and * generate values via normal or log-normal distributions. */ -public class SynthTraceJobProducer implements JobStoryProducer { +public class SynthTraceJobProducer { @SuppressWarnings("StaticVariableName") private static final Log LOG = LogFactory.getLog(SynthTraceJobProducer.class); @@ -55,8 +57,6 @@ private final long seed; private int totalWeight; - private final List<Double> weightList; - private final Map<Integer, SynthWorkload> workloads; private final Queue<StoryParams> listStoryParams; @@ -65,6 +65,9 @@ public static final String SLS_SYNTHETIC_TRACE_FILE = "sls.synthetic" + ".trace_file"; + private final static int DEFAULT_MAPPER_PRIORITY = 20; + private final static int DEFAULT_REDUCER_PRIORITY = 10; + public SynthTraceJobProducer(Configuration conf) throws IOException { this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE))); } @@ -76,8 +79,6 @@ public SynthTraceJobProducer(Configuration conf, Path path) this.conf = conf; this.rand = new JDKRandomGenerator(); - workloads = new HashMap<>(); - weightList = new ArrayList<>(); ObjectMapper mapper = new ObjectMapper(); mapper.configure(INTERN_FIELD_NAMES, true); @@ -86,44 +87,127 @@ public SynthTraceJobProducer(Configuration conf, Path path) FileSystem ifs = path.getFileSystem(conf); FSDataInputStream fileIn = ifs.open(path); + // Initialize the random generator and the seed this.trace = mapper.readValue(fileIn, Trace.class); - seed = trace.rand_seed; - rand.setSeed(seed); + this.seed = trace.rand_seed; + this.rand.setSeed(seed); + // Initialize the trace + this.trace.init(rand); this.numJobs = new AtomicInteger(trace.num_jobs); - for (int workloadId = 0; workloadId < trace.workloads - .size(); workloadId++) { - SynthWorkload workload = new SynthWorkload(workloadId, trace); - for (int classId = - 0; classId < trace.workloads.get(workloadId).job_classes - .size(); classId++) { - SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId); - workload.add(cls); - } - workloads.put(workloadId, workload); + for (Double w : trace.workload_weights) { + totalWeight += w; } - for (int i = 0; i < workloads.size(); i++) { - double w = workloads.get(i).getWorkloadWeight(); - totalWeight += w; - weightList.add(w); + // Initialize our story parameters + listStoryParams = createStory(); + + LOG.info("Generated " + listStoryParams.size() + " deadlines for " + + this.numJobs.get() + " jobs"); + } + + // StoryParams holds the minimum amount of information needed to completely + // specify a job run: job definition, start time, and queue. + // This allows us to create "jobs" and then order them according to start + // time. + static class StoryParams { + // Time the job is submitted + private long actualSubmissionTime; + // The queue the job gets submitted to + private String queue; + // Definition to construct the job from + private JobDefinition jobDef; + + StoryParams(long actualSubmissionTime, String queue, JobDefinition jobDef) { + this.actualSubmissionTime = actualSubmissionTime; + this.queue = queue; + this.jobDef = jobDef; + } + } + + private Queue<StoryParams> createStory() { // create priority queue to keep start-time sorted - listStoryParams = - new PriorityQueue<StoryParams>(10, new Comparator<StoryParams>() { + Queue<StoryParams> storyQueue = + new PriorityQueue<>(this.numJobs.get(), new Comparator<StoryParams>() { @Override public int compare(StoryParams o1, StoryParams o2) { return Math - .toIntExact(o2.actualSubmissionTime - o1.actualSubmissionTime); + .toIntExact(o1.actualSubmissionTime - o2.actualSubmissionTime); } }); + for (int i = 0; i < numJobs.get(); i++) { + // Generate a workload + Workload wl = trace.generateWorkload(); + // Save all the parameters needed to completely define a job + long actualSubmissionTime = wl.generateSubmissionTime(); + String queue = wl.queue_name; + JobDefinition job = wl.generateJobDefinition(); + storyQueue.add(new StoryParams(actualSubmissionTime, queue, job)); + } + return storyQueue; + } - // initialize it - createStoryParams(); - LOG.info("Generated " + listStoryParams.size() + " deadlines for " - + this.numJobs.get() + " jobs "); + public SynthJob getNextJob() throws IOException { + if (numJobs.decrementAndGet() < 0) { + return null; + } + StoryParams storyParams = listStoryParams.poll(); + return new SynthJob(rand, conf, storyParams.jobDef, storyParams.queue, + storyParams.actualSubmissionTime); + } + + @Override + public String toString() { + return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs + + ", r=" + rand + ", totalWeight=" + + totalWeight + ", workloads=" + trace.workloads + "]"; + } + + public int getNumJobs() { + return trace.num_jobs; + } + + // Helper to parse and maintain backwards compatibility with + // the old synth JSON format + private static void validateJobDef(JobDefinition jobDef){ + if (jobDef.tasks == null) { + LOG.info("Detected old JobDefinition format. Converting."); + try { + jobDef.tasks = new ArrayList<>(); + jobDef.type = "mapreduce"; + jobDef.deadline_factor = new Sample(jobDef.deadline_factor_avg, + jobDef.deadline_factor_stddev); + jobDef.duration = new Sample(jobDef.dur_avg, + jobDef.dur_stddev); + jobDef.reservation = new Sample(jobDef.chance_of_reservation); + + TaskDefinition map = new TaskDefinition(); + map.type = "map"; + map.count = new Sample(jobDef.mtasks_avg, jobDef.mtasks_stddev); + map.time = new Sample(jobDef.mtime_avg, jobDef.mtime_stddev); + map.max_memory = new Sample((double) jobDef.map_max_memory_avg, + jobDef.map_max_memory_stddev); + map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg, + jobDef.map_max_vcores_stddev); + map.priority = DEFAULT_MAPPER_PRIORITY; + + jobDef.tasks.add(map); + TaskDefinition reduce = new TaskDefinition(); + reduce.type = "reduce"; + reduce.count = new Sample(jobDef.rtasks_avg, jobDef.rtasks_stddev); + reduce.time = new Sample(jobDef.rtime_avg, jobDef.rtime_stddev); + reduce.max_memory = new Sample((double) jobDef.reduce_max_memory_avg, + jobDef.reduce_max_memory_stddev); + reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg, + jobDef.reduce_max_vcores_stddev); + reduce.priority = DEFAULT_REDUCER_PRIORITY; + + jobDef.tasks.add(reduce); + } catch (JsonMappingException e) { + LOG.warn("Error converting old JobDefinition format", e); + } + } } public long getSeed() { @@ -159,6 +243,25 @@ public int getNumNodes() { @JsonProperty("workloads") List<Workload> workloads; + List<Double> workload_weights; + JDKRandomGenerator rand; + + public void init(JDKRandomGenerator rand){ + this.rand = rand; + // Pass rand forward + for(Workload w : workloads){ + w.init(rand); + } + // Initialize workload weights + workload_weights = new ArrayList<>(); + for(Workload w : workloads){ + workload_weights.add(w.workload_weight); + } + } + + Workload generateWorkload(){ + return workloads.get(SynthUtils.getWeighted(workload_weights, rand)); + } } /** @@ -174,16 +277,67 @@ public int getNumNodes() { @JsonProperty("queue_name") String queue_name; @JsonProperty("job_classes") - List<JobClass> job_classes; + List<JobDefinition> job_classes; @JsonProperty("time_distribution") List<TimeSample> time_distribution; + + JDKRandomGenerator rand; + + List<Double> job_weights; + List<Double> time_weights; + + public void init(JDKRandomGenerator rand){ + this.rand = rand; + // Validate and pass rand forward + for(JobDefinition def : job_classes){ + validateJobDef(def); + def.init(rand); + } + + // Initialize job weights + job_weights = new ArrayList<>(); + for(JobDefinition j : job_classes){ + job_weights.add(j.class_weight); + } + + // Initialize time weights + time_weights = new ArrayList<>(); + for(TimeSample ts : time_distribution){ + time_weights.add(ts.weight); + } + } + + public long generateSubmissionTime(){ + int index = SynthUtils.getWeighted(time_weights, rand); + // Retrieve the lower and upper bounds for this time "bucket" + int start = time_distribution.get(index).time; + // Get the beginning of the next time sample (if it exists) + index = (index+1) < time_distribution.size() ? index+1 : index; + int end = time_distribution.get(index).time; + int range = end - start; + // Pick a time uniformly within the bucket + return start + (range > 0 ? rand.nextInt(range) : 0); + } + + public JobDefinition generateJobDefinition(){ + return job_classes.get(SynthUtils.getWeighted(job_weights, rand)); + } + + @Override + public String toString(){ + return "\nWorkload " + workload_name + ", weight: " + workload_weight + + ", queue: " + queue_name + " " + + job_classes.toString().replace("\n", "\n\t"); + } }
/** * Class used to parse a job class from file. */ @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) - public static class JobClass { + public static class JobDefinition { @JsonProperty("class_name") String class_name; @@ -194,6 +348,23 @@ public int getNumNodes() { @JsonProperty("class_weight") double class_weight; + // am type to launch + @JsonProperty("type") + String type; + @JsonProperty("deadline_factor") + Sample deadline_factor; + @JsonProperty("duration") + Sample duration; + @JsonProperty("reservation") + Sample reservation; + + @JsonProperty("tasks") + List<TaskDefinition> tasks; + + @JsonProperty("params") + Map<String, String> params; + + // Old JSON fields for backwards compatibility // reservation related params @JsonProperty("chance_of_reservation") double chance_of_reservation; @@ -246,71 +417,224 @@ public int getNumNodes() { @JsonProperty("reduce_max_vcores_stddev") double reduce_max_vcores_stddev; + public void init(JDKRandomGenerator rand){ + deadline_factor.init(rand); + duration.init(rand); + reservation.init(rand); + + for(TaskDefinition t : tasks){ + t.count.init(rand); + t.time.init(rand); + t.max_memory.init(rand); + t.max_vcores.init(rand); + } + } + + @Override + public String toString(){ + return "\nJobDefinition " + class_name + ", weight: " + class_weight + + ", type: " + type + " " + + tasks.toString().replace("\n", "\n\t"); + } } /** - * This is used to define time-varying probability of a job start-time (e.g., - * to simulate daily patterns). + * A task representing a type of container - e.g. "map" in mapreduce. */ @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) - public static class TimeSample { - // in sec + public static class TaskDefinition { + + @JsonProperty("type") + String type; + @JsonProperty("count") + Sample count; @JsonProperty("time") - int time; - @JsonProperty("weight") - double jobs; + Sample time; + @JsonProperty("max_memory") + Sample max_memory; + @JsonProperty("max_vcores") + Sample max_vcores; + @JsonProperty("priority") + int priority; + + @Override + public String toString(){ + return "\nTaskDefinition " + type + + " Count[" + count + "] Time[" + time + "] Memory[" + max_memory + + "] Vcores[" + max_vcores + "] Priority[" + priority + "]"; + } } - static class StoryParams { - private SynthJobClass pickedJobClass; - private long actualSubmissionTime; + /** + * Class used to parse value sample information. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class Sample { + private static final Dist DEFAULT_DIST = Dist.LOGNORM; + + private final double val; + private final double std; + private final Dist dist; + private AbstractRealDistribution dist_instance; + private final List<String> discrete; + private final List<Double> weights; + private final Mode mode; + + private Random rand; + + private enum Mode{ + CONST, + DIST, + DISC + } - StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) { - this.pickedJobClass = pickedJobClass; - this.actualSubmissionTime = actualSubmissionTime; + private enum Dist{ + LOGNORM, + NORM } - } + public Sample(Double val) throws JsonMappingException{ + this(val, null); + } - void createStoryParams() { + public Sample(Double val, Double std) throws JsonMappingException{ + this(val, std, null, null, null); + } - for (int i = 0; i < numJobs.get(); i++) { - int workload = SynthUtils.getWeighted(weightList, rand); - SynthWorkload pickedWorkload = workloads.get(workload); - long jobClass = - SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand); - SynthJobClass pickedJobClass = 
pickedWorkload.getClassList().get((int) jobClass); - long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand); - // long actualSubmissionTime = (i + 1) * 10; - listStoryParams - .add(new StoryParams(pickedJobClass, actualSubmissionTime)); + @JsonCreator + public Sample(@JsonProperty("val") Double val, + @JsonProperty("std") Double std, @JsonProperty("dist") String dist, + @JsonProperty("discrete") List<String> discrete, + @JsonProperty("weights") List<Double> weights) + throws JsonMappingException{ + // Different Modes + // - Constant: val must be specified, all else null. Sampling will + // return val. + // - Distribution: val, std specified, dist optional (defaults to + // LogNormal). Sampling will sample from the appropriate distribution + // - Discrete: discrete must be set to a list of strings or numbers, + // weights optional (defaults to uniform) + + if(val!=null){ + if(std==null){ + // Constant + if(dist!=null || discrete!=null || weights!=null){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + mode = Mode.CONST; + this.val = val; + this.std = 0; + this.dist = null; + this.discrete = null; + this.weights = null; + } else { + // Distribution + if(discrete!=null || weights != null){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + mode = Mode.DIST; + this.val = val; + this.std = std; + this.dist = dist!=null ? Dist.valueOf(dist) : DEFAULT_DIST; + this.discrete = null; + this.weights = null; + } + } else { + // Discrete + if(discrete==null){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + mode = Mode.DISC; + this.val = 0; + this.std = 0; + this.dist = null; + this.discrete = discrete; + if(weights == null){ + weights = new ArrayList<>(Collections.nCopies( + discrete.size(), 1.0)); + } + if(weights.size() != discrete.size()){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + this.weights = weights; + } } - @Override - public JobStory getNextJob() throws IOException { - if (numJobs.decrementAndGet() < 0) { - return null; + public void init(JDKRandomGenerator rand){ + if(this.rand != null){ + throw new YarnRuntimeException("init called twice"); + } + this.rand = rand; + if(mode == Mode.DIST){ + switch(this.dist){ + case LOGNORM: + this.dist_instance = SynthUtils.getLogNormalDist(rand, val, std); + return; + case NORM: + this.dist_instance = SynthUtils.getNormalDist(rand, val, std); + return; + } + throw new YarnRuntimeException("Unknown distribution " + dist.name()); + } } - StoryParams storyParams = listStoryParams.poll(); - return storyParams.pickedJobClass.getJobStory(conf, - storyParams.actualSubmissionTime); - } - @Override - public void close() { - } + public int getInt(){ + return Math.toIntExact(getLong()); + } - @Override - public String toString() { - return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs - + ", weightList=" + weightList + ", r=" + rand + ", totalWeight=" - + totalWeight + ", workloads=" + workloads + "]"; - } + public long getLong(){ + return Math.round(getDouble()); + } + + public double getDouble(){ + return Double.parseDouble(getString()); + } + + public String getString(){ + if(this.rand == null){ + throw new YarnRuntimeException("getValue called without init"); + } + switch(mode){ + case CONST: + return Double.toString(val); + case DIST: + return Double.toString(dist_instance.sample()); + case DISC: + return this.discrete.get(SynthUtils.getWeighted(this.weights, rand)); + } + throw new YarnRuntimeException("Unknown sampling mode " + mode.name()); + } + + @Override + public String toString(){ + switch(mode){ + case CONST: + return "value: " + Double.toString(val); + case DIST: + return "value: " + this.val + " std: " + this.std + " dist: " + + this.dist.name(); + case DISC: + return "discrete: " + this.discrete + ", weights: " + this.weights; + } + throw new YarnRuntimeException("Unknown sampling mode " + mode.name()); + } - public int getNumJobs() { - return trace.num_jobs; } + /** + * This is used to define time-varying probability of a job start-time (e.g., + * to simulate daily patterns). + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class TimeSample { + // in sec + @JsonProperty("time") + int time; + @JsonProperty("weight") + double weight; + } }
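The three Sample modes correspond to three JSON shapes. A minimal Jackson round-trip, in the style of the TestSynthJobGeneration additions below (the mapper settings, static imports, and SynthTraceJobProducer.Sample are from this patch; the literal values are made up for illustration):

ObjectMapper mapper = new ObjectMapper();
mapper.configure(INTERN_FIELD_NAMES, true);
mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
JDKRandomGenerator rand = new JDKRandomGenerator();
rand.setSeed(0);

// Constant: always samples 1024.
SynthTraceJobProducer.Sample mem = mapper.readValue(
    "{\"val\": 1024}", SynthTraceJobProducer.Sample.class);
// Distribution: log-normal around 10 with std 2 (dist defaults to LOGNORM).
SynthTraceJobProducer.Sample time = mapper.readValue(
    "{\"val\": 10, \"std\": 2}", SynthTraceJobProducer.Sample.class);
// Discrete: "map" drawn three times as often as "reduce".
SynthTraceJobProducer.Sample type = mapper.readValue(
    "{\"discrete\": [\"map\", \"reduce\"], \"weights\": [3, 1]}",
    SynthTraceJobProducer.Sample.class);

mem.init(rand);
time.init(rand);
type.init(rand);

int memory = mem.getInt();      // always 1024
long runtime = time.getLong();  // e.g. 9, 11, ...
String kind = type.getString(); // "map" roughly 75% of the time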
- */ -public class SynthWorkload { - - private final int id; - private final List<SynthJobClass> classList; - private final Trace trace; - private final SortedMap<Integer, Double> timeWeights; - - public SynthWorkload(int identifier, Trace inTrace) { - classList = new ArrayList<SynthJobClass>(); - this.id = identifier; - this.trace = inTrace; - timeWeights = new TreeMap<Integer, Double>(); - for (SynthTraceJobProducer.TimeSample ts : trace.workloads - .get(id).time_distribution) { - timeWeights.put(ts.time, ts.jobs); - } - } - - public boolean add(SynthJobClass s) { - return classList.add(s); - } - - public List<Double> getWeightList() { - ArrayList<Double> ret = new ArrayList<Double>(); - for (SynthJobClass s : classList) { - ret.add(s.getClassWeight()); - } - return ret; - } - - public int getId() { - return id; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof SynthWorkload)) { - return false; - } - // assume ID determines job classes by construction - return getId() == ((SynthWorkload) other).getId(); - } - - @Override - public int hashCode() { - return getId(); - } - - @Override - public String toString() { - return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n" - + classList + "]\n"; - } - - public String getName() { - return trace.workloads.get(id).workload_name; - } - - public double getWorkloadWeight() { - return trace.workloads.get(id).workload_weight; - } - - public String getQueueName() { - return trace.workloads.get(id).queue_name; - } - - public long getBaseSubmissionTime(Random rand) { - - // pick based on weights the "bucket" for this start time - int position = SynthUtils.getWeighted(timeWeights.values(), rand); - - int[] time = new int[timeWeights.keySet().size()]; - int index = 0; - for (Integer i : timeWeights.keySet()) { - time[index++] = i; - } - - // uniformly pick a time between start and end time of this bucket - int startRange = time[position]; - int endRange = startRange; - // if there is no subsequent bucket pick startRange - if (position < timeWeights.keySet().size() - 1) { - endRange = time[position + 1]; - return startRange + rand.nextInt((endRange - startRange)); - } else { - return startRange; - } - } - - public List<SynthJobClass> getClassList() { - return classList; - } - -} diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java index 6b369f2a6f0..668be145d70 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java @@ -125,7 +125,7 @@ public void uncaughtException(Thread t, Throwable e) { if (!exceptionList.isEmpty()) { sls.stop(); - Assert.fail("TestSLSRunner catched exception from child thread " + Assert.fail("TestSLSRunner caught exception from child thread " + "(TaskRunner.Task): " + exceptionList); break; } timeout--; diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java new file mode 100644 index 00000000000..47e2d1e6136 --- /dev/null +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import net.jcip.annotations.NotThreadSafe; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Collection; + +/** + * This test performs simple runs of the SLS with the generic synth JSON format. + */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +public class TestSLSGenericSynth extends BaseSLSRunnerTest { + + @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})") + public static Collection<Object[]> data() { + + String capScheduler = CapacityScheduler.class.getCanonicalName(); + String fairScheduler = FairScheduler.class.getCanonicalName(); + String synthTraceFile = "src/test/resources/syn_generic.json"; + String nodeFile = "src/test/resources/nodes.json"; + + // Test with both schedulers + return Arrays.asList(new Object[][] { + + // covering the no nodeFile case + {capScheduler, "SYNTH", synthTraceFile, null }, + + // covering new commandline and CapacityScheduler + {capScheduler, "SYNTH", synthTraceFile, nodeFile }, + + // covering FairScheduler + {fairScheduler, "SYNTH", synthTraceFile, nodeFile }, + }); + } + + @Before + public void setup() { + ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt"; + exitInvariantFile = "src/test/resources/exit-invariants.txt"; + } + + @Test(timeout = 90000) + @SuppressWarnings("all") + public void testSimulatorRunning() throws Exception { + Configuration conf = new Configuration(false); + long timeTillShutdownInsec = 20L; + runSLS(conf, timeTillShutdownInsec); + } +} diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java index 567f0d9d3b8..abb3b5e904a 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java @@ -78,7 +78,7 @@ public void setup() { exitInvariantFile = "src/test/resources/exit-invariants.txt"; } - @Test(timeout = 60000) + @Test(timeout = 90000) @SuppressWarnings("all") public void testSimulatorRunning() throws Exception { Configuration conf = new Configuration(false); diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java new file mode 100644 index 00000000000..4947b156e83 --- /dev/null +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import net.jcip.annotations.NotThreadSafe; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Collection; + +/** + * This test performs simple runs of the SLS with the stream synth JSON format. + */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +public class TestSLSStreamAMSynth extends BaseSLSRunnerTest { + + @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})") + public static Collection<Object[]> data() { + + String capScheduler = CapacityScheduler.class.getCanonicalName(); + String fairScheduler = FairScheduler.class.getCanonicalName(); + String synthTraceFile = "src/test/resources/syn_stream.json"; + String nodeFile = "src/test/resources/nodes.json"; + + // Test with both schedulers + return Arrays.asList(new Object[][] { + + // covering the no nodeFile case + {capScheduler, "SYNTH", synthTraceFile, null }, + + // covering new commandline and CapacityScheduler + {capScheduler, "SYNTH", synthTraceFile, nodeFile }, + + // covering FairScheduler + {fairScheduler, "SYNTH", synthTraceFile, nodeFile }, + }); + } + + @Before + public void setup() { + ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt"; + exitInvariantFile = "src/test/resources/exit-invariants.txt"; + } + + @Test(timeout = 90000) + @SuppressWarnings("all") + public void testSimulatorRunning() throws Exception { + Configuration conf = new Configuration(false); + long timeTillShutdownInsec = 20L; + runSLS(conf, timeTillShutdownInsec); + } +} diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java index 2b1971a8ec5..0055c5e7e17 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java @@ -17,31 +17,87 @@ */ package org.apache.hadoop.yarn.sls; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import org.apache.hadoop.yarn.sls.synthetic.SynthJob; import
org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; import static org.junit.Assert.assertTrue; +import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES; +import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES; + /** * Simple test class driving the {@code SynthTraceJobProducer}, and validating * jobs produce are within expected range. */ public class TestSynthJobGeneration { - public final static Logger LOG = - Logger.getLogger(TestSynthJobGeneration.class); + public final static Logger LOG + = LoggerFactory.getLogger(TestSynthJobGeneration.class); + + @Test + public void testWorkloadGenerateTime() throws IllegalArgumentException, IOException { + + String workload_json = "{\"job_classes\": [], \"time_distribution\":[" + + "{\"time\": 0, \"weight\": 1}, " + + "{\"time\": 30, \"weight\": 0}," + + "{\"time\": 60, \"weight\": 2}," + + "{\"time\": 90, \"weight\": 1}" + + "]}"; + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(INTERN_FIELD_NAMES, true); + mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + SynthTraceJobProducer.Workload wl = mapper.readValue(workload_json, + SynthTraceJobProducer.Workload.class); + + JDKRandomGenerator rand = new JDKRandomGenerator(); + rand.setSeed(0); + + wl.init(rand); + + int bucket_0 = 0; + int bucket_1 = 0; + int bucket_2 = 0; + int bucket_3 = 0; + for(int i = 0; i < 1000; ++i) + { + long time = wl.generateSubmissionTime(); + LOG.info("Generated time " + time); + if(time < 30){ + bucket_0++; + } else if (time < 60) { + bucket_1++; + } else if (time < 90) { + bucket_2++; + } else { + bucket_3++; + } + } + + Assert.assertTrue(bucket_0 > 0); + Assert.assertTrue(bucket_1 == 0); + Assert.assertTrue(bucket_2 > 0); + Assert.assertTrue(bucket_3 > 0); + Assert.assertTrue(bucket_2 > bucket_0); + Assert.assertTrue(bucket_2 > bucket_3); + + LOG.info("bucket_0 {}, bucket_1 {}, bucket_2 {}, bucket_3 {}", bucket_0, bucket_1, bucket_2, bucket_3); + + } @Test - public void test() throws IllegalArgumentException, IOException { + public void testMapReduce() throws IllegalArgumentException, IOException { Configuration conf = new Configuration(); @@ -50,47 +106,155 @@ public void test() throws IllegalArgumentException, IOException { SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); - SynthJob js = (SynthJob) stjp.getNextJob(); + LOG.info(stjp.toString()); + + SynthJob js = stjp.getNextJob(); int jobCount = 0; while (js != null) { - LOG.info((jobCount++) + " " + js.getQueueName() + " -- " - + js.getJobClass().getClassName() + " (conf: " - + js.getJobConf().get(MRJobConfig.QUEUE_NAME) + ") " + " submission: " - + js.getSubmissionTime() + ", " + " duration: " + js.getDuration() - + " numMaps: " + js.getNumberMaps() + " numReduces: " - + js.getNumberReduces()); + LOG.info(js.toString()); + validateJob(js); + js = stjp.getNextJob(); + jobCount++; + } + Assert.assertEquals(stjp.getNumJobs(), jobCount); + } + + @Test + public void testGeneric() throws IllegalArgumentException, IOException { + Configuration conf = new Configuration(); + + conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, + "src/test/resources/syn_generic.json"); + + SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + + LOG.info(stjp.toString()); + + SynthJob js =
stjp.getNextJob(); + + int jobCount = 0; + + while (js != null) { + LOG.info(js.toString()); validateJob(js); - js = (SynthJob) stjp.getNextJob(); + js = stjp.getNextJob(); + jobCount++; } Assert.assertEquals(stjp.getNumJobs(), jobCount); } - private void validateJob(SynthJob js) { + @Test + public void testStream() throws IllegalArgumentException, IOException { + Configuration conf = new Configuration(); - assertTrue(js.getSubmissionTime() > 0); - assertTrue(js.getDuration() > 0); - assertTrue(js.getNumberMaps() >= 0); - assertTrue(js.getNumberReduces() >= 0); - assertTrue(js.getNumberMaps() + js.getNumberReduces() > 0); - assertTrue(js.getTotalSlotTime() >= 0); + conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, + "src/test/resources/syn_stream.json"); + + SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + + LOG.info(stjp.toString()); + + SynthJob js = stjp.getNextJob(); + + int jobCount = 0; + + while (js != null) { + LOG.info(js.toString()); + validateJob(js); + js = stjp.getNextJob(); + jobCount++; + } + + Assert.assertEquals(stjp.getNumJobs(), jobCount); + } + + @Test + public void testSample() throws IOException{ + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(INTERN_FIELD_NAMES, true); + mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + JDKRandomGenerator rand = new JDKRandomGenerator(); + rand.setSeed(0); + + String val_json = "{\"val\" : 5 }"; + SynthTraceJobProducer.Sample val_sample = + mapper.readValue(val_json, SynthTraceJobProducer.Sample.class); + val_sample.init(rand); + int val = val_sample.getInt(); + Assert.assertEquals(5, val); + + String dist_json = "{\"val\" : 5, \"std\" : 1 }"; + SynthTraceJobProducer.Sample dist_sample = + mapper.readValue(dist_json, SynthTraceJobProducer.Sample.class); + dist_sample.init(rand); + double dist = dist_sample.getDouble(); + Assert.assertTrue(dist > 2 && dist < 8); - for (int i = 0; i < js.getNumberMaps(); i++) { - TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.MAP, i, 0); - assertTrue(tai.getRuntime() > 0); + String normdist_json = "{\"val\" : 5, \"std\" : 1, \"dist\": \"NORM\" }"; + SynthTraceJobProducer.Sample normdist_sample = + mapper.readValue(normdist_json, SynthTraceJobProducer.Sample.class); + normdist_sample.init(rand); + double normdist = normdist_sample.getDouble(); + Assert.assertTrue(normdist > 2 && normdist < 8); + + String discrete_json = "{\"discrete\" : [2, 4, 6, 8]}"; + SynthTraceJobProducer.Sample discrete_sample = + mapper.readValue(discrete_json, SynthTraceJobProducer.Sample.class); + discrete_sample.init(rand); + int discrete = discrete_sample.getInt(); + Assert.assertTrue(Arrays.asList(new Integer[]{2, 4, 6, 8}) + .contains(discrete)); + + String discrete_weights_json = "{\"discrete\" : [2, 4, 6, 8], " + + "\"weights\": [0, 0, 0, 1]}"; + SynthTraceJobProducer.Sample discrete_weights_sample = + mapper.readValue(discrete_weights_json, SynthTraceJobProducer.Sample.class); + discrete_weights_sample.init(rand); + int discrete_weights = discrete_weights_sample.getInt(); + Assert.assertEquals(8, discrete_weights); + + String invalid_json = "{\"val\" : 5, \"discrete\" : [2, 4, 6, 8], " + + "\"weights\": [0, 0, 0, 1]}"; + try{ + mapper.readValue(invalid_json, SynthTraceJobProducer.Sample.class); + Assert.fail(); + } catch (JsonMappingException e) { + Assert.assertTrue(e.getMessage().startsWith("Instantiation of")); } - for (int i = 0; i < js.getNumberReduces(); i++) { - TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.REDUCE, i, 0); - assertTrue(tai.getRuntime() > 0);
+ String invalid_dist_json = "{\"val\" : 5, \"std\" : 1, " + + "\"dist\": \"INVALID\" }"; + try{ + mapper.readValue(invalid_dist_json, SynthTraceJobProducer.Sample.class); + Assert.fail(); + } catch (JsonMappingException e) { + Assert.assertTrue(e.getMessage().startsWith("Instantiation of")); } + } + + private void validateJob(SynthJob js) { + + assertTrue(js.getSubmissionTime() > 0); + assertTrue(js.getDuration() > 0); + assertTrue(js.getTotalSlotTime() >= 0); if (js.hasDeadline()) { assertTrue(js.getDeadline() > js.getSubmissionTime() + js.getDuration()); } + assertTrue(js.getTasks().size() > 0); + + for(SynthJob.SynthTask t : js.getTasks()){ + assertTrue(t.getType() != null); + assertTrue(t.getTime() > 0); + assertTrue(t.getMemory() > 0); + assertTrue(t.getVcores() > 0); + } } } diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index 02dc26eeaf9..bfc7d0c6c31 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -19,6 +19,7 @@ import com.codahale.metrics.MetricRegistry; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -89,6 +90,10 @@ protected void sendContainerRequest() throws YarnException, IOException, InterruptedException { } + @Override + public void initReservation(ReservationId id, long deadline, long now){ + } + @Override protected void checkStop() { } @@ -134,7 +139,7 @@ public void testAMSimulator() throws Exception { String queue = "default"; List<ContainerSimulator> containers = new ArrayList<>(); app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, null, 0, SLSConfiguration.getAMContainerResource(conf)); + appId, 0, SLSConfiguration.getAMContainerResource(conf), null); app.firstStep(); verifySchedulerMetrics(appId); diff --git hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml index 2f076c27ab0..344024a8dc7 100644 --- hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml +++ hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml @@ -45,6 +45,10 @@ <name>yarn.sls.am.type.mapreduce</name> <value>org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator</value> </property> + <property> + <name>yarn.sls.am.type.stream</name> + <value>org.apache.hadoop.yarn.sls.appmaster.StreamAMSimulator</value> + </property> diff --git hadoop-tools/hadoop-sls/src/test/resources/syn.json hadoop-tools/hadoop-sls/src/test/resources/syn.json index 8479d23c318..c6e2c9236e5 100644 --- hadoop-tools/hadoop-sls/src/test/resources/syn.json +++ hadoop-tools/hadoop-sls/src/test/resources/syn.json @@ -45,7 +45,7 @@ }, { "time": 60, - "jobs": 0 + "weight": 0 } ] } diff --git hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json new file mode 100644 index 00000000000..bde4cd0a69a --- /dev/null +++ hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json @@ -0,0 +1,54 @@ +{ + "description": "tiny jobs workload", + "num_nodes": 20, + "nodes_per_rack": 4, + "num_jobs": 10, + "rand_seed": 2, + "workloads": [ + { + "workload_name": "tiny-test", + "workload_weight": 0.5, + "description": "Sort jobs", + "queue_name": "sls_queue_1",
"job_classes": [ + { + "class_name": "class_1", + "user_name": "foobar", + "class_weight": 1.0, + "type": "mapreduce", + "deadline_factor": {"val": 10}, + "duration": {"val": 60, "std": 5}, + "reservation": {"val": 0.5}, + "tasks":[ + { + "type": "map", + "priority": 20, + "count": { "val": 5, "std": 1}, + "time": {"val": 10, "std": 2}, + "max_memory": {"val": 1024}, + "max_vcores": {"val": 1} + }, + { + "type": "reduce", + "priority": 10, + "count": { "val": 5, "std": 1}, + "time": {"val": 20, "std": 4}, + "max_memory": {"val": 2048}, + "max_vcores": {"val": 2} + } + ] + } + ], + "time_distribution": [ + { + "time": 1, + "weight": 100 + }, + { + "time": 60, + "weight": 0 + } + ] + } + ] +} diff --git hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json new file mode 100644 index 00000000000..a85065b5c94 --- /dev/null +++ hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json @@ -0,0 +1,46 @@ +{ + "description": "stream workload", + "num_nodes": 20, + "nodes_per_rack": 4, + "num_jobs": 5, + "rand_seed": 2, + "workloads": [ + { + "workload_name": "tiny-test", + "workload_weight": 1, + "description": "long lived streaming jobs", + "queue_name": "sls_queue_1", + "job_classes": [ + { + "class_name": "class_1", + "user_name": "foobar", + "class_weight": 1.0, + "type": "stream", + "deadline_factor": {"val": 10}, + "duration": {"val": 30, "std": 5}, + "reservation": {"val": 0.5}, + "tasks":[ + { + "type": "stream", + "priority": 20, + "count": { "val": 2}, + "time": {"val": 60000}, + "max_memory": {"val": 4096}, + "max_vcores": {"val": 4} + } + ] + } + ], + "time_distribution": [ + { + "time": 1, + "weight": 100 + }, + { + "time": 2, + "weight": 0 + } + ] + } + ] +}