From 084a41a1f5a385f56cef002b27acfbeca06bfcd1 Mon Sep 17 00:00:00 2001 From: Sunil Date: Tue, 20 Dec 2016 14:50:45 +0530 Subject: [PATCH] YARN-5984 --- .../scheduler/AbstractYarnScheduler.java | 12 ++++ .../scheduler/capacity/CapacityScheduler.java | 84 ++++++++++++---------- .../scheduler/fair/FairScheduler.java | 16 ++--- 3 files changed, 62 insertions(+), 50 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c0cc6b0..328fc9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -78,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -1052,4 +1054,14 @@ protected void normalizeRequests(List asks) { normalizeRequest(ask); } } + + protected String handleMoveToPlanQueue(Queue targetQueue) { + String targetQueueName = targetQueue.getQueueName(); + if (targetQueue != null && targetQueue instanceof PlanQueue) { + // use the default child reservation queue of the plan + targetQueueName = targetQueue.getQueueName() + + ReservationConstants.DEFAULT_QUEUE_SUFFIX; + } + return targetQueueName; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9a73a65..6f108e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2044,40 +2044,52 @@ public String moveApplication(ApplicationId appId, writeLock.lock(); FiCaSchedulerApp app = getApplicationAttempt( ApplicationAttemptId.newInstance(appId, 0)); - String sourceQueueName = app.getQueue().getQueueName(); - LeafQueue source = this.queueManager.getAndCheckLeafQueue( - sourceQueueName); - String destQueueName = handleMoveToPlanQueue(targetQueueName); - LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); + LeafQueue sourceQueue = this.queueManager + .getAndCheckLeafQueue(app.getQueue().getQueueName()); - String user = app.getUser(); - try { - dest.submitApplication(appId, user, destQueueName); - } catch (AccessControlException e) { - throw new YarnException(e); - } - // Move all live containers - for (RMContainer rmContainer : app.getLiveContainers()) { - source.detachContainer(getClusterResource(), app, rmContainer); - // attach the Container to another queue - dest.attachContainer(getClusterResource(), app, rmContainer); - } - // Detach the application.. - source.finishApplicationAttempt(app, sourceQueueName); - source.getParent().finishApplication(appId, app.getUser()); - // Finish app & update metrics - app.move(dest); - // Submit to a new queue - dest.submitApplicationAttempt(app, user); - applications.get(appId).setQueue(dest); - LOG.info("App: " + app.getApplicationId() + " successfully moved from " - + sourceQueueName + " to: " + destQueueName); + // Get target queue name considering plan queues as well. + Queue target = getQueue(targetQueueName); + String destQueueName = handleMoveToPlanQueue(target); + LeafQueue targetQueue = this.queueManager + .getAndCheckLeafQueue(destQueueName); + + executeMoveApplication(appId, app, sourceQueue, targetQueue); return targetQueueName; } finally { writeLock.unlock(); } } + private void executeMoveApplication(ApplicationId appId, FiCaSchedulerApp app, + LeafQueue sourceQueue, LeafQueue targetQueue) throws YarnException { + String user = app.getUser(); + try { + targetQueue.submitApplication(appId, user, targetQueue.getQueueName()); + } catch (AccessControlException e) { + throw new YarnException(e); + } + + // Move all live containers + for (RMContainer rmContainer : app.getLiveContainers()) { + sourceQueue.detachContainer(getClusterResource(), app, rmContainer); + // attach container to target queue + targetQueue.attachContainer(getClusterResource(), app, rmContainer); + } + + // Detach the application.. + sourceQueue.finishApplicationAttempt(app, sourceQueue.getQueueName()); + sourceQueue.getParent().finishApplication(appId, app.getUser()); + + // Finish app & update metrics + app.move(targetQueue); + + // Submit to a new queue + targetQueue.submitApplicationAttempt(app, user); + applications.get(appId).setQueue(targetQueue); + LOG.info("App: " + app.getApplicationId() + " successfully moved from " + + sourceQueue.getQueueName() + " to: " + targetQueue.getQueueName()); + } + @Override public void preValidateMoveApplication(ApplicationId appId, String newQueue) throws YarnException { @@ -2085,10 +2097,13 @@ public void preValidateMoveApplication(ApplicationId appId, writeLock.lock(); FiCaSchedulerApp app = getApplicationAttempt( ApplicationAttemptId.newInstance(appId, 0)); - String sourceQueueName = app.getQueue().getQueueName(); - this.queueManager.getAndCheckLeafQueue(sourceQueueName); - String destQueueName = handleMoveToPlanQueue(newQueue); + this.queueManager.getAndCheckLeafQueue(app.getQueue().getQueueName()); + + // Get target queue name considering plan queues as well. + Queue target = getQueue(newQueue); + String destQueueName = handleMoveToPlanQueue(target); LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); + // Validation check - ACLs, submission limits for user & queue String user = app.getUser(); checkQueuePartition(app, dest); @@ -2159,15 +2174,6 @@ public Resource getMaximumResourceCapability(String queueName) { return ((LeafQueue)queue).getMaximumAllocation(); } - private String handleMoveToPlanQueue(String targetQueueName) { - CSQueue dest = getQueue(targetQueueName); - if (dest != null && dest instanceof PlanQueue) { - // use the default child reservation queue of the plan - targetQueueName = targetQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; - } - return targetQueueName; - } - @Override public Set getPlanQueues() { Set ret = new HashSet(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e790bc2..606709e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -1534,7 +1535,8 @@ public String moveApplication(ApplicationId appId, throw new YarnException("Application " + appId + " is stopped and can't be moved!"); } - String destQueueName = handleMoveToPlanQueue(queueName); + Queue target = queueMgr.getQueue(queueName); + String destQueueName = handleMoveToPlanQueue(target); FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); if (targetQueue == null) { throw new YarnException("Target queue " + queueName @@ -1574,7 +1576,8 @@ public void preValidateMoveApplication(ApplicationId appId, String newQueue) try { attempt.getWriteLock().lock(); FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); - String destQueueName = handleMoveToPlanQueue(newQueue); + Queue target = queueMgr.getQueue(newQueue); + String destQueueName = handleMoveToPlanQueue(target); FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); if (targetQueue == null) { throw new YarnException("Target queue " + newQueue @@ -1755,15 +1758,6 @@ public void removeQueue(String queueName) throws YarnException { } } - private String handleMoveToPlanQueue(String targetQueueName) { - FSQueue dest = queueMgr.getQueue(targetQueueName); - if (dest != null && allocConf.isReservable(dest.getQueueName())) { - // use the default child reservation queue of the plan - targetQueueName = getDefaultQueueForPlanQueue(targetQueueName); - } - return targetQueueName; - } - @Override protected void decreaseContainer( SchedContainerChangeRequest decreaseRequest, -- 2.7.4 (Apple Git-66)