diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 8523342..bc660f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; @@ -144,4 +146,9 @@ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( RefreshClusterMaxPriorityRequest request) throws YarnException, IOException; + + @Private + @Idempotent + public KillApplicationsForUserResponse killApplicationsForUser( + KillApplicationsForUserRequest request) throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserRequest.java new file mode 100644 index 0000000..a9cf04a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserRequest.java @@ -0,0 +1,44 @@ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The interface used by admin to the abort applications of user to + * ResourceManager. + *

+ * + *

+ * The admin, via {@link org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest} provides the user of + * the applications to be aborted. + *

+ * + * @see org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol#killApplicationsForUser(org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest) + */ +@Public +@Unstable +public abstract class KillApplicationsForUserRequest { + public static KillApplicationsForUserRequest newInstance(String user) { + KillApplicationsForUserRequest request = + Records.newRecord(KillApplicationsForUserRequest.class); + request.setUser(user); + return request; + } + + /** + * Get the user name to filter applications on + * + * @return the name of the user + */ + public abstract String getUser(); + + /** + * Set the queue name to filter applications on + * + * @param the name of the user + */ + public abstract void setUser(String user); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserResponse.java new file mode 100644 index 0000000..b875539 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserResponse.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * The response sent by the ResourceManager to the admin client + * aborting applications by user. + *

+ * The response, includes: + *

