diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 029fa87..fdddcf4 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -159,6 +160,10 @@ public String getNodeManagerVersion() { return null; } + @Override + public Set getNodeLabels() { + return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 7eca66f..3b185ae 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.List; +import java.util.Set; @Private @Unstable @@ -147,4 +148,8 @@ public String getNodeManagerVersion() { return node.getNodeManagerVersion(); } + @Override + public Set getNodeLabels() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 707cf1b..35baa44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NMToken; @@ -254,13 +255,13 @@ public RegisterApplicationMasterResponse registerApplicationMaster( if (hasApplicationMasterRegistered(applicationAttemptId)) { String message = "Application Master is already registered : " - + applicationAttemptId.getApplicationId(); + + appID; LOG.warn(message); RMAuditLogger.logFailure( this.rmContext.getRMApps() - .get(applicationAttemptId.getApplicationId()).getUser(), + .get(appID).getUser(), AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - applicationAttemptId.getApplicationId(), applicationAttemptId); + appID, applicationAttemptId); throw new InvalidApplicationMasterRequestException(message); } @@ -340,6 +341,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( ApplicationAttemptId applicationAttemptId = authorizeRequest().getApplicationAttemptId(); + ApplicationId appId = applicationAttemptId.getApplicationId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); if (lock == null) { @@ -351,13 +353,13 @@ public FinishApplicationMasterResponse finishApplicationMaster( if (!hasApplicationMasterRegistered(applicationAttemptId)) { String message = "Application Master is trying to unregister before registering for: " - + applicationAttemptId.getApplicationId(); + + appId; LOG.error(message); RMAuditLogger.logFailure( this.rmContext.getRMApps() - .get(applicationAttemptId.getApplicationId()).getUser(), + .get(appId).getUser(), AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", - message, applicationAttemptId.getApplicationId(), + message, appId, applicationAttemptId); throw new ApplicationMasterNotRegisteredException(message); } @@ -365,7 +367,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = - rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); + rmContext.getRMApps().get(appId); if (rmApp.isAppFinalStateStored()) { return FinishApplicationMasterResponse.newInstance(true); @@ -418,6 +420,7 @@ public AllocateResponse allocate(AllocateRequest request) ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); + ApplicationId applicationId = appAttemptId.getApplicationId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -432,14 +435,14 @@ public AllocateResponse allocate(AllocateRequest request) if (!hasApplicationMasterRegistered(appAttemptId)) { String message = "Application Master is not registered for known application: " - + appAttemptId.getApplicationId() + + applicationId + ". Let AM resync."; LOG.info(message); RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + this.rmContext.getRMApps().get(applicationId) .getUser(), AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - appAttemptId.getApplicationId(), + applicationId, appAttemptId); return resync; } @@ -481,11 +484,22 @@ public AllocateResponse allocate(AllocateRequest request) List blacklistRemovals = (blacklistRequest != null) ? blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; - + RMApp app = + this.rmContext.getRMApps().get(applicationId); + + // set label expression for Resource Requests + ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + for (ResourceRequest req : ask) { + if (null == req.getNodeLabelExpression()) { + req.setNodeLabelExpression(asc.getNodeLabelExpression()); + } + } + // sanity check try { RMServerUtils.validateResourceRequests(ask, - rScheduler.getMaximumResourceCapability()); + rScheduler.getMaximumResourceCapability(), app.getQueue(), + rScheduler); } catch (InvalidResourceRequestException e) { LOG.warn("Invalid resource ask by application " + appAttemptId, e); throw e; @@ -498,8 +512,6 @@ public AllocateResponse allocate(AllocateRequest request) throw e; } - RMApp app = - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); // In the case of work-preserving AM restart, it's possible for the // AM to release containers from the earlier attempt. if (!app.getApplicationSubmissionContext() @@ -582,7 +594,7 @@ public AllocateResponse allocate(AllocateRequest request) .toString(), amrmToken.getPassword(), amrmToken.getService() .toString())); LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back" - + " to application: " + appAttemptId.getApplicationId()); + + " to application: " + applicationId); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 1d672e5..c4ce1c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -343,7 +343,7 @@ private RMAppImpl createAndPopulateNewRMApp( long submitTime, String user) throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); - validateResourceRequest(submissionContext); + ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext); // Create RMApp RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, @@ -351,7 +351,7 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags()); + submissionContext.getApplicationTags(), amReq); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not @@ -373,7 +373,8 @@ private RMAppImpl createAndPopulateNewRMApp( return application; } - private void validateResourceRequest( + @SuppressWarnings("deprecation") + private ResourceRequest validateAndCreateResourceRequest( ApplicationSubmissionContext submissionContext) throws InvalidResourceRequestException { // Validation of the ApplicationSubmissionContext needs to be completed @@ -383,18 +384,40 @@ private void validateResourceRequest( // Check whether AM resource requirements are within required limits if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq = BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); + ResourceRequest amReq; + if (submissionContext.getAMContainerResourceRequest() != null) { + amReq = submissionContext.getAMContainerResourceRequest(); + } else { + amReq = + BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1); + } + + // set label expression for AM container + if (null == amReq.getNodeLabelExpression()) { + amReq.setNodeLabelExpression(submissionContext + .getNodeLabelExpression()); + } + if (null == amReq.getPriority()) { + amReq.setPriority(RMAppAttemptImpl.AM_CONTAINER_PRIORITY); + } + amReq.setNumContainers(1); + try { SchedulerUtils.validateResourceRequest(amReq, - scheduler.getMaximumResourceCapability()); + scheduler.getMaximumResourceCapability(), + submissionContext.getQueue(), scheduler); } catch (InvalidResourceRequestException e) { LOG.warn("RM app submission failed in validating AM resource request" + " for application " + submissionContext.getApplicationId(), e); throw e; } + + return amReq; } + + return null; } private boolean isApplicationInFinalState(RMAppState rmAppState) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index a59965f..c34618d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -108,6 +109,10 @@ void setRMApplicationHistoryWriter( boolean isWorkPreservingRecoveryEnabled(); + DynamicNodeLabelsManager getNodeLabelManager(); + + public void setNodeLabelManager(DynamicNodeLabelsManager mgr); + long getEpoch(); ReservationSystem getReservationSystem(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 78787ee..1c732be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -91,6 +92,7 @@ private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; private ConfigurationProvider configurationProvider; + private DynamicNodeLabelsManager nodeLabelManager; private long epoch; private Clock systemClock = new SystemClock(); private long schedulerRecoveryStartTime = 0; @@ -406,6 +408,16 @@ void setEpoch(long epoch) { this.epoch = epoch; } + @Override + public DynamicNodeLabelsManager getNodeLabelManager() { + return nodeLabelManager; + } + + @Override + public void setNodeLabelManager(DynamicNodeLabelsManager mgr) { + nodeLabelManager = mgr; + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { this.schedulerRecoveryStartTime = systemClock.getTime(); this.schedulerRecoveryWaitTime = waitTime; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 29c5953..46cad1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -84,9 +85,11 @@ * requested memory/vcore is non-negative and not greater than max */ public static void validateResourceRequests(List ask, - Resource maximumResource) throws InvalidResourceRequestException { + Resource maximumResource, String queueName, YarnScheduler scheduler) + throws InvalidResourceRequestException { for (ResourceRequest resReq : ask) { - SchedulerUtils.validateResourceRequest(resReq, maximumResource); + SchedulerUtils.validateResourceRequest(resReq, maximumResource, + queueName, scheduler); } } @@ -137,12 +140,13 @@ public static void validateBlacklistRequest( * passed {@link AccessControlList} * @param acl the {@link AccessControlList} to check against * @param method the method name to be logged + * @param module, like AdminService or NodeLabelManager * @param LOG the logger to use * @return {@link UserGroupInformation} of the current user * @throws IOException */ public static UserGroupInformation verifyAccess( - AccessControlList acl, String method, final Log LOG) + AccessControlList acl, String method, String module, final Log LOG) throws IOException { UserGroupInformation user; try { @@ -159,7 +163,7 @@ public static UserGroupInformation verifyAccess( " to call '" + method + "'"); RMAuditLogger.logFailure(user.getShortUserName(), method, - acl.toString(), "AdminService", + acl.toString(), module, RMAuditLogger.AuditConstants.UNAUTHORIZED_USER); throw new AccessControlException("User " + user.getShortUserName() + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 3e5f138..68f123a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -320,6 +321,10 @@ protected AMLivelinessMonitor createAMLivelinessMonitor() { return new AMLivelinessMonitor(this.rmDispatcher); } + protected DynamicNodeLabelsManager createNodeLabelManager() { + return new DynamicNodeLabelsManager(); + } + protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer(); } @@ -399,6 +404,10 @@ protected void serviceInit(Configuration configuration) throws Exception { AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor); + + DynamicNodeLabelsManager nlm = createNodeLabelManager(); + addService(nlm); + rmContext.setNodeLabelManager(nlm); boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, @@ -960,7 +969,7 @@ protected void startWepApp() { * instance of {@link RMActiveServices} and initializes it. * @throws Exception */ - void createAndInitActiveServices() throws Exception { + protected void createAndInitActiveServices() throws Exception { activeServices = new RMActiveServices(); activeServices.init(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 0c0fbc0..126560a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -126,14 +127,18 @@ public synchronized void synchronizePlan(Plan plan) { // create the default reservation queue if it doesnt exist String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; if (scheduler.getQueue(defReservationQueue) == null) { - ReservationQueue defQueue = - new ReservationQueue(scheduler, defReservationQueue, planQueue); try { + ReservationQueue defQueue = + new ReservationQueue(scheduler, defReservationQueue, planQueue); scheduler.addQueue(defQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( "Exception while trying to create default reservation queue for plan: {}", planQueueName, e); + } catch (IOException e) { + LOG.warn( + "Exception while trying to create default reservation queue for plan: {}", + planQueueName, e); } } curReservationNames.add(defReservationQueue); @@ -186,14 +191,18 @@ public synchronized void synchronizePlan(Plan plan) { for (ReservationAllocation res : sortedAllocations) { String currResId = res.getReservationId().toString(); if (curReservationNames.contains(currResId)) { - ReservationQueue resQueue = - new ReservationQueue(scheduler, currResId, planQueue); try { + ReservationQueue resQueue = + new ReservationQueue(scheduler, currResId, planQueue); scheduler.addQueue(resQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( "Exception while trying to activate reservation: {} for plan: {}", currResId, planQueueName, e); + } catch (IOException e) { + LOG.warn( + "Exception while trying to activate reservation: {} for plan: {}", + currResId, planQueueName, e); } } Resource capToAssign = res.getResourcesAtTime(now); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index c0681aa..1994b36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -143,6 +144,7 @@ private RMAppEvent eventCausingFinalSaving; private RMAppState targetedFinalState; private RMAppState recoveredFinalState; + private ResourceRequest amReq; Object transitionTodo; @@ -342,7 +344,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, - String applicationType, Set applicationTags) { + String applicationType, Set applicationTags, + ResourceRequest amReq) { this.systemClock = new SystemClock(); @@ -361,6 +364,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.startTime = this.systemClock.getTime(); this.applicationType = applicationType; this.applicationTags = applicationTags; + this.amReq = amReq; int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -732,7 +736,7 @@ private void createNewAttempt() { // previously failed attempts(which should not include Preempted, // hardware error and NM resync) + 1) equal to the max-attempt // limit. - maxAppAttempts == (getNumFailedAppAttempts() + 1)); + maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index fbcb7d7..bc30e01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -35,7 +35,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; +import javax.jws.Oneway; +import org.apache.avro.reflect.Union; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -93,7 +95,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -177,6 +178,7 @@ private Object transitionTodo; private RMAppAttemptMetrics attemptMetrics = null; + private ResourceRequest amReq = null; private static final StateMachineFactory EMPTY_CONTAINER_REQUEST_LIST = new ArrayList(); - private static final class ScheduleTransition + @VisibleForTesting + public static final class ScheduleTransition implements MultipleArcTransition { @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - if (!appAttempt.submissionContext.getUnmanagedAM()) { - // Request a container for the AM. - ResourceRequest request = - BuilderUtils.newResourceRequest( - AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt - .getSubmissionContext().getResource(), 1); - + ApplicationSubmissionContext subCtx = appAttempt.submissionContext; + if (!subCtx.getUnmanagedAM()) { + // Need reset #containers before create new attempt, because this request + // will be passed to scheduler, and scheduler will deduct the number after + // AM container allocated + + // Currently, following fields are all hard code, + // TODO: change these fields when we want to support + // priority/resource-name/relax-locality specification for AM containers + // allocation. + appAttempt.amReq.setNumContainers(1); + appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY); + appAttempt.amReq.setResourceName(ResourceRequest.ANY); + appAttempt.amReq.setRelaxLocality(true); + // SchedulerUtils.validateResourceRequests is not necessary because // AM resource has been checked when submission - Allocation amContainerAllocation = appAttempt.scheduler.allocate( - appAttempt.applicationAttemptId, - Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null); + Allocation amContainerAllocation = + appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, + Collections.singletonList(appAttempt.amReq), + EMPTY_CONTAINER_RELEASE_LIST, null, null); if (amContainerAllocation != null && amContainerAllocation.getContainers() != null) { assert (amContainerAllocation.getContainers().size() == 0); @@ -919,6 +934,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, private static final class AMContainerAllocatedTransition implements MultipleArcTransition { + @SuppressWarnings("deprecation") @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index a423ea5..afbcbc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -20,6 +20,7 @@ import java.util.List; +import java.util.Set; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -135,4 +136,11 @@ * @return containerUpdates accumulated across NM heartbeats. */ public List pullContainerUpdates(); + + /** + * Get set of labels in this node + * + * @return labels in this node + */ + public Set getNodeLabels(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index c960b50..13d60ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -855,4 +855,12 @@ public int getQueueSize() { public Set getLaunchedContainers() { return this.launchedContainers; } + + @Override + public Set getNodeLabels() { + if (context.getNodeLabelManager() == null) { + return null; + } + return context.getNodeLabelManager().getLabelsOnNode(nodeId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8e8d627..c6a322d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -442,6 +442,14 @@ protected void releaseContainers(List containers, public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } + + public List getSchedulerNodes() { + List snodes = new ArrayList(); + for (N node : nodes.values()) { + snodes.add(node); + } + return snodes; + } @Override public synchronized void moveAllApps(String sourceQueue, String destQueue) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index 0bc8ca1..4663a91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Evolving; @@ -71,4 +72,22 @@ */ public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); + + /** + * Get labels can be accessed of this queue + * labels={*}, means this queue can access any label + * labels={ }, means this queue cannot access any label except node without label + * labels={a, b, c} means this queue can access a or b or c + * @return labels + */ + public Set getAccessibleNodeLabels(); + + /** + * Get default label expression of this queue. If label expression of + * ApplicationSubmissionContext and label expression of Resource Request not + * set, this will be used. + * + * @return default label expression + */ + public String getDefaultNodeLabelExpression(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index ac37c2f..1dde29a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -17,23 +17,29 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.List; +import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.Sets; + /** * Utilities shared by schedulers. */ @@ -190,7 +196,8 @@ public static void normalizeRequest( * request */ public static void validateResourceRequest(ResourceRequest resReq, - Resource maximumResource) throws InvalidResourceRequestException { + Resource maximumResource, String queueName, YarnScheduler scheduler) + throws InvalidResourceRequestException { if (resReq.getCapability().getMemory() < 0 || resReq.getCapability().getMemory() > maximumResource.getMemory()) { throw new InvalidResourceRequestException("Invalid resource request" @@ -209,5 +216,115 @@ public static void validateResourceRequest(ResourceRequest resReq, + resReq.getCapability().getVirtualCores() + ", maxVirtualCores=" + maximumResource.getVirtualCores()); } + + // Get queue from scheduler + QueueInfo queueInfo = null; + try { + queueInfo = scheduler.getQueueInfo(queueName, false, false); + } catch (IOException e) { + // it is possible queue cannot get when queue mapping is set, just ignore + // the queueInfo here, and move forward + } + + // check labels in the resource request. + String labelExp = resReq.getNodeLabelExpression(); + + // if queue has default label expression, and RR doesn't have, use the + // default label expression of queue + if (labelExp == null && queueInfo != null) { + labelExp = queueInfo.getDefaultNodeLabelExpression(); + resReq.setNodeLabelExpression(labelExp); + } + + if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) { + if (!checkQueueLabelExpression(queueInfo.getNodeLabels(), + labelExp)) { + throw new InvalidResourceRequestException("Invalid resource request" + + ", queue=" + + queueInfo.getQueueName() + + " doesn't have permission to access all labels " + + "in resource request. labelExpression of resource request=" + + labelExp + + ". Queue labels=" + + (queueInfo.getNodeLabels() == null ? "" : StringUtils.join(queueInfo + .getNodeLabels().iterator(), ','))); + } + } + } + + public static boolean checkQueueAccessToNode(Set queueLabels, + Set nodeLabels) { + // if queue's label is *, it can access any node + if (queueLabels != null && queueLabels.contains(DynamicNodeLabelsManager.ANY)) { + return true; + } + // any queue can access to a node without label + if (nodeLabels == null || nodeLabels.isEmpty()) { + return true; + } + // a queue can access to a node only if it contains any label of the node + if (queueLabels != null + && Sets.intersection(queueLabels, nodeLabels).size() > 0) { + return true; + } + // sorry, you cannot access + return false; + } + + public static void checkAndThrowIfLabelNotIncluded(DynamicNodeLabelsManager mgr, + Set labels) throws IOException { + if (mgr == null) { + if (labels != null && !labels.isEmpty()) { + throw new IOException("NodeLabelManager is null, please check"); + } + return; + } + + if (labels != null) { + for (String label : labels) { + if (!mgr.containsNodeLabel(label)) { + throw new IOException("NodeLabelManager doesn't include label = " + + label + ", please check."); + } + } + } + } + + public static boolean checkNodeLabelExpression(Set nodeLabels, + String labelExpression) { + // empty label expression can only allocate on node with empty labels + if (labelExpression == null || labelExpression.trim().isEmpty()) { + if (!nodeLabels.isEmpty()) { + return false; + } + } + + if (labelExpression != null) { + for (String str : labelExpression.split("&&")) { + if (!str.trim().isEmpty() + && (nodeLabels == null || !nodeLabels.contains(str.trim()))) { + return false; + } + } + } + return true; + } + + public static boolean checkQueueLabelExpression(Set queueLabels, + String labelExpression) { + if (queueLabels != null && queueLabels.contains(DynamicNodeLabelsManager.ANY)) { + return true; + } + // if label expression is empty, we can allocate container on any node + if (labelExpression == null) { + return true; + } + for (String str : labelExpression.split("&&")) { + if (!str.trim().isEmpty() + && (queueLabels == null || !queueLabels.contains(str.trim()))) { + return false; + } + } + return true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index d4e043d..e1050da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -270,4 +271,16 @@ public String toString() { return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); } + + @Override + public Set getAccessibleNodeLabels() { + // TODO, add implementation for FS + return null; + } + + @Override + public String getDefaultNodeLabelExpression() { + // TODO, add implementation for FS + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index ea21c2b..532edc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; @@ -187,6 +188,18 @@ public void recoverContainer(Resource clusterResource, updateAppHeadRoom(schedulerAttempt); updateAvailableResourcesMetrics(); } + + @Override + public Set getAccessibleNodeLabels() { + // TODO add implementation for FIFO scheduler + return null; + } + + @Override + public String getDefaultNodeLabelExpression() { + // TODO add implementation for FIFO scheduler + return null; + } }; public FifoScheduler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index ce5dd96..76ede39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -147,6 +147,7 @@ public Resource getUsedResources() { return used; } + @SuppressWarnings("deprecation") public synchronized void submit() throws IOException, YarnException { ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); context.setApplicationId(this.applicationId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 91e1905..7af9966 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -135,34 +135,52 @@ public AllocateResponse schedule() throws Exception { public void addContainerToBeReleased(ContainerId containerId) { releases.add(containerId); } + public AllocateResponse allocate( String host, int memory, int numContainers, List releases) throws Exception { - List reqs = createReq(new String[]{host}, memory, 1, numContainers); + return allocate(host, memory, numContainers, releases, null); + } + + public AllocateResponse allocate( + String host, int memory, int numContainers, + List releases, String labelExpression) throws Exception { + List reqs = + createReq(new String[] { host }, memory, 1, numContainers, + labelExpression); return allocate(reqs, releases); } - + public List createReq(String[] hosts, int memory, int priority, int containers) throws Exception { + return createReq(hosts, memory, priority, containers, null); + } + + public List createReq(String[] hosts, int memory, int priority, + int containers, String labelExpression) throws Exception { List reqs = new ArrayList(); for (String host : hosts) { ResourceRequest hostReq = createResourceReq(host, memory, priority, - containers); + containers, labelExpression); reqs.add(hostReq); ResourceRequest rackReq = createResourceReq("/default-rack", memory, - priority, containers); + priority, containers, labelExpression); reqs.add(rackReq); } ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory, - priority, containers); + priority, containers, labelExpression); reqs.add(offRackReq); return reqs; - } - + public ResourceRequest createResourceReq(String resource, int memory, int priority, int containers) throws Exception { + return createResourceReq(resource, memory, priority, containers, null); + } + + public ResourceRequest createResourceReq(String resource, int memory, int priority, + int containers, String labelExpression) throws Exception { ResourceRequest req = Records.newRecord(ResourceRequest.class); req.setResourceName(resource); req.setNumContainers(containers); @@ -172,6 +190,9 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori Resource capability = Records.newRecord(Resource.class); capability.setMemory(memory); req.setCapability(capability); + if (labelExpression != null) { + req.setNodeLabelExpression(labelExpression); + } return req; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 79f9098..228f200 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -202,7 +203,11 @@ public String getHealthReport() { public long getLastHealthReportTime() { return lastHealthReportTime; } - + + @Override + public Set getNodeLabels() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 4f5fdeb..10a8d87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -183,27 +183,43 @@ public MockAM waitForNewAMToLaunchAndRegister(ApplicationId appId, int attemptSi return launchAndRegisterAM(app, this, nm); } - public void waitForState(MockNM nm, ContainerId containerId, + public boolean waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState) throws Exception { + // default is wait for 30,000 ms + return waitForState(nm, containerId, containerState, 30 * 1000); + } + + public boolean waitForState(MockNM nm, ContainerId containerId, + RMContainerState containerState, int timeoutMillisecs) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); int timeoutSecs = 0; - while(container == null && timeoutSecs++ < 100) { + while(container == null && timeoutSecs++ < timeoutMillisecs / 100) { nm.nodeHeartbeat(true); container = getResourceScheduler().getRMContainer(containerId); System.out.println("Waiting for container " + containerId + " to be allocated."); Thread.sleep(100); + + if (timeoutMillisecs <= timeoutSecs * 100) { + return false; + } } Assert.assertNotNull("Container shouldn't be null", container); - timeoutSecs = 0; - while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) { + while (!containerState.equals(container.getState()) + && timeoutSecs++ < timeoutMillisecs / 100) { System.out.println("Container : " + containerId + " State is : " + container.getState() + " Waiting for state : " + containerState); nm.nodeHeartbeat(true); - Thread.sleep(300); + Thread.sleep(100); + + if (timeoutMillisecs <= timeoutSecs * 100) { + return false; + } } + System.out.println("Container State is : " + container.getState()); Assert.assertEquals("Container state is not correct (timedout)", containerState, container.getState()); + return true; } // get new application id @@ -301,6 +317,7 @@ public RMApp submitApp(int masterMemory, String name, String user, isAppIdProvided, applicationId, 0, null); } + @SuppressWarnings("deprecation") public RMApp submitApp(int masterMemory, LogAggregationContext logAggregationContext) throws Exception { return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() @@ -310,6 +327,7 @@ public RMApp submitApp(int masterMemory, false, null, 0, logAggregationContext); } + @SuppressWarnings("deprecation") public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 58258ac..9f54de8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -34,9 +35,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -150,7 +153,7 @@ protected void submitApplication( this.rmContext.getScheduler(), this.rmContext.getApplicationMasterService(), submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags()); + submissionContext.getApplicationTags(), null); this.rmContext.getRMApps().put(submissionContext.getApplicationId(), application); //Do not send RMAppEventType.START event diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 333d0cf..e146611 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; + import static org.mockito.Matchers.isA; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; @@ -37,7 +38,6 @@ import java.util.concurrent.ConcurrentMap; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.MockApps; @@ -207,6 +207,7 @@ protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmConte private ApplicationSubmissionContext asContext; private ApplicationId appId; + @SuppressWarnings("deprecation") @Before public void setUp() { long now = System.currentTimeMillis(); @@ -540,6 +541,7 @@ public void testRMAppSubmitDuplicateApplicationId() throws Exception { Assert.assertEquals("app state doesn't match", RMAppState.FINISHED, app.getState()); } + @SuppressWarnings("deprecation") @Test (timeout = 30000) public void testRMAppSubmitInvalidResourceRequest() throws Exception { asContext.setResource(Resources.createResource( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java index a288c57..5b20149 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; + import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; @@ -30,7 +31,6 @@ import java.util.Map; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -179,6 +179,7 @@ public void testApplicationACLs() throws Exception { verifyAdministerQueueUserAccess(); } + @SuppressWarnings("deprecation") private ApplicationId submitAppAndGetAppId(AccessControlList viewACL, AccessControlList modifyACL) throws Exception { SubmitApplicationRequest submitRequest = recordFactory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 954e21d..df70ec0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -44,13 +44,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -87,7 +86,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -100,10 +98,11 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -138,10 +137,10 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -333,7 +332,7 @@ public void handle(Event event) { mock(ApplicationSubmissionContext.class); YarnConfiguration config = new YarnConfiguration(); RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, - rmContext, yarnScheduler, null, asContext, config, false); + rmContext, yarnScheduler, null, asContext, config, false, null); ApplicationResourceUsageReport report = rmAppAttemptImpl .getApplicationResourceUsageReport(); assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); @@ -1061,6 +1060,7 @@ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, return mockSubmitAppRequest(appId, name, queue, tags, false); } + @SuppressWarnings("deprecation") private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, String name, String queue, Set tags, boolean unmanaged) { @@ -1150,26 +1150,32 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, final long memorySeconds, final long vcoreSeconds) { ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); when(asContext.getMaxAppAttempts()).thenReturn(1); - RMAppImpl app = spy(new RMAppImpl(applicationId3, rmContext, config, null, - null, queueName, asContext, yarnScheduler, null, - System.currentTimeMillis(), "YARN", null) { - @Override - public ApplicationReport createAndGetApplicationReport( - String clientUserName, boolean allowAccess) { - ApplicationReport report = super.createAndGetApplicationReport( - clientUserName, allowAccess); - ApplicationResourceUsageReport usageReport = - report.getApplicationResourceUsageReport(); - usageReport.setMemorySeconds(memorySeconds); - usageReport.setVcoreSeconds(vcoreSeconds); - report.setApplicationResourceUsageReport(usageReport); - return report; - } - }); + + RMAppImpl app = + spy(new RMAppImpl(applicationId3, rmContext, config, null, null, + queueName, asContext, yarnScheduler, null, + System.currentTimeMillis(), "YARN", null, + BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + Resource.newInstance(1024, 1), 1)){ + @Override + public ApplicationReport createAndGetApplicationReport( + String clientUserName, boolean allowAccess) { + ApplicationReport report = super.createAndGetApplicationReport( + clientUserName, allowAccess); + ApplicationResourceUsageReport usageReport = + report.getApplicationResourceUsageReport(); + usageReport.setMemorySeconds(memorySeconds); + usageReport.setVcoreSeconds(vcoreSeconds); + report.setApplicationResourceUsageReport(usageReport); + return report; + } + }); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(123456, 1), 1); RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId, - rmContext, yarnScheduler, null, asContext, config, false)); + rmContext, yarnScheduler, null, asContext, config, false, null)); Container container = Container.newInstance( ContainerId.newInstance(attemptId, 1), null, "", null, null, null); RMContainerImpl containerimpl = spy(new RMContainerImpl(container, @@ -1230,7 +1236,7 @@ public void testReservationAPIs() { rm.start(); MockNM nm; try { - nm = rm.registerNode("127.0.0.1:0", 102400, 100); + nm = rm.registerNode("127.0.0.1:1", 102400, 100); // allow plan follower to synchronize Thread.sleep(1050); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 71b5b8b..15f38b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -17,13 +17,16 @@ *******************************************************************************/ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; @@ -44,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.junit.Assert; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class ReservationSystemTestUtil { @@ -55,6 +61,7 @@ public static ReservationId getNewReservationId() { return ReservationId.newInstance(rand.nextLong(), rand.nextLong()); } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { // stolen from TestCapacityScheduler @@ -68,6 +75,29 @@ public CapacityScheduler mockCapacityScheduler(int numContainers) new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), null)); + + DynamicNodeLabelsManager nlm = mock(DynamicNodeLabelsManager.class); + when( + nlm.getQueueResource(any(String.class), any(Set.class), + any(Resource.class))).thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return (Resource) args[2]; + } + }); + + when(nlm.getResourceByLabel(any(String.class), any(Resource.class))) + .thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return (Resource) args[1]; + } + }); + + mockRmContext.setNodeLabelManager(nlm); + cs.setRMContext(mockRmContext); try { cs.serviceInit(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 457f21e..6a66385 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -254,7 +257,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, - System.currentTimeMillis(), "YARN", null); + System.currentTimeMillis(), "YARN", null, null); testAppStartState(applicationId, user, name, queue, application); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), @@ -914,6 +917,7 @@ public void testAppsRecoveringStates() throws Exception { } } + @SuppressWarnings("deprecation") public void testRecoverApplication(ApplicationState appState, RMState rmState) throws Exception { ApplicationSubmissionContext submissionContext = @@ -923,7 +927,10 @@ public void testRecoverApplication(ApplicationState appState, RMState rmState) submissionContext.getApplicationName(), null, submissionContext.getQueue(), submissionContext, null, null, appState.getSubmitTime(), submissionContext.getApplicationType(), - submissionContext.getApplicationTags()); + submissionContext.getApplicationTags(), + BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1)); Assert.assertEquals(RMAppState.NEW, application.getState()); application.recover(rmState); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 7f27f4e..e5daf6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -40,7 +40,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -62,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -83,8 +87,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -96,7 +100,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; - import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -112,6 +115,7 @@ import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.junit.After; import org.junit.Assert; @@ -122,6 +126,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @RunWith(value = Parameterized.class) public class TestRMAppAttemptTransitions { @@ -229,6 +235,7 @@ public TestRMAppAttemptTransitions(Boolean isSecurityEnabled) { this.isSecurityEnabled = isSecurityEnabled; } + @SuppressWarnings("deprecation") @Before public void setUp() throws Exception { AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE; @@ -300,6 +307,7 @@ public void setUp() throws Exception { Mockito.doReturn(resourceScheduler).when(spyRMContext).getScheduler(); + final String user = MockApps.newUserName(); final String queue = MockApps.newQueue(); submissionContext = mock(ApplicationSubmissionContext.class); when(submissionContext.getQueue()).thenReturn(queue); @@ -315,7 +323,11 @@ public void setUp() throws Exception { application = mock(RMAppImpl.class); applicationAttempt = new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler, - masterService, submissionContext, new Configuration(), false); + masterService, submissionContext, new Configuration(), false, + BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1)); + when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt); when(application.getApplicationId()).thenReturn(applicationId); spyRMContext.getRMApps().put(application.getApplicationId(), application); @@ -1399,13 +1411,16 @@ public void testFailedToFailed() { } + @SuppressWarnings("deprecation") @Test public void testContainersCleanupForLastAttempt() { // create a failed attempt. applicationAttempt = new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler, masterService, submissionContext, new Configuration(), - true); + true, BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1)); when(submissionContext.getKeepContainersAcrossApplicationAttempts()) .thenReturn(true); when(submissionContext.getMaxAppAttempts()).thenReturn(1); @@ -1427,6 +1442,49 @@ public void testContainersCleanupForLastAttempt() { assertFalse(transferStateFromPreviousAttempt); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } + + @SuppressWarnings("unchecked") + @Test + public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() { + YarnScheduler mockScheduler = mock(YarnScheduler.class); + when( + mockScheduler.allocate(any(ApplicationAttemptId.class), + any(List.class), any(List.class), any(List.class), any(List.class))) + .thenAnswer(new Answer() { + + @SuppressWarnings("rawtypes") + @Override + public Allocation answer(InvocationOnMock invocation) + throws Throwable { + ResourceRequest rr = + (ResourceRequest) ((List) invocation.getArguments()[1]).get(0); + + // capacity shouldn't changed + assertEquals(Resource.newInstance(3333, 1), rr.getCapability()); + assertEquals("label-expression", rr.getNodeLabelExpression()); + + // priority, #container, relax-locality will be changed + assertEquals(RMAppAttemptImpl.AM_CONTAINER_PRIORITY, rr.getPriority()); + assertEquals(1, rr.getNumContainers()); + assertEquals(ResourceRequest.ANY, rr.getResourceName()); + + // just return an empty allocation + List l = new ArrayList(); + Set s = new HashSet(); + return new Allocation(l, Resources.none(), s, s, l); + } + }); + + // create an attempt. + applicationAttempt = + new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), + spyRMContext, scheduler, masterService, submissionContext, + new Configuration(), true, ResourceRequest.newInstance( + Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3, + false, "label-expression")); + new RMAppAttemptImpl.ScheduleTransition().transition( + (RMAppAttemptImpl) applicationAttempt, null); + } private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics, int exitCode, boolean shouldCheckURL) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index 460f35e..77c5193 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -21,13 +21,19 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -58,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -74,6 +82,8 @@ import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Sets; + public class TestSchedulerUtils { private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class); @@ -173,69 +183,240 @@ public void testNormalizeRequestWithDominantResourceCalculator() { assertEquals(1, ask.getCapability().getVirtualCores()); assertEquals(2048, ask.getCapability().getMemory()); } - + @Test (timeout = 30000) - public void testValidateResourceRequest() { + public void testValidateResourceRequestWithErrorLabelsPermission() + throws IOException { + // mock queue and scheduler + YarnScheduler scheduler = mock(YarnScheduler.class); + Set queueAccessibleNodeLabels = Sets.newHashSet(); + QueueInfo queueInfo = mock(QueueInfo.class); + when(queueInfo.getQueueName()).thenReturn("queue"); + when(queueInfo.getNodeLabels()).thenReturn(queueAccessibleNodeLabels); + when(scheduler.getQueueInfo(any(String.class), anyBoolean(), anyBoolean())) + .thenReturn(queueInfo); + Resource maxResource = Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - // zero memory + // queue has labels, success cases try { + // set queue accessible node labesl to [x, y] + queueAccessibleNodeLabels.clear(); + queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); Resource resource = Resources.createResource( 0, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + resReq.setNodeLabelExpression("x"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression("x && y"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression("y"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression(""); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression(" "); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); } catch (InvalidResourceRequestException e) { - fail("Zero memory should be accepted"); + e.printStackTrace(); + fail("Should be valid when request labels is a subset of queue labels"); } - - // zero vcores + + // queue has labels, failed cases (when ask a label not included by queue) try { + // set queue accessible node labesl to [x, y] + queueAccessibleNodeLabels.clear(); + queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); + Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - 0); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + resReq.setNodeLabelExpression("z"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + fail("Should fail"); } catch (InvalidResourceRequestException e) { - fail("Zero vcores should be accepted"); } - - // max memory + try { + // set queue accessible node labesl to [x, y] + queueAccessibleNodeLabels.clear(); + queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); + Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + 0, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + resReq.setNodeLabelExpression("x && y && z"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + fail("Should fail"); } catch (InvalidResourceRequestException e) { - fail("Max memory should be accepted"); } - - // max vcores + + // queue doesn't have label, succeed (when request no label) + queueAccessibleNodeLabels.clear(); try { + // set queue accessible node labels to empty + queueAccessibleNodeLabels.clear(); + Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression(""); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression(" "); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); } catch (InvalidResourceRequestException e) { - fail("Max vcores should not be accepted"); + e.printStackTrace(); + fail("Should be valid when request labels is empty"); } - - // negative memory + + // queue doesn't have label, failed (when request any label) + try { + // set queue accessible node labels to empty + queueAccessibleNodeLabels.clear(); + + Resource resource = Resources.createResource( + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + ResourceRequest resReq = BuilderUtils.newResourceRequest( + mock(Priority.class), ResourceRequest.ANY, resource, 1); + resReq.setNodeLabelExpression("x"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + fail("Should fail"); + } catch (InvalidResourceRequestException e) { + } + + // queue is "*", always succeeded try { + // set queue accessible node labels to empty + queueAccessibleNodeLabels.clear(); + queueAccessibleNodeLabels.add(DynamicNodeLabelsManager.ANY); + Resource resource = Resources.createResource( - -1, + 0, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + resReq.setNodeLabelExpression("x"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression("x && y && z"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + + resReq.setNodeLabelExpression("z"); + SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue", + scheduler); + } catch (InvalidResourceRequestException e) { + e.printStackTrace(); + fail("Should be valid when request labels is empty"); + } + } + + @Test (timeout = 30000) + public void testValidateResourceRequest() { + YarnScheduler mockScheduler = mock(YarnScheduler.class); + + Resource maxResource = + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + // zero memory + try { + Resource resource = + Resources.createResource(0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); + } catch (InvalidResourceRequestException e) { + fail("Zero memory should be accepted"); + } + + // zero vcores + try { + Resource resource = + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); + } catch (InvalidResourceRequestException e) { + fail("Zero vcores should be accepted"); + } + + // max memory + try { + Resource resource = + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); + } catch (InvalidResourceRequestException e) { + fail("Max memory should be accepted"); + } + + // max vcores + try { + Resource resource = + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); + } catch (InvalidResourceRequestException e) { + fail("Max vcores should not be accepted"); + } + + // negative memory + try { + Resource resource = + Resources.createResource(-1, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); fail("Negative memory should not be accepted"); } catch (InvalidResourceRequestException e) { // expected @@ -243,12 +424,14 @@ public void testValidateResourceRequest() { // negative vcores try { - Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - -1); - ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + Resource resource = + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); fail("Negative vcores should not be accepted"); } catch (InvalidResourceRequestException e) { // expected @@ -256,12 +439,15 @@ public void testValidateResourceRequest() { // more than max memory try { - Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + Resource resource = + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); fail("More than max memory should not be accepted"); } catch (InvalidResourceRequestException e) { // expected @@ -269,13 +455,16 @@ public void testValidateResourceRequest() { // more than max vcores try { - Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - + 1); - ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); - SchedulerUtils.validateResourceRequest(resReq, maxResource); + Resource resource = + Resources + .createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1); + ResourceRequest resReq = + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); + SchedulerUtils.validateResourceRequest(resReq, maxResource, null, + mockScheduler); fail("More than max vcores should not be accepted"); } catch (InvalidResourceRequestException e) { // expected diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index bd7f1bd..7b6aaf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -216,7 +216,7 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId, RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf, null, null, null, ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, amResource, null), null, null, - 0, null, null); + 0, null, null, null); rmContext.getRMApps().put(attId.getApplicationId(), rmApp); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( attId.getApplicationId(), queue, user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 67164c6..843555f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2420,7 +2420,7 @@ public void testNotAllowSubmitApplication() throws Exception { RMApp application = new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user, queue, submissionContext, scheduler, masterService, - System.currentTimeMillis(), "YARN", null); + System.currentTimeMillis(), "YARN", null, null); resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application); application.handle(new RMAppEvent(applicationId, RMAppEventType.START));