diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index a205bd1..ef3b33d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -256,6 +256,11 @@ public static boolean fitsIn(Resource smaller, Resource bigger) { smaller.getVirtualCores() <= bigger.getVirtualCores(); } + public static boolean fitsInMemory(Resource smaller, Resource bigger) + { + return smaller.getMemory() <= bigger.getMemory(); + } + public static Resource componentwiseMin(Resource lhs, Resource rhs) { return createResource(Math.min(lhs.getMemory(), rhs.getMemory()), Math.min(lhs.getVirtualCores(), rhs.getVirtualCores())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 624aa18..fbcaab9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -239,4 +240,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, RMAppMetrics getRMAppMetrics(); ReservationId getReservationId(); + + ResourceRequest getAMResourceRequest(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 9b10872..24c542f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1331,6 +1331,11 @@ public void setSystemClock(Clock clock) { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } protected Credentials parseCredentials() throws IOException { Credentials credentials = new Credentials(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ffeec63..9c52207 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -115,6 +115,8 @@ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private Resource usedAMResources = Resources.createResource(0, 0); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -608,6 +610,13 @@ public void submitApplication(ApplicationId applicationId, String userName, } private synchronized void activateApplications() { + //limit (as a resource) of allowed for application masters + //for this queue + Resource amLimit = + Resources.multiply( + lastClusterResource, + maxAMResourcePerQueuePercent / 100f); + for (Iterator i=pendingApplications.iterator(); i.hasNext(); ) { FiCaSchedulerApp application = i.next(); @@ -617,11 +626,29 @@ private synchronized void activateApplications() { break; } + // Check am resource limit + Resource amIfStarted = + Resources.add(application.getAMResource(), usedAMResources); + + LOG.info("!!!!AMRP RESOURCE " + application.getAMResource()); + LOG.info("!!!!AMRP RESOURCE LIMIT % " + + maxAMResourcePerQueuePercent); + LOG.info("!!!!AMRP RESOURCE LIMIT " + amLimit); + LOG.info("!!!!AMRP RESOURCE LIMIT LCR " + lastClusterResource); + LOG.info("!!!!AMRP IF STARTED " + amIfStarted); + + if (!Resources.fitsInMemory(amIfStarted, amLimit)) { + LOG.info("!!!!AMRP NO START DUE TO LIMIT"); + continue; + } + LOG.info("!!!!AMRP PAST LIMIT"); + // Check user limit User user = getUser(application.getUser()); if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { user.activateApplication(); activeApplications.add(application); + Resources.addTo(usedAMResources, application.getAMResource()); i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + @@ -672,6 +699,8 @@ public synchronized void removeApplicationAttempt( boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); + } else { + Resources.subtractFrom(usedAMResources, application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 2f9569c..80dde92 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -72,6 +73,8 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + setAMResource(rmContext.getRMApps().get(getApplicationId()).getAMResourceRequest().getCapability()); } synchronized public boolean containerCompleted(RMContainer rmContainer, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 800f65b..1456f04 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,6 +52,7 @@ public abstract class MockAsm extends MockApps { public static class ApplicationBase implements RMApp { + ResourceRequest amReq; @Override public String getUser() { throw new UnsupportedOperationException("Not supported yet."); @@ -183,6 +185,11 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 787b5d7..ec990f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,6 +56,7 @@ StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; + ResourceRequest amReq; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -264,4 +266,9 @@ public RMAppMetrics getRMAppMetrics() { public ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } }