Index: src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java =================================================================== --- src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (revision 54) +++ src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (revision 133) @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.SuspendAllocating; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -750,6 +751,12 @@ app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); + String key = app.applicationId.toString(); + if (SuspendAllocating.appSuspendStatusMap.containsKey(key)) { + SuspendAllocating.appSuspendStatusMap.remove(key); + SuspendAllocating.appToAskMap.remove(key); + } + }; } Index: src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java =================================================================== --- src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (revision 54) +++ src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (revision 133) @@ -74,7 +74,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -83,10 +82,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; @SuppressWarnings("unchecked") @Private @@ -428,6 +427,28 @@ throw e; } + String key = appAttemptId.getApplicationId().toString(); + boolean beSuspended = SuspendAllocating.appSuspendStatusMap.containsKey(key) && SuspendAllocating.appSuspendStatusMap.get(key); + List lastask = SuspendAllocating.appToAskMap.get(key) != null? SuspendAllocating.appToAskMap.get(key):new ArrayList(); + if (beSuspended) { + if(lastask.size() <= 0) { + copyAskArrayList(ask, lastask); + SuspendAllocating.appToAskMap.put(key, lastask); + } + + for(int i = 0; i < ask.size(); i++){ + ask.get(i).setNumContainers(0); + } + } else { + if (lastask.size() > 0) { + ask.clear(); + copyAskArrayList(lastask, ask); + if (SuspendAllocating.appSuspendStatusMap.containsKey(key)) { + SuspendAllocating.appToAskMap.remove(key); + } + } + } + // Send new requests to appAttempt. Allocation allocation = this.rScheduler.allocate(appAttemptId, ask, release, @@ -507,6 +528,19 @@ } } + //copy ResourceRequest list + private void copyAskArrayList(List src, List dest) { + for (int i = 0; i < src.size(); i++) { + ResourceRequest requestbak = Records.newRecord(ResourceRequest.class); + requestbak.setPriority(src.get(i).getPriority()); + requestbak.setResourceName(src.get(i).getResourceName()); + requestbak.setCapability(src.get(i).getCapability()); + requestbak.setNumContainers(src.get(i).getNumContainers()); + requestbak.setRelaxLocality(src.get(i).getRelaxLocality()); + dest.add(requestbak); + } + } + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ PreemptionMessage pMsg = null; // assemble strict preemption request Index: src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SuspendAllocating.java =================================================================== --- src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SuspendAllocating.java (revision 0) +++ src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SuspendAllocating.java (revision 133) @@ -0,0 +1,11 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.Hashtable; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; + +public class SuspendAllocating { + public static Hashtable> appToAskMap = new Hashtable>(); + public static Hashtable appSuspendStatusMap = new Hashtable(); +} Index: src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java =================================================================== --- src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (revision 54) +++ src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (revision 133) @@ -19,8 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; import java.io.IOException; +import java.util.Arrays; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import javax.servlet.http.HttpServletRequest; @@ -45,16 +46,15 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.PreemptionContainer; import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.StopAllocating; +import org.apache.hadoop.yarn.server.resourcemanager.SuspendAllocating; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -81,7 +81,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.ContainerProcess; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -415,8 +414,90 @@ } return allApps; } + + //add get suspendedapps rest api + @GET + @Path("/suspendedapps") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public String getSuspendedApps(@PathParam("appid") String appId) { + StringBuffer buf = new StringBuffer(); + for (Entry entry : SuspendAllocating.appSuspendStatusMap.entrySet()) { + if (entry.getValue()) { + buf.append(entry.getKey()+","); + } + } + if (buf.toString().contains(",")){ + return buf.substring(0, buf.length() - 1); + } else { + return buf.toString(); + } + } + + //add suspend rest api @GET + @Path("/apps/{appid}/suspend") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public AppsInfo getAppSuspend(@Context HttpServletRequest hsr, + @PathParam("appid") String appId) { + init(); + String[] appList = appId.split(","); + AppsInfo appsInfo = new AppsInfo(); + for (int i = 0; i < appList.length; i++) { + if (appList[i] == null || appList[i].isEmpty()) { + throw new NotFoundException("appId, " + appList[i] + ", is empty or null"); + } + ApplicationId id; + id = ConverterUtils.toApplicationId(recordFactory, appList[i]); + if (id == null) { + throw new NotFoundException("appId is null"); + } + RMApp app = rm.getRMContext().getRMApps().get(id); + if (app == null) { + throw new NotFoundException("app with id: " + appList[i] + " not found"); + } + String key = appList[i]; + if ((SuspendAllocating.appSuspendStatusMap.get(key) == null) || !SuspendAllocating.appSuspendStatusMap.get(key)) { + SuspendAllocating.appSuspendStatusMap.put(key, true); + SuspendAllocating.appToAskMap.put(key, null); + } + appsInfo.add( new AppInfo(app, hasAccess(app, hsr))); + } + + return appsInfo; + } + + //add resume rest api + @GET + @Path("/apps/{appid}/resume") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public AppsInfo getAppResume(@Context HttpServletRequest hsr, + @PathParam("appid") String appId) { + init(); + String[] appList = appId.split(","); + AppsInfo appsInfo = new AppsInfo(); + for (int i = 0; i < appList.length; i++){ + if (appList[i] == null || appList[i].isEmpty()) { + throw new NotFoundException("appId, " + appList[i] + ", is empty or null"); + } + ApplicationId id; + id = ConverterUtils.toApplicationId(recordFactory, appList[i]); + if (id == null) { + throw new NotFoundException("appId is null"); + } + + RMApp app = rm.getRMContext().getRMApps().get(id); + if (app == null) { + throw new NotFoundException("app with id: " + appList[i] + " not found"); + } + SuspendAllocating.appSuspendStatusMap.put(id.toString(), false); + appsInfo.add( new AppInfo(app, hasAccess(app, hsr))); + } + + return appsInfo; + } + + @GET @Path("/appstatistics") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public ApplicationStatisticsInfo getAppStatistics( @@ -580,4 +661,6 @@ return appAttemptsInfo; } + + }