From b0f714fc699d6db0ef4789245a1f9c1e06ebe209 Mon Sep 17 00:00:00 2001 From: Sunil Date: Wed, 23 Nov 2016 19:09:39 +0530 Subject: [PATCH] YARN-5932 --- .../server/resourcemanager/ClientRMService.java | 6 +- .../yarn/server/resourcemanager/RMAppManager.java | 93 ++++++++++++++++++++++ .../scheduler/AbstractYarnScheduler.java | 7 ++ .../resourcemanager/scheduler/YarnScheduler.java | 11 +++ .../scheduler/capacity/AbstractCSQueue.java | 9 +++ .../scheduler/capacity/CSQueue.java | 9 +++ .../scheduler/capacity/CapacityScheduler.java | 26 +++++- .../scheduler/capacity/LeafQueue.java | 19 ++++- .../scheduler/capacity/ParentQueue.java | 29 ++++--- 9 files changed, 189 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 4e36b6c..863bc52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1202,12 +1202,8 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( throw new YarnException(msg); } - SettableFuture future = SettableFuture.create(); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppMoveEvent(applicationId, request.getTargetQueue(), future)); - try { - Futures.get(future, YarnException.class); + this.rmAppManager.moveToQueue(applicationId, request.getTargetQueue()); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", 
"ClientRMService", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index b5f02fc..ec3aca7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -591,4 +592,96 @@ public void updateApplicationPriority(ApplicationId applicationId, rmContext.getSystemMetricsPublisher().appUpdated(app, System.currentTimeMillis()); } + + /** + * moveToQueue will invoke scheduler api to perform move queue operation. + * + * @param applicationId + * Application Id. + * @param targetQueue + * Target queue to which this app has to be moved. + * @throws YarnException + * Handle exceptions. + */ + @SuppressWarnings("unchecked") + public void moveToQueue(ApplicationId applicationId, String targetQueue) + throws YarnException { + RMApp app = this.rmContext.getRMApps().get(applicationId); + + // Other scheduler's can make use of the existing old event approach. 
+ if (!(scheduler instanceof CapacityScheduler)) { + SettableFuture future = SettableFuture.create(); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppMoveEvent(applicationId, targetQueue, future)); + Futures.get(future, YarnException.class); + return; + } + + // Capacity scheduler will directly follow below approach. + // 1. Do a pre-validate check to ensure that changes are fine. + // 2. Update this information to state-store + // 3. Perform real move operation and update in-memory data structures. + synchronized (applicationId) { + if (app.isAppInCompletedStates()) { + return; + } + + String sourceQueue = app.getQueue(); + // 1. pre-validate move application request to check for any access + // violations or other errors. If there are any violations, YarnException + // will be thrown. + rmContext.getScheduler().preValidateMoveApplication(applicationId, + targetQueue); + + // 2. Update to state store with new queue and throw exception if failed. + updateAppDataToStateStore(targetQueue, app, false); + + // 3. Perform the real move application + String queue = ""; + try { + queue = rmContext.getScheduler().moveApplication(applicationId, + targetQueue); + } catch (YarnException e) { + // Revert to source queue since in-memory move has failed. Chances + // of this are very rare as we have already done the pre-validation. + updateAppDataToStateStore(sourceQueue, app, true); + throw e; + } + + // update in-memory + if (queue != null && !queue.isEmpty()) { + ((RMAppImpl) app).setQueue(queue); + } + } + + rmContext.getSystemMetricsPublisher().appUpdated(app, + System.currentTimeMillis()); + } + + private void updateAppDataToStateStore(String queue, RMApp app, + boolean toSuppressException) throws YarnException { + // Create a future object to capture exceptions from StateStore. + SettableFuture future = SettableFuture.create(); + + // Update new queue in Submission Context to update to StateStore. 
+ app.getApplicationSubmissionContext().setQueue(queue); + + ApplicationStateData appState = ApplicationStateData.newInstance( + app.getSubmitTime(), app.getStartTime(), + app.getApplicationSubmissionContext(), app.getUser(), + app.getCallerContext()); + appState.setApplicationTimeouts(app.getApplicationTimeouts()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false, future); + + try { + Futures.get(future, YarnException.class); + } catch (YarnException ex) { + if (!toSuppressException) { + throw ex; + } + LOG.warn( + "Statestore update failed with below exception:" + ex.getMessage()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 94af4dd..6743aeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -359,6 +359,13 @@ public String moveApplication(ApplicationId appId, String newQueue) + " does not support moving apps between queues"); } + @Override + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support pre-validation of moving apps between queues"); + } + public void removeQueue(String queueName) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support removing queues"); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 62ab7f4..86a9692 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -229,6 +229,17 @@ public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; /** + * Validate that the application can be moved to the target queue. + * @param appId Application ID + * @param newQueue Target QueueName + * @throws YarnException if the pre-validation for move cannot be carried out + */ + @LimitedPrivate("yarn") + @Evolving + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException; + + /** * Completely drain sourceQueue of applications, by moving all of them to * destQueue. 
* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3daabaf..b00fc04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -32,8 +32,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -67,6 +69,7 @@ import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { + private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); volatile CSQueue parent; final String queueName; @@ -813,4 +816,10 @@ public boolean accept(Resource cluster, return true; } + + @Override + public void validateSubmitApplication(ApplicationId applicationId, + String userName) throws AccessControlException { + // Dummy implementation + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index baf60e4..17fd9f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -362,4 +362,13 @@ void apply(Resource cluster, * @return readLock of corresponding queue. */ public ReentrantReadWriteLock.ReadLock getReadLock(); + + /** + * Validate a submitApplication request so that moveApplication can do a pre-check. + * @param applicationId Application ID + * @param userName User Name + * @throws AccessControlException if the queue cannot accept the application + */ + public void validateSubmitApplication(ApplicationId applicationId, + String userName) throws AccessControlException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index aea2e4e..76cf876 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2229,9 +2229,7 @@ public String moveApplication(ApplicationId appId, LeafQueue source = getAndCheckLeafQueue(sourceQueueName); String destQueueName = handleMoveToPlanQueue(targetQueueName); LeafQueue dest = getAndCheckLeafQueue(destQueueName); - // Validation check - ACLs, submission limits for user & queue String user = app.getUser(); - checkQueuePartition(app, dest); try { dest.submitApplication(appId, user, destQueueName); } catch (AccessControlException e) { @@ -2259,6 +2257,30 @@ public String moveApplication(ApplicationId appId, } } + @Override + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException { + try { + writeLock.lock(); + FiCaSchedulerApp app = getApplicationAttempt( + ApplicationAttemptId.newInstance(appId, 0)); + String sourceQueueName = app.getQueue().getQueueName(); + getAndCheckLeafQueue(sourceQueueName); + String destQueueName = handleMoveToPlanQueue(newQueue); + LeafQueue dest = getAndCheckLeafQueue(destQueueName); + // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); + checkQueuePartition(app, dest); + try { + dest.validateSubmitApplication(appId, user); + } catch (AccessControlException e) { + throw new YarnException(e); + } + } finally { + writeLock.unlock(); + } + } + /** * Check application can be moved to queue with labels enabled. 
All labels in * application life time will be checked diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9661206..0a5df6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -564,6 +564,21 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, public void submitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { // Careful! Locking order is important! 
+ validateSubmitApplication(applicationId, userName); + + // Inform the parent queue + try { + getParent().submitApplication(applicationId, userName, queue); + } catch (AccessControlException ace) { + LOG.info("Failed to submit application to parent-queue: " + + getParent().getQueuePath(), ace); + throw ace; + } + + } + + public void validateSubmitApplication(ApplicationId applicationId, + String userName) throws AccessControlException { try { writeLock.lock(); // Check if the queue is accepting jobs @@ -598,15 +613,13 @@ public void submitApplication(ApplicationId applicationId, String userName, writeLock.unlock(); } - // Inform the parent queue try { - getParent().submitApplication(applicationId, userName, queue); + getParent().validateSubmitApplication(applicationId, userName); } catch (AccessControlException ace) { LOG.info("Failed to submit application to parent-queue: " + getParent().getQueuePath(), ace); throw ace; } - } public Resource getAMResourceLimit() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index fd0c68b..3409e9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -340,16 +340,7 @@ public void submitApplication(ApplicationId applicationId, String user, try { writeLock.lock(); // Sanity check - if (queue.equals(queueName)) { - throw new AccessControlException( - "Cannot submit application " 
+ "to non-leaf queue: " + queueName); - } - - if (state != QueueState.RUNNING) { - throw new AccessControlException("Queue " + getQueuePath() - + " is STOPPED. Cannot accept submission of application: " - + applicationId); - } + validateSubmitApplication(applicationId, queue); addApplication(applicationId, user); } finally { @@ -369,6 +360,24 @@ public void submitApplication(ApplicationId applicationId, String user, } } + public void validateSubmitApplication(ApplicationId applicationId, + String queue) throws AccessControlException { + try { + writeLock.lock(); + if (queue.equals(queueName)) { + throw new AccessControlException( + "Cannot submit application " + "to non-leaf queue: " + queueName); + } + + if (state != QueueState.RUNNING) { + throw new AccessControlException("Queue " + getQueuePath() + + " is STOPPED. Cannot accept submission of application: " + + applicationId); + } + } finally { + writeLock.unlock(); + } + } @Override public void submitApplicationAttempt(FiCaSchedulerApp application, -- 2.7.4 (Apple Git-66)