+ * Note: user is recommended to wait until this flag becomes true, otherwise if + * the ResourceManager crashes before the process of killing the + * application is completed, the ResourceManager may retry this + * application on recovery. + * + * @see org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol#killApplicationsForUser(KillApplicationsForUserRequest) + */ +@Public +@Stable +public abstract class KillApplicationsForUserResponse { + @Private + @Unstable + public static KillApplicationsForUserResponse newInstance(boolean isKillCompleted) { + KillApplicationsForUserResponse response = + Records.newRecord(KillApplicationsForUserResponse.class); + response.setIsKillCompleted(isKillCompleted); + return response; + } + + /** + * Get the flag which indicates that the process of killing application is completed or not. + */ + @Public + @Stable + public abstract boolean getIsKillCompleted(); + + /** + * Set the flag which indicates that the process of killing application is completed or not. + */ + @Private + @Unstable + public abstract void setIsKillCompleted(boolean isKillCompleted); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index 1134623..5bc14cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -45,4 +45,5 @@ service ResourceManagerAdministrationProtocolService { rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto); rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto); rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto); + rpc killApplicationsForUser(KillApplicationsForUserRequestProto) returns (KillApplicationsForUserResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index eaf658f..c557c81 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -118,6 +118,14 @@ message RefreshClusterMaxPriorityRequestProto { message RefreshClusterMaxPriorityResponseProto { } +message KillApplicationsForUserRequestProto { + required string user = 1; +} + +message KillApplicationsForUserResponseProto { + optional bool is_kill_completed = 1 [default = false]; +} + message NodeIdToLabelsNameProto { optional NodeIdProto nodeId = 1; repeated string nodeLabels = 2; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index a5e53e4..958a2bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -95,9 +96,9 @@ protected final static Map ADMIN_USAGE = ImmutableMap.builder() .put("-refreshQueues", new UsageInfo("", - "Reload the queues' acls, states and scheduler specific " + - "properties. \n\t\tResourceManager will reload the " + - "mapred-queues configuration file.")) + "Reload the queues' acls, states and scheduler specific " + + "properties. \n\t\tResourceManager will reload the " + + "mapred-queues configuration file.")) .put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]", "Refresh the hosts information at the ResourceManager. Here " + "[-g [timeout in seconds] is optional, if we specify the " @@ -146,6 +147,10 @@ .put("-updateNodeResource", new UsageInfo("[NodeID] [MemSize] [vCores] ([OvercommitTimeout])", "Update resource on specific node.")) + .put( + "-killApplicationsForUser", + new UsageInfo("[username]", + "Kill the applications of specific user.")) .build(); public RMAdminCLI() { @@ -242,7 +247,8 @@ private static void printHelp(String cmd, boolean isHAEnabled) { " [-removeFromClusterNodeLabels ]" + " [-replaceLabelsOnNode <\"node1[:port]=label1,label2 node2[:port]=label1\">]" + " [-directlyAccessNodeLabelStore]]" + - " [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])"); + " [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])" + + " [-killApplicationsForUser [username]]"); if (isHAEnabled) { appendHAUsage(summary); } @@ -648,6 +654,15 @@ private int replaceLabelsOnNodes(Map> map) } return 0; } + + private int killApplicationsForUser(String username) throws IOException, + YarnException { + ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); + KillApplicationsForUserRequest request = + KillApplicationsForUserRequest.newInstance(username); + adminProtocol.killApplicationsForUser(request); + return 0; + } @Override public int run(String[] args) throws Exception { @@ -787,6 +802,14 @@ public int run(String[] args) throws Exception { } else { exitCode = replaceLabelsOnNodes(args[i]); } + } else if ("-killApplicationsForUser".equals(cmd)) { + if (args.length != 2) { + System.err + .println("Usage: yarn rmadmin [-killApplicationsForUser [username]]"); + exitCode = -1; + } else { + exitCode = killApplicationsForUser(args[i]); + } } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index f01441d..edad5d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -366,7 +367,15 @@ public void testCheckHealth() throws Exception { verify(haadmin).monitorHealth(); } - /** + @Test(timeout = 500) + public void testKillApplicationsForUser() throws Exception { + String[] args = { "-killApplicationsForUser", "user" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).killApplicationsForUser( + any(KillApplicationsForUserRequest.class)); + } + + /** * Test printing of help messages */ @Test(timeout=500) @@ -399,6 +408,7 @@ public void testHelp() throws Exception { "<\"node1[:port]=label1,label2 node2[:port]=label1\">] " + "[-directlyAccessNodeLabelStore]] [-updateNodeResource " + "[NodeID] [MemSize] [vCores] ([OvercommitTimeout]) " + + "[-killApplicationsForUser [username]] " + "[-help [cmd]]")); assertTrue(dataOut .toString() @@ -432,6 +442,9 @@ public void testHelp() throws Exception { .contains( "-refreshServiceAcl: Reload the service-level authorization" + " policy file")); + assertTrue(dataOut.toString().contains( + "-killApplicationsForUser [username]: Kill the applications of" + + " specific user.")); assertTrue(dataOut .toString() .contains( @@ -465,6 +478,9 @@ public void testHelp() throws Exception { "Usage: yarn rmadmin [-getServiceState ]", dataErr, 0); testError(new String[] { "-help", "-checkHealth" }, "Usage: yarn rmadmin [-checkHealth ]", dataErr, 0); + testError(new String[] { "-help", "-killApplicationsForUser" }, + "Usage: yarn rmadmin [-killApplicationsForUser [username]]", dataErr, + 0); testError(new String[] { "-help", "-failover" }, "Usage: yarn rmadmin " + "[-failover [--forcefence] [--forceactive] " + @@ -489,6 +505,7 @@ public void testHelp() throws Exception { + " [-removeFromClusterNodeLabels ] [-replaceLabelsOnNode " + "<\"node1[:port]=label1,label2 node2[:port]=label1\">] [-directlyAccessNodeLabelStore]] " + "[-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout]) " + + "[-killApplicationsForUser [username]] " + "[-transitionToActive [--forceactive] ] " + "[-transitionToStandby ] " + "[-getServiceState ] [-checkHealth ] [-help [cmd]]"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index 077edf3..d6e1416 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; @@ -50,6 +51,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; @@ -76,6 +79,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl; @@ -323,4 +328,18 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( return null; } } + + @Override + public KillApplicationsForUserResponse killApplicationsForUser( + KillApplicationsForUserRequest request) throws YarnException, IOException { + KillApplicationsForUserRequestProto requestProto = + ((KillApplicationsForUserRequestPBImpl) request).getProto(); + try { + return new KillApplicationsForUserResponsePBImpl( + proxy.killApplicationsForUser(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index aafce08..b8a8935 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto; @@ -55,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; @@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl; @@ -336,4 +341,21 @@ public RefreshClusterMaxPriorityResponseProto refreshClusterMaxPriority( throw new ServiceException(e); } } + + @Override + public KillApplicationsForUserResponseProto killApplicationsForUser( + RpcController controller, KillApplicationsForUserRequestProto proto) + throws ServiceException { + KillApplicationsForUserRequestPBImpl request = + new KillApplicationsForUserRequestPBImpl(proto); + try { + KillApplicationsForUserResponse response = + real.killApplicationsForUser(request); + return ((KillApplicationsForUserResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserRequestPBImpl.java new file mode 100644 index 0000000..63b094f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserRequestPBImpl.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class KillApplicationsForUserRequestPBImpl extends + KillApplicationsForUserRequest { + KillApplicationsForUserRequestProto proto = + KillApplicationsForUserRequestProto.getDefaultInstance(); + KillApplicationsForUserRequestProto.Builder builder = null; + boolean viaProto = false; + + private String user; + + public KillApplicationsForUserRequestPBImpl() { + builder = KillApplicationsForUserRequestProto.newBuilder(); + } + + public KillApplicationsForUserRequestPBImpl( + KillApplicationsForUserRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public KillApplicationsForUserRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getUser() { + KillApplicationsForUserRequestProtoOrBuilder p = viaProto ? proto : builder; + + this.user = p.getUser(); + return this.user; + } + + @Override + public void setUser(String user) { + maybeInitBuilder(); + this.user = user; + } + + private void mergeLocalToBuilder() { + if (user != null) { + builder.setUser(this.user); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationsForUserRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserResponsePBImpl.java new file mode 100644 index 0000000..44da4a5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserResponsePBImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class KillApplicationsForUserResponsePBImpl extends + KillApplicationsForUserResponse { + KillApplicationsForUserResponseProto proto = + KillApplicationsForUserResponseProto.getDefaultInstance(); + KillApplicationsForUserResponseProto.Builder builder = null; + boolean viaProto = false; + + public KillApplicationsForUserResponsePBImpl() { + builder = KillApplicationsForUserResponseProto.newBuilder(); + } + + public KillApplicationsForUserResponsePBImpl( + KillApplicationsForUserResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public KillApplicationsForUserResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationsForUserResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsKillCompleted() { + KillApplicationsForUserResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsKillCompleted(); + } + + @Override + public void setIsKillCompleted(boolean isKillCompleted) { + maybeInitBuilder(); + builder.setIsKillCompleted(isKillCompleted); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 353e72d..8faf8b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -47,11 +50,14 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.DecommissionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -64,6 +70,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; @@ -87,7 +95,11 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -831,6 +843,52 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( } } + @SuppressWarnings("unchecked") + @Override + public KillApplicationsForUserResponse killApplicationsForUser( + KillApplicationsForUserRequest request) throws YarnException, IOException { + String argName = "killApplicationsForUser"; + UserGroupInformation user = checkAcls(argName); + + checkRMStatus(user.getShortUserName(), argName, + "kill applications for user."); + + Set users = new HashSet(); + users.add(request.getUser()); + List reports = + this.rmContext.getClientRMService().getApplicationReportListOfUsers(user, + users); + List applicationIds = new ArrayList(); + + if (reports != null && reports.size() > 0) { + for (ApplicationReport report : reports) { + applicationIds.add(report.getApplicationId()); + } + } + + for (ApplicationId appId : applicationIds) { + RMApp application = this.rmContext.getRMApps().get(appId); + if (application == null) { + RMAuditLogger.logFailure(user.getUserName(), + AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "AdminService", + "Trying to kill an absent application", appId); + throw new ApplicationNotFoundException("Trying to kill an absent" + + " application " + appId); + } + + if (application.isAppFinalStateStored()) { + RMAuditLogger.logSuccess(user.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, "AdminService", appId); + continue; + } + + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.KILL)); + } + + return KillApplicationsForUserResponse.newInstance(true); + } + public String getHAZookeeperConnectionState() { if (!rmContext.isHAEnabled()) { return "ResourceManager HA is not enabled."; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 701cd74..cfab488 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -798,114 +798,9 @@ public GetApplicationsResponse getApplications( LongRange finish = request.getFinishRange(); ApplicationsRequestScope scope = request.getScope(); - final Map apps = rmContext.getRMApps(); - Iterator appsIter; - // If the query filters by queues, we can avoid considering apps outside - // of those queues by asking the scheduler for the apps in those queues. - if (queues != null && !queues.isEmpty()) { - // Construct an iterator over apps in given queues - // Collect list of lists to avoid copying all apps - final List> queueAppLists = - new ArrayList>(); - for (String queue : queues) { - List appsInQueue = scheduler.getAppsInQueue(queue); - if (appsInQueue != null && !appsInQueue.isEmpty()) { - queueAppLists.add(appsInQueue); - } - } - appsIter = new Iterator() { - Iterator> appListIter = queueAppLists.iterator(); - Iterator schedAppsIter; - - @Override - public boolean hasNext() { - // Because queueAppLists has no empty lists, hasNext is whether the - // current list hasNext or whether there are any remaining lists - return (schedAppsIter != null && schedAppsIter.hasNext()) - || appListIter.hasNext(); - } - @Override - public RMApp next() { - if (schedAppsIter == null || !schedAppsIter.hasNext()) { - schedAppsIter = appListIter.next().iterator(); - } - return apps.get(schedAppsIter.next().getApplicationId()); - } - @Override - public void remove() { - throw new UnsupportedOperationException("Remove not supported"); - } - }; - } else { - appsIter = apps.values().iterator(); - } - - List reports = new ArrayList(); - while (appsIter.hasNext() && reports.size() < limit) { - RMApp application = appsIter.next(); - - // Check if current application falls under the specified scope - if (scope == ApplicationsRequestScope.OWN && - !callerUGI.getUserName().equals(application.getUser())) { - continue; - } - - if (applicationTypes != null && !applicationTypes.isEmpty()) { - String appTypeToMatch = caseSensitive - ? application.getApplicationType() - : StringUtils.toLowerCase(application.getApplicationType()); - if (!applicationTypes.contains(appTypeToMatch)) { - continue; - } - } - - if (applicationStates != null && !applicationStates.isEmpty()) { - if (!applicationStates.contains(application - .createApplicationState())) { - continue; - } - } - - if (users != null && !users.isEmpty() && - !users.contains(application.getUser())) { - continue; - } - - if (start != null && !start.containsLong(application.getStartTime())) { - continue; - } - - if (finish != null && !finish.containsLong(application.getFinishTime())) { - continue; - } - - if (tags != null && !tags.isEmpty()) { - Set appTags = application.getApplicationTags(); - if (appTags == null || appTags.isEmpty()) { - continue; - } - boolean match = false; - for (String tag : tags) { - if (appTags.contains(tag)) { - match = true; - break; - } - } - if (!match) { - continue; - } - } - - // checkAccess can grab the scheduler lock so call it last - boolean allowAccess = checkAccess(callerUGI, application.getUser(), - ApplicationAccessType.VIEW_APP, application); - if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) { - continue; - } - - reports.add(application.createAndGetApplicationReport( - callerUGI.getUserName(), allowAccess)); - } + List reports = + getApplicationReportList(callerUGI, caseSensitive, applicationTypes, + applicationStates, users, queues, tags, limit, start, finish, scope); GetApplicationsResponse response = recordFactory.newRecordInstance(GetApplicationsResponse.class); @@ -1535,4 +1430,143 @@ public SignalContainerResponse signalContainer( .newRecordInstance(SignalContainerResponse.class); } + public List getApplicationReportListByAppStates( + UserGroupInformation callerUGI, + EnumSet applicationStates) throws YarnException { + return getApplicationReportList(callerUGI, true, null, applicationStates, + null, null, null, Long.MAX_VALUE, null, null, null); + } + + public List getApplicationReportListOfUsers( + UserGroupInformation callerUGI, Set users) throws YarnException { + return getApplicationReportList(callerUGI, true, null, null, users, null, + null, Long.MAX_VALUE, null, null, null); + } + + public List getApplicationReportListOfQueues( + UserGroupInformation callerUGI, Set queues) throws YarnException { + return getApplicationReportList(callerUGI, true, null, null, null, queues, + null, Long.MAX_VALUE, null, null, null); + } + + private List getApplicationReportList( + UserGroupInformation callerUGI, boolean caseSensitive, + Set applicationTypes, + EnumSet applicationStates, Set users, + Set queues, Set tags, long limit, LongRange start, + LongRange finish, ApplicationsRequestScope scope) throws YarnException { + List reports = new ArrayList(); + final Map apps = rmContext.getRMApps(); + Iterator appsIter; + + // If the query filters by queues, we can avoid considering apps outside + // of those queues by asking the scheduler for the apps in those queues. + if (queues != null && !queues.isEmpty()) { + // Construct an iterator over apps in given queues + // Collect list of lists to avoid copying all apps + final List> queueAppLists = + new ArrayList>(); + for (String queue : queues) { + List appsInQueue = + scheduler.getAppsInQueue(queue); + if (appsInQueue != null && !appsInQueue.isEmpty()) { + queueAppLists.add(appsInQueue); + } + } + appsIter = new Iterator() { + Iterator> appListIter = queueAppLists + .iterator(); + Iterator schedAppsIter; + + @Override + public boolean hasNext() { + // Because queueAppLists has no empty lists, hasNext is whether the + // current list hasNext or whether there are any remaining lists + return (schedAppsIter != null && schedAppsIter.hasNext()) + || appListIter.hasNext(); + } + + @Override + public RMApp next() { + if (schedAppsIter == null || !schedAppsIter.hasNext()) { + schedAppsIter = appListIter.next().iterator(); + } + return apps.get(schedAppsIter.next().getApplicationId()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported"); + } + }; + } else { + appsIter = apps.values().iterator(); + } + + while (appsIter.hasNext() && reports.size() < limit) { + RMApp application = appsIter.next(); + + // Check if current application falls under the specified scope + if (scope == ApplicationsRequestScope.OWN + && !callerUGI.getUserName().equals(application.getUser())) { + continue; + } + + if (applicationTypes != null && !applicationTypes.isEmpty()) { + String appTypeToMatch = + caseSensitive ? application.getApplicationType() : StringUtils + .toLowerCase(application.getApplicationType()); + if (!applicationTypes.contains(appTypeToMatch)) { + continue; + } + } + + if (applicationStates != null && !applicationStates.isEmpty()) { + if (!applicationStates.contains(application.createApplicationState())) { + continue; + } + } + + if (users != null && !users.isEmpty() + && !users.contains(application.getUser())) { + continue; + } + + if (start != null && !start.containsLong(application.getStartTime())) { + continue; + } + + if (finish != null && !finish.containsLong(application.getFinishTime())) { + continue; + } + + if (tags != null && !tags.isEmpty()) { + Set appTags = application.getApplicationTags(); + if (appTags == null || appTags.isEmpty()) { + continue; + } + boolean match = false; + for (String tag : tags) { + if (appTags.contains(tag)) { + match = true; + break; + } + } + if (!match) { + continue; + } + } + + // checkAccess can grab the scheduler lock so call it last + boolean allowAccess = + checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.VIEW_APP, application); + if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) { + continue; + } + + reports.add(application.createAndGetApplicationReport( + callerUGI.getUserName(), allowAccess)); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index e61d9fe..19cdf30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.DataOutputStream; @@ -27,6 +28,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import java.util.Set; @@ -46,11 +48,15 @@ import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.yarn.LocalConfigurationProvider; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.records.DecommissionType; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -62,6 +68,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -947,6 +955,28 @@ public void testAdminRefreshClusterMaxPriority() throws Exception, } } + @Test + public void testKillApplicationsForUser() throws Exception { + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + String user = "user"; + RMApp app1 = rm.submitApp(1024, "app1", user); + RMApp app2 = rm.submitApp(1024, "app2", "user2"); + + KillApplicationsForUserResponse response = + rm.adminService.killApplicationsForUser(KillApplicationsForUserRequest + .newInstance(user)); + + assertTrue("Kill the applications successfully", + response.getIsKillCompleted()); + } + private String writeConfigurationXML(Configuration conf, String confXMLName) throws IOException { DataOutputStream output = null; @@ -958,8 +988,7 @@ private String writeConfigurationXML(Configuration conf, String confXMLName) if (!confFile.createNewFile()) { Assert.fail("Can not create " + confXMLName); } - output = new DataOutputStream( - new FileOutputStream(confFile)); + output = new DataOutputStream(new FileOutputStream(confFile)); conf.writeXml(output); return confFile.getAbsolutePath(); } finally {