From 3339c4eba8c703b60a80046495603a1d25e22a23 Mon Sep 17 00:00:00 2001 From: Sunil Date: Thu, 1 Dec 2016 15:54:37 +0530 Subject: [PATCH] YARN-5932 --- .../hadoop/yarn/client/cli/ApplicationCLI.java | 14 ++- .../apache/hadoop/yarn/client/cli/TestYarnCLI.java | 56 ++++++++++- .../server/resourcemanager/ClientRMService.java | 16 +-- .../yarn/server/resourcemanager/RMAppManager.java | 110 +++++++++++++++++++-- .../server/resourcemanager/RMAppManagerEvent.java | 11 +++ .../resourcemanager/RMAppManagerEventType.java | 3 +- .../resourcemanager/rmapp/RMAppEventType.java | 1 - .../server/resourcemanager/rmapp/RMAppImpl.java | 47 ++------- .../resourcemanager/rmapp/RMAppMoveEvent.java | 44 --------- .../scheduler/AbstractYarnScheduler.java | 18 +++- .../resourcemanager/scheduler/YarnScheduler.java | 11 +++ .../scheduler/capacity/AbstractCSQueue.java | 9 ++ .../scheduler/capacity/CSQueue.java | 9 ++ .../scheduler/capacity/CapacityScheduler.java | 27 ++++- .../scheduler/capacity/LeafQueue.java | 19 +++- .../scheduler/capacity/ParentQueue.java | 29 ++++-- .../scheduler/fair/FairScheduler.java | 36 ++++++- .../resourcemanager/TestMoveApplication.java | 12 ++- 18 files changed, 341 insertions(+), 131 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 20a65bf..7538305 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -88,6 +88,7 @@ public static final String APP_ID = "appId"; public static final String UPDATE_PRIORITY = "updatePriority"; public static final String UPDATE_LIFETIME = "updateLifetime"; + public static final String MOVE_APPLICATION = "move"; private boolean allAppStates; @@ -114,7 +115,7 @@ public int run(String[] args) throws Exception { + "based on application state and -appTags to filter applications " + "based on application tag."); opts.addOption(MOVE_TO_QUEUE_CMD, true, "Moves the application to a " - + "different queue."); + + "different queue. Deprecated command. Use 'move' instead."); opts.addOption(QUEUE_CMD, true, "Works with the movetoqueue command to" + " specify which queue to move an application to."); opts.addOption(HELP_CMD, false, "Displays help for all commands."); @@ -146,6 +147,9 @@ public int run(String[] args) throws Exception { opts.addOption(UPDATE_LIFETIME, true, "update timeout of an application from NOW. ApplicationId can be" + " passed using 'appId' option. Timeout value is in seconds."); + opts.addOption(MOVE_APPLICATION, true, + "Moves application to a new queue. ApplicationId can be" + + " passed using 'appId' option."); Option killOpt = new Option(KILL_CMD, true, "Kills the application. " + "Set of applications can be provided separated with space"); killOpt.setValueSeparator(' '); @@ -158,6 +162,7 @@ public int run(String[] args) throws Exception { opts.getOption(APP_ID).setArgName("Application ID"); opts.getOption(UPDATE_PRIORITY).setArgName("Priority"); opts.getOption(UPDATE_LIFETIME).setArgName("Timeout"); + opts.getOption(MOVE_APPLICATION).setArgName("Queue Name"); } else if (args.length > 0 && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) { title = APPLICATION_ATTEMPT; opts.addOption(STATUS_CMD, true, @@ -315,6 +320,13 @@ public int run(String[] args) throws Exception { updateApplicationTimeout(cliParser.getOptionValue(APP_ID), ApplicationTimeoutType.LIFETIME, timeoutInSec); + } else if (cliParser.hasOption(MOVE_APPLICATION)) { + if (!cliParser.hasOption(APP_ID)) { + printUsage(title, opts); + return exitCode; + } + moveApplicationAcrossQueues(cliParser.getOptionValue(APP_ID), + cliParser.getOptionValue(MOVE_APPLICATION)); } else if (cliParser.hasOption(SIGNAL_CMD)) { if (args.length < 3 || args.length > 4) { printUsage(title, opts); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index f9ec5c7..e2baa29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -1212,6 +1212,56 @@ public void testMoveApplicationAcrossQueues() throws Exception { } @Test + public void testMoveApplicationAcrossQueuesWithNewCommand() throws Exception { + ApplicationCLI cli = createAndGetAppCLI(); + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + + ApplicationReport newApplicationReport2 = ApplicationReport.newInstance( + applicationId, ApplicationAttemptId.newInstance(applicationId, 1), + "user", "queue", "appname", "host", 124, null, + YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( + newApplicationReport2); + int result = cli.run(new String[] { "application", "-appId", + applicationId.toString(), "-move", "targetqueue"}); + assertEquals(0, result); + verify(client, times(0)).moveApplicationAcrossQueues( + any(ApplicationId.class), any(String.class)); + verify(sysOut).println( + "Application " + applicationId + " has already finished "); + + ApplicationReport newApplicationReport = ApplicationReport.newInstance( + applicationId, ApplicationAttemptId.newInstance(applicationId, 1), + "user", "queue", "appname", "host", 124, null, + YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( + newApplicationReport); + result = cli.run(new String[] { "application", "-appId", + applicationId.toString(), "-move", "targetqueue"}); + assertEquals(0, result); + verify(client).moveApplicationAcrossQueues(any(ApplicationId.class), + any(String.class)); + verify(sysOut).println("Moving application application_1234_0005 to queue targetqueue"); + verify(sysOut).println("Successfully completed move."); + + doThrow(new ApplicationNotFoundException("Application with id '" + + applicationId + "' doesn't exist in RM.")).when(client) + .moveApplicationAcrossQueues(applicationId, "targetqueue"); + cli = createAndGetAppCLI(); + try { + result = cli.run(new String[] { "application", "-appId", + applicationId.toString(), "-move", "targetqueue"}); + Assert.fail(); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof ApplicationNotFoundException); + Assert.assertEquals("Application with id '" + applicationId + + "' doesn't exist in RM.", ex.getMessage()); + } + } + + @Test public void testListClusterNodes() throws Exception { List nodeReports = new ArrayList(); nodeReports.addAll(getNodeReports(1, NodeState.NEW)); @@ -1991,8 +2041,12 @@ private String createApplicationCLIHelpMessage() throws IOException { pw.println(" applications based on application state"); pw.println(" and -appTags to filter applications based"); pw.println(" on application tag."); + pw.println(" -move Moves application to a new queue."); + pw.println(" ApplicationId can be passed using 'appId'"); + pw.println(" option."); pw.println(" -movetoqueue Moves the application to a different"); - pw.println(" queue."); + pw.println(" queue. Deprecated command. Use 'move'"); + pw.println(" instead."); pw.println(" -queue Works with the movetoqueue command to"); pw.println(" specify which queue to move an"); pw.println(" application to."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 0db775f..5683c32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -152,7 +152,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -1191,23 +1190,18 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( } // Moves only allowed when app is in a state that means it is tracked by - // the scheduler - if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED, - RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED, - RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED) - .contains(application.getState())) { + // the scheduler. Introducing SUBMITTED state also to this list as there + // could be a corner scenario that app may not be in Scheduler in SUBMITTED + // state. + if (!ACTIVE_APP_STATES.contains(application.getState())) { String msg = "App in " + application.getState() + " state cannot be moved."; RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg); throw new YarnException(msg); } - SettableFuture future = SettableFuture.create(); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppMoveEvent(applicationId, request.getTargetQueue(), future)); - try { - Futures.get(future, YarnException.class); + this.rmAppManager.moveApplicationAcrossQueue(applicationId, request.getTargetQueue()); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index ce3da06..0223d59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -498,17 +498,28 @@ public void handle(RMAppManagerEvent event) { ApplicationId applicationId = event.getApplicationId(); LOG.debug("RMAppManager processing event for " + applicationId + " of type " + event.getType()); - switch(event.getType()) { - case APP_COMPLETED: - { + switch (event.getType()) { + case APP_COMPLETED : { finishApplication(applicationId); logApplicationSummary(applicationId); - checkAppNumCompletedLimit(); - } - break; - default: - LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); + checkAppNumCompletedLimit(); } + break; + case APP_MOVE : { + // moveAllApps from scheduler will fire this event for each of + // those applications which needed to be moved to a new queue. + // Use the standard move application api to do the same. + try { + moveApplicationAcrossQueue(applicationId, + event.getTargetQueueForMove()); + } catch (YarnException e) { + LOG.warn("Move Application has failed: " + e.getMessage()); + } + } + break; + default : + LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); + } } // transaction method. @@ -587,4 +598,87 @@ public void updateApplicationPriority(ApplicationId applicationId, rmContext.getSystemMetricsPublisher().appUpdated(app, System.currentTimeMillis()); } + + /** + * moveToQueue will invoke scheduler api to perform move queue operation. + * + * @param applicationId + * Application Id. + * @param targetQueue + * Target queue to which this app has to be moved. + * @throws YarnException + * Handle exceptions. + */ + public void moveApplicationAcrossQueue(ApplicationId applicationId, String targetQueue) + throws YarnException { + RMApp app = this.rmContext.getRMApps().get(applicationId); + + // Capacity scheduler will directly follow below approach. + // 1. Do a pre-validate check to ensure that changes are fine. + // 2. Update this information to state-store + // 3. Perform real move operation and update in-memory data structures. + synchronized (applicationId) { + if (app.isAppInCompletedStates()) { + return; + } + + String sourceQueue = app.getQueue(); + // 1. pre-validate move application request to check for any access + // violations or other errors. If there are any violations, YarnException + // will be thrown. + rmContext.getScheduler().preValidateMoveApplication(applicationId, + targetQueue); + + // 2. Update to state store with new queue and throw exception is failed. + updateAppDataToStateStore(targetQueue, app, false); + + // 3. Perform the real move application + String queue = ""; + try { + queue = rmContext.getScheduler().moveApplication(applicationId, + targetQueue); + } catch (YarnException e) { + // Revert to source queue since in-memory move has failed. Chances + // of this is very rare as we have already done the pre-validation. + updateAppDataToStateStore(sourceQueue, app, true); + throw e; + } + + // update in-memory + if (queue != null && !queue.isEmpty()) { + app.setQueue(queue); + } + } + + rmContext.getSystemMetricsPublisher().appUpdated(app, + System.currentTimeMillis()); + } + + private void updateAppDataToStateStore(String queue, RMApp app, + boolean toSuppressException) throws YarnException { + // Create a future object to capture exceptions from StateStore. + SettableFuture future = SettableFuture.create(); + + // Update new queue in Submission Context to update to StateStore. + app.getApplicationSubmissionContext().setQueue(queue); + + ApplicationStateData appState = ApplicationStateData.newInstance( + app.getSubmitTime(), app.getStartTime(), + app.getApplicationSubmissionContext(), app.getUser(), + app.getCallerContext()); + appState.setApplicationTimeouts(app.getApplicationTimeouts()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false, future); + + try { + Futures.get(future, YarnException.class); + } catch (YarnException ex) { + if (!toSuppressException) { + throw ex; + } + LOG.error("Statestore update failed for move application '" + + app.getApplicationId() + "' to queue '" + queue + + "' with below exception:" + ex.getMessage()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java index f1a6781..0df3cab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java @@ -24,13 +24,24 @@ public class RMAppManagerEvent extends AbstractEvent { private final ApplicationId appId; + private final String targetQueueForMove; public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) { + this(appId, "", type); + } + + public RMAppManagerEvent(ApplicationId appId, String targetQueueForMove, + RMAppManagerEventType type) { super(type); this.appId = appId; + this.targetQueueForMove = targetQueueForMove; } public ApplicationId getApplicationId() { return this.appId; } + + public String getTargetQueueForMove() { + return this.targetQueueForMove; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java index 1b6a44c..7acf753 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java @@ -19,5 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; public enum RMAppManagerEventType { - APP_COMPLETED + APP_COMPLETED, + APP_MOVE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 2b42638..aa5d6f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -23,7 +23,6 @@ START, RECOVER, KILL, - MOVE, // Move app to a new queue // Source: Scheduler and RMAppManager APP_REJECTED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0bf5f51..4fdb705 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -247,15 +247,11 @@ RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), RMAppState.FAILED)) - .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, - RMAppEventType.MOVE, new RMAppMoveTransition()) // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, - RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, @@ -272,8 +268,6 @@ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition( @@ -301,8 +295,6 @@ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_UNREGISTERED, @@ -338,7 +330,7 @@ // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, - RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE)) + RMAppEventType.APP_NEW_SAVED)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, @@ -353,7 +345,7 @@ EnumSet.of(RMAppEventType.NODE_UPDATE, // ignore Kill/Move as we have already saved the final Finished state // in state store. - RMAppEventType.KILL, RMAppEventType.MOVE)) + RMAppEventType.KILL)) // Transitions from KILLING state .addTransition(RMAppState.KILLING, RMAppState.KILLING, @@ -383,7 +375,7 @@ RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_REGISTERED, RMAppEventType.APP_UPDATE_SAVED, - RMAppEventType.KILL, RMAppEventType.MOVE)) + RMAppEventType.KILL)) // Transitions from FINISHED state // ignorable transitions @@ -395,7 +387,7 @@ RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL, RMAppEventType.MOVE)) + RMAppEventType.KILL)) // Transitions from FAILED state // ignorable transitions @@ -403,8 +395,7 @@ RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, - RMAppEventType.MOVE)) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions @@ -417,7 +408,7 @@ EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE)) + RMAppEventType.NODE_UPDATE)) .installTopology(); @@ -1077,32 +1068,6 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - /** - * Move an app to a new queue. - * This transition must set the result on the Future in the RMAppMoveEvent, - * either as an exception for failure or null for success, or the client will - * be left waiting forever. - */ - private static final class RMAppMoveTransition extends RMAppTransition { - public void transition(RMAppImpl app, RMAppEvent event) { - RMAppMoveEvent moveEvent = (RMAppMoveEvent) event; - try { - app.queue = app.scheduler.moveApplication(app.applicationId, - moveEvent.getTargetQueue()); - } catch (YarnException ex) { - moveEvent.getResult().setException(ex); - return; - } - - app.rmContext.getSystemMetricsPublisher().appUpdated(app, - app.systemClock.getTime()); - - // TODO: Write out change to state store (YARN-1558) - // Also take care of RM failover - moveEvent.getResult().set(null); - } - } - // synchronously recover attempt to ensure any incoming external events // to be processed after the attempt processes the recover event. private void recoverAppAttempts() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java deleted file mode 100644 index 5fc63c9..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - -import com.google.common.util.concurrent.SettableFuture; - -public class RMAppMoveEvent extends RMAppEvent { - private String targetQueue; - private SettableFuture result; - - public RMAppMoveEvent(ApplicationId id, String newQueue, - SettableFuture resultFuture) { - super(id, RMAppEventType.MOVE); - this.targetQueue = newQueue; - this.result = resultFuture; - } - - public String getTargetQueue() { - return targetQueue; - } - - public SettableFuture getResult() { - return result; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4818ea3..c0cc6b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -64,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -360,6 +361,13 @@ public String moveApplication(ApplicationId appId, String newQueue) + " does not support moving apps between queues"); } + @Override + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support pre-validation of moving apps between queues"); + } + public void removeQueue(String queueName) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support removing queues"); @@ -675,10 +683,10 @@ public void moveAllApps(String sourceQueue, String destQueue) throw new YarnException(errMsg); } // generate move events for each pending/running app - for (ApplicationAttemptId app : apps) { - SettableFuture future = SettableFuture.create(); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + for (ApplicationAttemptId appAttemptId : apps) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppManagerEvent(appAttemptId.getApplicationId(), + destQueue, RMAppManagerEventType.APP_MOVE)); } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 7167384..ea1ae60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -230,6 +230,17 @@ public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; /** + * + * @param appId Application ID + * @param newQueue Target QueueName + * @throws YarnException if the pre-validation for move cannot be carried out + */ + @LimitedPrivate("yarn") + @Evolving + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException; + + /** * Completely drain sourceQueue of applications, by moving all of them to * destQueue. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3daabaf..b00fc04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -32,8 +32,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -67,6 +69,7 @@ import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { + private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); volatile CSQueue parent; final String queueName; @@ -813,4 +816,10 @@ public boolean accept(Resource cluster, return true; } + + @Override + public void validateSubmitApplication(ApplicationId applicationId, + String userName) throws AccessControlException { + // Dummy implementation + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index baf60e4..9f6e3e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -362,4 +362,13 @@ void apply(Resource cluster, * @return readLock of corresponding queue. */ public ReentrantReadWriteLock.ReadLock getReadLock(); + + /** + * Validate submitApplication api so that moveApplication do a pre-check. + * @param applicationId Application ID + * @param userName User Name + * @throws AccessControlException + */ + public void validateSubmitApplication(ApplicationId applicationId, + String userName) throws AccessControlException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e42b20c..15f7779 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2049,9 +2049,8 @@ public String moveApplication(ApplicationId appId, sourceQueueName); String destQueueName = handleMoveToPlanQueue(targetQueueName); LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); - // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); - checkQueuePartition(app, dest); try { dest.submitApplication(appId, user, destQueueName); } catch (AccessControlException e) { @@ -2079,6 +2078,30 @@ public String moveApplication(ApplicationId appId, } } + @Override + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException { + try { + writeLock.lock(); + FiCaSchedulerApp app = getApplicationAttempt( + ApplicationAttemptId.newInstance(appId, 0)); + String sourceQueueName = app.getQueue().getQueueName(); + this.queueManager.getAndCheckLeafQueue(sourceQueueName); + String destQueueName = handleMoveToPlanQueue(newQueue); + LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); + // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); + checkQueuePartition(app, dest); + try { + dest.validateSubmitApplication(appId, user); + } catch (AccessControlException e) { + throw new YarnException(e); + } + } finally { + writeLock.unlock(); + } + } + /** * Check application can be moved to queue with labels enabled. All labels in * application life time will be checked diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9661206..3f07404 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -564,6 +564,21 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, public void submitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { // Careful! Locking order is important! + validateSubmitApplication(applicationId, userName); + + // Inform the parent queue + try { + getParent().submitApplication(applicationId, userName, queue); + } catch (AccessControlException ace) { + LOG.info("Failed to submit application to parent-queue: " + + getParent().getQueuePath(), ace); + throw ace; + } + + } + + public void validateSubmitApplication(ApplicationId applicationId, + String userName) throws AccessControlException { try { writeLock.lock(); // Check if the queue is accepting jobs @@ -598,15 +613,13 @@ public void submitApplication(ApplicationId applicationId, String userName, writeLock.unlock(); } - // Inform the parent queue try { - getParent().submitApplication(applicationId, userName, queue); + getParent().validateSubmitApplication(applicationId, userName); } catch (AccessControlException ace) { LOG.info("Failed to submit application to parent-queue: " + getParent().getQueuePath(), ace); throw ace; } - } public Resource getAMResourceLimit() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index fd0c68b..3409e9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -340,16 +340,7 @@ public void submitApplication(ApplicationId applicationId, String user, try { writeLock.lock(); // Sanity check - if (queue.equals(queueName)) { - throw new AccessControlException( - "Cannot submit application " + "to non-leaf queue: " + queueName); - } - - if (state != QueueState.RUNNING) { - throw new AccessControlException("Queue " + getQueuePath() - + " is STOPPED. Cannot accept submission of application: " - + applicationId); - } + validateSubmitApplication(applicationId, queue); addApplication(applicationId, user); } finally { @@ -369,6 +360,24 @@ public void submitApplication(ApplicationId applicationId, String user, } } + public void validateSubmitApplication(ApplicationId applicationId, + String queue) throws AccessControlException { + try { + writeLock.lock(); + if (queue.equals(queueName)) { + throw new AccessControlException( + "Cannot submit application " + "to non-leaf queue: " + queueName); + } + + if (state != QueueState.RUNNING) { + throw new AccessControlException("Queue " + getQueuePath() + + " is STOPPED. Cannot accept submission of application: " + + applicationId); + } + } finally { + writeLock.unlock(); + } + } @Override public void submitApplicationAttempt(FiCaSchedulerApp application, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fbcac76..03df5d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1544,7 +1544,41 @@ public String moveApplication(ApplicationId appId, writeLock.unlock(); } } - + + @Override + public void preValidateMoveApplication(ApplicationId appId, String newQueue) + throws YarnException { + try { + writeLock.lock(); + SchedulerApplication app = applications.get(appId); + if (app == null) { + throw new YarnException("App to be moved " + appId + " not found."); + } + + FSAppAttempt attempt = app.getCurrentAppAttempt(); + // To serialize with FairScheduler#allocate, synchronize on app attempt + + try { + attempt.getWriteLock().lock(); + FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); + String destQueueName = handleMoveToPlanQueue(newQueue); + FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); + if (targetQueue == null) { + throw new YarnException("Target queue " + newQueue + + " not found or is not a leaf queue."); + } + + if (oldQueue.isRunnableApp(attempt)) { + verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); + } + } finally { + attempt.getWriteLock().unlock(); + } + } finally { + writeLock.unlock(); + } + } + private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app, FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException { String queueName = targetQueue.getQueueName(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java index d2bde80..05b25df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java @@ -87,10 +87,10 @@ public void testMoveRejectedByScheduler() throws Exception { application.getApplicationId(), "newqueue")); fail("Should have hit exception"); } catch (YarnException ex) { - assertEquals("Move not supported", ex.getCause().getMessage()); + assertEquals("Move not supported", ex.getMessage()); } } - + @Test (timeout = 10000) public void testMoveTooLate() throws Exception { // Submit application @@ -178,5 +178,13 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { return acl != QueueACL.ADMINISTER_QUEUE; } + + @Override + public void preValidateMoveApplication(ApplicationId appId, String newQueue) + throws YarnException { + if (failMove) { + throw new YarnException("Move not supported"); + } + } } } -- 2.7.4 (Apple Git-66)