diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 1f0e777..b2c8d62 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -45,6 +45,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; @@ -66,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -494,4 +499,73 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( public SignalContainerResponse signalContainer( SignalContainerRequest request) throws YarnException, IOException; + + /** + *

+ * The interface used by clients to request the ResourceManager + * to abort applications by appStates. + *

+ * + *

+ * The client, via {@link KillApplicationsByAppStatesRequest} provides the + * {@link YarnApplicationState} of the applications to be aborted. + *

+ * + *

+ * In secure mode,the ResourceManager verifies access to the + * application, queue etc. before terminating the application. + *

+ * + *

+ * Currently, the ResourceManager returns an empty response on + * success and throws an exception on rejecting the request. + *

+ * + * @param request request to abort the application by appStates + * @return ResourceManager returns an empty response on success + * and throws an exception on rejecting the request + * @throws YarnException + * @throws IOException + * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) + */ + @Public + @Stable + @Idempotent + public KillApplicationsByAppStatesResponse killApplicationsByAppStates( + KillApplicationsByAppStatesRequest request) throws YarnException, + IOException; + + /** + *

+ * The interface used by clients to request the ResourceManager + * to abort applications of queue. + *

+ * + *

+ * The client, via {@link KillApplicationsOfQueueRequest} provides the queue + * name of the applications to be aborted. + *

+ * + *

+ * In secure mode,the ResourceManager verifies access to the + * application, queue etc. before terminating the application. + *

+ * + *

+ * Currently, the ResourceManager returns an empty response on + * success and throws an exception on rejecting the request. + *

+ * + * @param request request to abort the application of queue + * @return ResourceManager returns an empty response on success + * and throws an exception on rejecting the request + * @throws YarnException + * @throws IOException + * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) + */ + @Public + @Stable + @Idempotent + public KillApplicationsOfQueueResponse killApplicationsOfQueue( + KillApplicationsOfQueueRequest request) throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsByAppStatesRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsByAppStatesRequest.java new file mode 100644 index 0000000..8194550 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsByAppStatesRequest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by the client to the ResourceManager to abort + * the applications by appStates. + *

+ *

+ * The client, via {@link org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest} provides the + * {@link org.apache.hadoop.yarn.api.records.YarnApplicationState} of the applications to be aborted. + *

+ * + * @see org.apache.hadoop.yarn.api.ApplicationClientProtocol#killApplicationsByApRpStates(org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest) + */ +@Public +@Stable +public abstract class KillApplicationsByAppStatesRequest { + @Private + @Unstable + public static KillApplicationsByAppStatesRequest newInstance( + EnumSet applicationStates) { + KillApplicationsByAppStatesRequest request = + Records.newRecord(KillApplicationsByAppStatesRequest.class); + request.setApplicationStates(applicationStates); + return request; + } + + /** + * Get the application states to filter applications on + * + * @return Set of Application states to filter on + */ + @Public + @Stable + public abstract EnumSet getApplicationStates(); + + /** + * Set the application states to filter applications on + * + * @param applicationStates A Set of Application states to filter on. If not + * defined, match all running applications + */ + @Private + @Unstable + public abstract void setApplicationStates( + EnumSet applicationStates); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsByAppStatesResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsByAppStatesResponse.java new file mode 100644 index 0000000..032f9d5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsByAppStatesResponse.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * The response sent by the ResourceManager to the client aborting + * the applications by appStates. + *

+ * The response, includes: + *

    + *
  • + * A flag which indicates that the process of killing the application is + * completed or not.
  • + *
+ * Note: user is recommended to wait until this flag becomes true, otherwise if + * the ResourceManager crashes before the process of killing the + * application is completed, the ResourceManager may retry this + * application on recovery. + * + * @see org.apache.hadoop.yarn.api.ApplicationClientProtocol#killApplicationsByAppStates(KillApplicationsByAppStatesRequest) + */ +@Public +@Stable +public abstract class KillApplicationsByAppStatesResponse { + @Private + @Unstable + public static KillApplicationsByAppStatesResponse newInstance(boolean isKillCompleted) { + KillApplicationsByAppStatesResponse response = + Records.newRecord(KillApplicationsByAppStatesResponse.class); + response.setIsKillCompleted(isKillCompleted); + return response; + } + + /** + * Get the flag which indicates that the process of killing application is completed or not. + */ + @Public + @Stable + public abstract boolean getIsKillCompleted(); + + /** + * Set the flag which indicates that the process of killing application is completed or not. + */ + @Private + @Unstable + public abstract void setIsKillCompleted(boolean isKillCompleted); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsOfQueueRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsOfQueueRequest.java new file mode 100644 index 0000000..5223e4c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsOfQueueRequest.java @@ -0,0 +1,43 @@ +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by the client to the ResourceManager to abort + * the applications of queue. + *

+ *

+ * The client, via {@link org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest} provides the queue + * name of the applications to be aborted. + *

+ * + * @see org.apache.hadoop.yarn.api.ApplicationClientProtocol#killApplicationsOfQueue(org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest) + */ +@Public +@Unstable +public abstract class KillApplicationsOfQueueRequest { + public static KillApplicationsOfQueueRequest newInstance(String queue) { + KillApplicationsOfQueueRequest request = + Records.newRecord(KillApplicationsOfQueueRequest.class); + request.setQueue(queue); + return request; + } + + /** + * Get the queue name to filter applications on + * + * @return the name of the queue + */ + public abstract String getQueue(); + + /** + * Set the queue name to filter applications on + * + * @param the name of the queue + */ + public abstract void setQueue(String queue); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsOfQueueResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsOfQueueResponse.java new file mode 100644 index 0000000..9915602 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationsOfQueueResponse.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * The response sent by the ResourceManager to the client aborting + * the applications of queue. + *

+ * The response, includes: + *

    + *
  • + * A flag which indicates that the process of killing the application is + * completed or not.
  • + *
+ * Note: user is recommended to wait until this flag becomes true, otherwise if + * the ResourceManager crashes before the process of killing the + * application is completed, the ResourceManager may retry this + * application on recovery. + * + * @see org.apache.hadoop.yarn.api.ApplicationClientProtocol#killApplicationsOfQueue(org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest) + */ +@Public +@Stable +public abstract class KillApplicationsOfQueueResponse { + @Private + @Unstable + public static KillApplicationsOfQueueResponse newInstance(boolean isKillCompleted) { + KillApplicationsOfQueueResponse response = + Records.newRecord(KillApplicationsOfQueueResponse.class); + response.setIsKillCompleted(isKillCompleted); + return response; + } + + /** + * Get the flag which indicates that the process of killing application is completed or not. + */ + @Public + @Stable + public abstract boolean getIsKillCompleted(); + + /** + * Set the flag which indicates that the process of killing application is completed or not. + */ + @Private + @Unstable + public abstract void setIsKillCompleted(boolean isKillCompleted); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index e98726b..a78b817 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -58,4 +58,6 @@ service ApplicationClientProtocolService { rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); + rpc killApplicationsByAppStates (KillApplicationsByAppStatesRequestProto) returns (KillApplicationsByAppStatesResponseProto); + rpc killApplicationsOfQueue (KillApplicationsOfQueueRequestProto) returns (KillApplicationsOfQueueResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8924eba..44ec167 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -241,6 +241,22 @@ message SignalContainerRequestProto { message SignalContainerResponseProto { } +message KillApplicationsByAppStatesRequestProto { + repeated YarnApplicationStateProto application_states = 1; +} + +message KillApplicationsByAppStatesResponseProto { + optional bool is_kill_completed = 1 [default = false]; +} + +message KillApplicationsOfQueueRequestProto { + required string queue = 1; +} + +message KillApplicationsOfQueueResponseProto { + optional bool is_kill_completed = 1 [default = false]; +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 523698f..15ed935 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -713,4 +713,34 @@ public abstract void updateApplicationPriority(ApplicationId applicationId, */ public abstract void signalContainer(ContainerId containerId, SignalContainerCommand command) throws YarnException, IOException; + + /** + *

+ * Kill applications by appStates. + *

+ * + * @param applicationStates {@link YarnApplicationState} of the applications + * that needs to be killed + * @throws YarnException in case of errors or if YARN rejects the request due + * to access-control restrictions. + * @throws IOException + * @see #getQueueAclsInfo() + */ + public abstract void killApplicationsByAppStates( + EnumSet applicationStates) throws YarnException, + IOException; + + /** + *

+ * Kill applications of queue. + *

+ * + * @param queue queueName of the applications that needs to be killed + * @throws YarnException in case of errors or if YARN rejects the request due + * to access-control restrictions. + * @throws IOException + * @see #getQueueAclsInfo() + */ + public abstract void killApplicationsOfQueue(String queue) + throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 29fd417..dd8a681 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -70,6 +70,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; @@ -848,4 +852,77 @@ public void signalContainer(ContainerId containerId, SignalContainerRequest.newInstance(containerId, command); rmClient.signalContainer(request); } + + @Override + public void killApplicationsByAppStates( + EnumSet applicationStates) throws YarnException, + IOException { + KillApplicationsByAppStatesRequest request = + Records.newRecord(KillApplicationsByAppStatesRequest.class); + request.setApplicationStates(applicationStates); + + try { + int pollCount = 0; + long startTime = System.currentTimeMillis(); + + while (true) { + KillApplicationsByAppStatesResponse response = null; + rmClient.killApplicationsByAppStates(request); + if (response.getIsKillCompleted()) { + LOG.info("Killed applications successfully."); + break; + } + + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() + && elapsedMillis >= this.asyncApiPollTimeoutMillis) { + throw new YarnException( + "Timed out while waiting for remaining applications to be killed."); + } + + if (++pollCount % 10 == 0) { + LOG.info("Waiting for remaining applications to be killed."); + } + Thread.sleep(asyncApiPollIntervalMillis); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for remaining applications to be killed."); + } + } + + @Override + public void killApplicationsOfQueue(String queue) throws YarnException, + IOException { + KillApplicationsOfQueueRequest request = + Records.newRecord(KillApplicationsOfQueueRequest.class); + request.setQueue(queue); + + try { + int pollCount = 0; + long startTime = System.currentTimeMillis(); + + while (true) { + KillApplicationsOfQueueResponse response = null; + rmClient.killApplicationsOfQueue(request); + if (response.getIsKillCompleted()) { + LOG.info("Killed applications successfully."); + break; + } + + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() + && elapsedMillis >= this.asyncApiPollTimeoutMillis) { + throw new YarnException( + "Timed out while waiting for remaining applications to be killed."); + } + + if (++pollCount % 10 == 0) { + LOG.info("Waiting for remaining applications to be killed."); + } + Thread.sleep(asyncApiPollIntervalMillis); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for remaining applications to be killed."); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index b486074..f39a89a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -79,6 +79,8 @@ public static final String CONTAINER = "container"; public static final String APP_ID = "appId"; public static final String UPDATE_PRIORITY = "updatePriority"; + public static final String APP_KILL_BY_APPSTATES_CMD = "killByAppStates"; + public static final String APP_KILL_BY_QUEUE_CMD = "killOfQueue"; private boolean allAppStates; @@ -109,6 +111,8 @@ public int run(String[] args) throws Exception { opts.addOption(QUEUE_CMD, true, "Works with the movetoqueue command to" + " specify which queue to move an application to."); opts.addOption(HELP_CMD, false, "Displays help for all commands."); + opts.addOption(APP_KILL_BY_QUEUE_CMD, true, + "Kill the applications of specific queue."); Option appTypeOpt = new Option(APP_TYPE_CMD, true, "Works with -list to " + "filter applications based on " + "input comma-separated list of application types."); @@ -127,12 +131,21 @@ public int run(String[] args) throws Exception { opts.addOption(UPDATE_PRIORITY, true, "update priority of an application. ApplicationId can be" + " passed using 'appId' option."); + Option killAppStateOpt = + new Option(APP_KILL_BY_APPSTATES_CMD, true, + "The states of application that will be killed" + + ", input comma-separated list of application states."); + killAppStateOpt.setValueSeparator(','); + killAppStateOpt.setArgs(Option.UNLIMITED_VALUES); + killAppStateOpt.setArgName("States"); + opts.addOption(killAppStateOpt); opts.getOption(KILL_CMD).setArgName("Application ID"); opts.getOption(MOVE_TO_QUEUE_CMD).setArgName("Application ID"); opts.getOption(QUEUE_CMD).setArgName("Queue Name"); opts.getOption(STATUS_CMD).setArgName("Application ID"); opts.getOption(APP_ID).setArgName("Application ID"); opts.getOption(UPDATE_PRIORITY).setArgName("Priority"); + opts.getOption(APP_KILL_BY_QUEUE_CMD).setArgName("Queue Name"); } else if (args.length > 0 && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) { title = APPLICATION_ATTEMPT; opts.addOption(STATUS_CMD, true, @@ -284,6 +297,46 @@ public int run(String[] args) throws Exception { command = SignalContainerCommand.valueOf(signalArgs[1]); } signalContainer(containerId, command); + } else if (cliParser.hasOption(APP_KILL_BY_APPSTATES_CMD)) { + if (args.length != 3) { + printUsage(title, opts); + return exitCode; + } + + EnumSet appStates = + EnumSet.noneOf(YarnApplicationState.class); + String[] states = cliParser.getOptionValues(APP_KILL_BY_APPSTATES_CMD); + if (states != null) { + for (String state : states) { + if (!state.trim().isEmpty()) { + try { + appStates.add(YarnApplicationState.valueOf(StringUtils + .toUpperCase(state).trim())); + } catch (IllegalArgumentException ex) { + sysout.println("The application state " + state + " is invalid."); + sysout.println(getAllValidApplicationStates()); + return exitCode; + } + } + } + } + + try { + killApplicationsByAppStates(appStates); + } catch (ApplicationNotFoundException e) { + return exitCode; + } + } else if (cliParser.hasOption(APP_KILL_BY_QUEUE_CMD)) { + if (args.length != 3) { + printUsage(title, opts); + return exitCode; + } + + try { + killApplicationsOfQueue(cliParser.getOptionValue(APP_KILL_BY_QUEUE_CMD)); + } catch (ApplicationNotFoundException e) { + return exitCode; + } } else { syserr.println("Invalid Command Usage : "); printUsage(title, opts); @@ -511,6 +564,49 @@ private void killApplication(String applicationId) throws YarnException, } /** + * Kill the applications by appStates + * + * @param appStates + * @throws IOException + */ + private void killApplicationsByAppStates( + EnumSet appStates) throws YarnException, + IOException { + if (appStates == null || appStates.isEmpty()) { + sysout.println("The appStates should not be null."); + return; + } + + if (appStates.contains(YarnApplicationState.FAILED) + || appStates.contains(YarnApplicationState.KILLED) + || appStates.contains(YarnApplicationState.FINISHED)) { + sysout + .println("The appState should not contain state failed, killed, finished"); + return; + } + + sysout.println("Killing applications of specific states."); + client.killApplicationsByAppStates(appStates); + } + + /** + * Kill applications of specific queue + * + * @param queue + * @throws IOException + */ + private void killApplicationsOfQueue(String queue) throws YarnException, + IOException { + if (queue == null || queue.isEmpty()) { + sysout.println("The queue name should not be null."); + return; + } + + sysout.println("Killing applications of queue " + queue); + client.killApplicationsOfQueue(queue); + } + + /** * Moves the application with the given ID to the given queue. */ private void moveApplicationAcrossQueues(String applicationId, String queue) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 9ee7001..e7a18bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -870,6 +870,33 @@ public void testKillApplication() throws Exception { Assert.fail("Unexpected exception: " + e); } } + + @Test + public void testKillApplicationsByAppStates() throws Exception { + ApplicationCLI cli = createAndGetAppCLI(); + EnumSet appStates = + EnumSet.noneOf(YarnApplicationState.class); + appStates.add(YarnApplicationState.RUNNING); + appStates.add(YarnApplicationState.SUBMITTED); + + int result = + cli.run(new String[] { "application", "-killByAppStates", + "RUNNING,SUBMITTED" }); + assertEquals(0, result); + verify(client).killApplicationsByAppStates(appStates); + verify(sysOut).println("Killing applications of specific states."); + } + + @Test + public void testKillApplicationsOfQueue() throws Exception { + String queue = "queue"; + ApplicationCLI cli = createAndGetAppCLI(); + + int result = cli.run(new String[] { "application", "-killOfQueue", queue }); + assertEquals(0, result); + verify(client).killApplicationsOfQueue(queue); + verify(sysOut).println("Killing applications of queue " + queue); + } @Test public void testMoveApplicationAcrossQueues() throws Exception { @@ -1517,6 +1544,10 @@ private String createApplicationCLIHelpMessage() throws IOException { pw.println(" application types."); pw.println(" -help Displays help for all commands."); pw.println(" -kill Kills the application."); + pw.println(" -killByAppStates The states of application that will be"); + pw.println(" killed, input comma-separated list of"); + pw.println(" application states."); + pw.println(" -killOfQueue Kill the applications of specific queue."); pw.println(" -list List applications. Supports optional use"); pw.println(" of -appTypes to filter applications based"); pw.println(" on application type, and -appStates to"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index c1e6e9a..5c6b3ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -67,6 +67,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; @@ -119,6 +123,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsByAppStatesRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsByAppStatesResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsOfQueueRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsOfQueueResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl; @@ -152,6 +160,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsByAppStatesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsOfQueueRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto; @@ -564,4 +574,33 @@ public SignalContainerResponse signalContainer( return null; } } + + @Override + public KillApplicationsByAppStatesResponse killApplicationsByAppStates( + KillApplicationsByAppStatesRequest request) throws YarnException, + IOException { + KillApplicationsByAppStatesRequestProto requestProto = + ((KillApplicationsByAppStatesRequestPBImpl) request).getProto(); + try { + return new KillApplicationsByAppStatesResponsePBImpl( + proxy.killApplicationsByAppStates(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public KillApplicationsOfQueueResponse killApplicationsOfQueue( + KillApplicationsOfQueueRequest request) throws YarnException, IOException { + KillApplicationsOfQueueRequestProto requestProto = + ((KillApplicationsOfQueueRequestPBImpl) request).getProto(); + try { + return new KillApplicationsOfQueueResponsePBImpl( + proxy.killApplicationsOfQueue(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 2ee88c8..32d92db 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; ++import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesResponse; ++import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; @@ -91,6 +93,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsByAppStatesRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsByAppStatesResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsOfQueueRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationsOfQueueResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.MoveApplicationAcrossQueuesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl; @@ -139,6 +145,10 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsByAppStatesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsByAppStatesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsOfQueueRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsOfQueueResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestProto; @@ -567,4 +577,38 @@ public SignalContainerResponseProto signalContainer(RpcController controller, throw new ServiceException(e); } } + + @Override + public KillApplicationsByAppStatesResponseProto killApplicationsByAppStates( + RpcController controller, KillApplicationsByAppStatesRequestProto proto) + throws ServiceException { + KillApplicationsByAppStatesRequestPBImpl request = + new KillApplicationsByAppStatesRequestPBImpl(proto); + try { + KillApplicationsByAppStatesResponse response = + real.killApplicationsByAppStates(request); + return ((KillApplicationsByAppStatesResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public KillApplicationsOfQueueResponseProto killApplicationsOfQueue( + RpcController controller, KillApplicationsOfQueueRequestProto proto) + throws ServiceException { + KillApplicationsOfQueueRequestPBImpl request = + new KillApplicationsOfQueueRequestPBImpl(proto); + try { + KillApplicationsOfQueueResponse response = + real.killApplicationsOfQueue(request); + return ((KillApplicationsOfQueueResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsByAppStatesRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsByAppStatesRequestPBImpl.java new file mode 100644 index 0000000..37fe309 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsByAppStatesRequestPBImpl.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.EnumSet; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsByAppStatesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsByAppStatesRequestProtoOrBuilder; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.protobuf.TextFormat; + +public class KillApplicationsByAppStatesRequestPBImpl extends + KillApplicationsByAppStatesRequest { + + EnumSet applicationStates = null; + + KillApplicationsByAppStatesRequestProto proto = + KillApplicationsByAppStatesRequestProto.getDefaultInstance(); + KillApplicationsByAppStatesRequestProto.Builder builder = null; + boolean viaProto = false; + + public KillApplicationsByAppStatesRequestPBImpl() { + builder = KillApplicationsByAppStatesRequestProto.newBuilder(); + } + + public KillApplicationsByAppStatesRequestPBImpl( + KillApplicationsByAppStatesRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public KillApplicationsByAppStatesRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (applicationStates != null && !applicationStates.isEmpty()) { + builder.clearApplicationStates(); + builder.addAllApplicationStates(Iterables.transform(applicationStates, + new Function() { + @Override + public YarnApplicationStateProto apply(YarnApplicationState input) { + return ProtoUtils.convertToProtoFormat(input); + } + })); + } + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationsByAppStatesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void initApplicationStates() { + if (this.applicationStates != null) { + return; + } + KillApplicationsByAppStatesRequestProtoOrBuilder p = + viaProto ? proto : builder; + List appStatesList = + p.getApplicationStatesList(); + this.applicationStates = EnumSet.noneOf(YarnApplicationState.class); + + for (YarnApplicationStateProto c : appStatesList) { + this.applicationStates.add(ProtoUtils.convertFromProtoFormat(c)); + } + } + + @Override + public EnumSet getApplicationStates() { + initApplicationStates(); + return this.applicationStates; + } + + @Override + public void setApplicationStates( + EnumSet applicationStates) { + maybeInitBuilder(); + if (applicationStates == null) { + builder.clearApplicationStates(); + } + this.applicationStates = applicationStates; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsByAppStatesResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsByAppStatesResponsePBImpl.java new file mode 100644 index 0000000..089e548 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsByAppStatesResponsePBImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsByAppStatesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsByAppStatesResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class KillApplicationsByAppStatesResponsePBImpl extends + KillApplicationsByAppStatesResponse { + KillApplicationsByAppStatesResponseProto proto = + KillApplicationsByAppStatesResponseProto.getDefaultInstance(); + KillApplicationsByAppStatesResponseProto.Builder builder = null; + boolean viaProto = false; + + public KillApplicationsByAppStatesResponsePBImpl() { + builder = KillApplicationsByAppStatesResponseProto.newBuilder(); + } + + public KillApplicationsByAppStatesResponsePBImpl( + KillApplicationsByAppStatesResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public KillApplicationsByAppStatesResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationsByAppStatesResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsKillCompleted() { + KillApplicationsByAppStatesResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsKillCompleted(); + } + + @Override + public void setIsKillCompleted(boolean isKillCompleted) { + maybeInitBuilder(); + builder.setIsKillCompleted(isKillCompleted); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsOfQueueRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsOfQueueRequestPBImpl.java new file mode 100644 index 0000000..0b9137c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsOfQueueRequestPBImpl.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsOfQueueRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsOfQueueRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class KillApplicationsOfQueueRequestPBImpl extends + KillApplicationsOfQueueRequest { + KillApplicationsOfQueueRequestProto proto = + KillApplicationsOfQueueRequestProto.getDefaultInstance(); + KillApplicationsOfQueueRequestProto.Builder builder = null; + boolean viaProto = false; + + private String queue; + + public KillApplicationsOfQueueRequestPBImpl() { + builder = KillApplicationsOfQueueRequestProto.newBuilder(); + } + + public KillApplicationsOfQueueRequestPBImpl( + KillApplicationsOfQueueRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public KillApplicationsOfQueueRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getQueue() { + KillApplicationsOfQueueRequestProtoOrBuilder p = viaProto ? proto : builder; + + this.queue = p.getQueue(); + return this.queue; + } + + @Override + public void setQueue(String queue) { + maybeInitBuilder(); + this.queue = queue; + } + + private void mergeLocalToBuilder() { + if (queue != null) { + builder.setQueue(this.queue); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationsOfQueueRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsOfQueueResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsOfQueueResponsePBImpl.java new file mode 100644 index 0000000..364feca --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationsOfQueueResponsePBImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsOfQueueResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationsOfQueueResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class KillApplicationsOfQueueResponsePBImpl extends + KillApplicationsOfQueueResponse { + KillApplicationsOfQueueResponseProto proto = + KillApplicationsOfQueueResponseProto.getDefaultInstance(); + KillApplicationsOfQueueResponseProto.Builder builder = null; + boolean viaProto = false; + + public KillApplicationsOfQueueResponsePBImpl() { + builder = KillApplicationsOfQueueResponseProto.newBuilder(); + } + + public KillApplicationsOfQueueResponsePBImpl( + KillApplicationsOfQueueResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public KillApplicationsOfQueueResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationsOfQueueResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsKillCompleted() { + KillApplicationsOfQueueResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsKillCompleted(); + } + + @Override + public void setIsKillCompleted(boolean isKillCompleted) { + maybeInitBuilder(); + builder.setIsKillCompleted(isKillCompleted); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index cfab488..f92f037 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -86,6 +87,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; @@ -1430,6 +1435,129 @@ public SignalContainerResponse signalContainer( .newRecordInstance(SignalContainerResponse.class); } + @SuppressWarnings("unchecked") + @Override + public KillApplicationsByAppStatesResponse killApplicationsByAppStates( + KillApplicationsByAppStatesRequest request) throws YarnException, + IOException { + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST, + "UNKNOWN", "ClientRMService", "Error getting UGI"); + throw RPCUtil.getRemoteException(ie); + } + + EnumSet appStates = request.getApplicationStates(); + List reports = + getApplicationReportListByAppStates(callerUGI, appStates); + List applicationIds = new ArrayList(); + + if (reports != null && reports.size() > 0) { + for (ApplicationReport report : reports) { + applicationIds.add(report.getApplicationId()); + } + } + + for (ApplicationId appId : applicationIds) { + RMApp application = this.rmContext.getRMApps().get(appId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService", + "Trying to kill an absent application", appId); + throw new ApplicationNotFoundException("Trying to kill an absent" + + " application " + appId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), + "ClientRMService", AuditConstants.UNAUTHORIZED_USER, appId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + appId)); + } + + if (application.isAppFinalStateStored()) { + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, "ClientRMService", appId); + continue; + } + + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.KILL)); + } + + return KillApplicationsByAppStatesResponse.newInstance(true); + } + + @SuppressWarnings("unchecked") + @Override + public KillApplicationsOfQueueResponse killApplicationsOfQueue( + KillApplicationsOfQueueRequest request) throws YarnException, IOException { + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST, + "UNKNOWN", "ClientRMService", "Error getting UGI"); + throw RPCUtil.getRemoteException(ie); + } + + String queue = request.getQueue(); + Set queues = new HashSet(); + queues.add(queue); + List reports = + getApplicationReportListOfQueues(callerUGI, queues); + List applicationIds = new ArrayList(); + + if (reports != null && reports.size() > 0) { + for (ApplicationReport report : reports) { + applicationIds.add(report.getApplicationId()); + } + } + + for (ApplicationId appId : applicationIds) { + RMApp application = this.rmContext.getRMApps().get(appId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService", + "Trying to kill an absent application", appId); + throw new ApplicationNotFoundException("Trying to kill an absent" + + " application " + appId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), + "ClientRMService", AuditConstants.UNAUTHORIZED_USER, appId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + appId)); + } + + if (application.isAppFinalStateStored()) { + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, "ClientRMService", appId); + continue; + } + + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.KILL)); + } + + return KillApplicationsOfQueueResponse.newInstance(true); + } + public List getApplicationReportListByAppStates( UserGroupInformation callerUGI, EnumSet applicationStates) throws YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index a7219fa..83de992 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -72,6 +72,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsByAppStatesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationsOfQueueResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; @@ -543,6 +547,66 @@ public void testForceKillApplication() throws Exception { assertEquals("Incorrect number of apps in the RM", 2, rmService.getApplications(getRequest).getApplicationList().size()); } + + @Test + public void testKillApplicationsByAppStates() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(); + rm.init(conf); + rm.start(); + + ClientRMService rmService = rm.getClientRMService(); + GetApplicationsRequest getRequest = + GetApplicationsRequest.newInstance(EnumSet + .of(YarnApplicationState.KILLED)); + + EnumSet appStates = + EnumSet.noneOf(YarnApplicationState.class); + appStates.add(YarnApplicationState.ACCEPTED); + RMApp app1 = rm.submitApp(1024); + RMApp app2 = rm.submitApp(1024); + + assertEquals("Incorrect number of apps in the RM", 0, rmService + .getApplications(getRequest).getApplicationList().size()); + + KillApplicationsByAppStatesRequest killRequest = + KillApplicationsByAppStatesRequest.newInstance(appStates); + + KillApplicationsByAppStatesResponse killResponse = + rmService.killApplicationsByAppStates(killRequest); + assertTrue("Kill the applications successfully", + killResponse.getIsKillCompleted()); + } + + @Test + public void testKillApplicationsOfQueue() throws Exception { + String queue = "testqueue"; + YarnScheduler yarnScheduler = mock(YarnScheduler.class); + RMContext rmContext = mock(RMContext.class); + mockRMContext(yarnScheduler, rmContext); + + ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); + QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); + when( + mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), + any(QueueACL.class), anyString())).thenReturn(true); + when( + mockAclsManager.checkAccess(any(UserGroupInformation.class), + any(ApplicationAccessType.class), anyString(), + any(ApplicationId.class))).thenReturn(true); + + ClientRMService rmService = + new ClientRMService(rmContext, yarnScheduler, null, mockAclsManager, + mockQueueACLsManager, null); + + KillApplicationsOfQueueRequest killRequest = + KillApplicationsOfQueueRequest.newInstance(queue); + + KillApplicationsOfQueueResponse killResponse = + rmService.killApplicationsOfQueue(killRequest); + assertTrue("Kill the applications successfully", + killResponse.getIsKillCompleted()); + } @Test (expected = ApplicationNotFoundException.class) public void testMoveAbsentApplication() throws YarnException {