diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
index 8523342..bc660f3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
@@ -32,6 +32,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
@@ -144,4 +146,9 @@ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request) throws YarnException,
IOException;
+
+ @Private
+ @Idempotent
+ public KillApplicationsForUserResponse killApplicationsForUser(
+ KillApplicationsForUserRequest request) throws YarnException, IOException;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserRequest.java
new file mode 100644
index 0000000..a9cf04a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserRequest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * The request sent by the admin client to the
+ * <code>ResourceManager</code> to abort all applications belonging to
+ * a given user.
+ * </p>
+ *
+ * <p>
+ * The admin, via {@link KillApplicationsForUserRequest}, provides the
+ * name of the user whose applications are to be aborted.
+ * </p>
+ * @see ResourceManagerAdministrationProtocol#killApplicationsForUser(KillApplicationsForUserRequest)
+ */
+@Public
+@Unstable
+public abstract class KillApplicationsForUserRequest {
+ public static KillApplicationsForUserRequest newInstance(String user) {
+ KillApplicationsForUserRequest request =
+ Records.newRecord(KillApplicationsForUserRequest.class);
+ request.setUser(user);
+ return request;
+ }
+
+  /**
+   * Get the user name to filter applications on.
+   *
+   * @return the name of the user
+   */
+ public abstract String getUser();
+
+  /**
+   * Set the user name to filter applications on.
+   *
+   * @param user the name of the user
+   */
+ public abstract void setUser(String user);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserResponse.java
new file mode 100644
index 0000000..b875539
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/KillApplicationsForUserResponse.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The response sent by the ResourceManager to the admin client
+ * when aborting applications by user.
+ *
+ * The response includes:
+ * <ul>
+ *   <li>A flag which indicates whether the process of killing the
+ *   applications has completed or not.</li>
+ * </ul>
+ *
+ * Note: the admin is recommended to wait until this flag becomes true;
+ * otherwise, if the ResourceManager crashes before the process of
+ * killing the applications is completed, the ResourceManager may retry
+ * these applications on recovery.
+ *
+ * @see ResourceManagerAdministrationProtocol#killApplicationsForUser(KillApplicationsForUserRequest)
+ */
+@Public
+@Unstable
+public abstract class KillApplicationsForUserResponse {
+ @Private
+ @Unstable
+ public static KillApplicationsForUserResponse newInstance(boolean isKillCompleted) {
+ KillApplicationsForUserResponse response =
+ Records.newRecord(KillApplicationsForUserResponse.class);
+ response.setIsKillCompleted(isKillCompleted);
+ return response;
+ }
+
+  /**
+   * @return true if the process of killing the applications has completed
+   */
+  @Public
+  @Unstable
+ public abstract boolean getIsKillCompleted();
+
+ /**
+ * Set the flag which indicates that the process of killing application is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsKillCompleted(boolean isKillCompleted);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
index 1134623..5bc14cf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
@@ -45,4 +45,5 @@ service ResourceManagerAdministrationProtocolService {
rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto);
rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto);
rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
+ rpc killApplicationsForUser(KillApplicationsForUserRequestProto) returns (KillApplicationsForUserResponseProto);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index eaf658f..c557c81 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -118,6 +118,14 @@ message RefreshClusterMaxPriorityRequestProto {
message RefreshClusterMaxPriorityResponseProto {
}
+message KillApplicationsForUserRequestProto {
+ required string user = 1;
+}
+
+message KillApplicationsForUserResponseProto {
+ optional bool is_kill_completed = 1 [default = false];
+}
+
message NodeIdToLabelsNameProto {
optional NodeIdProto nodeId = 1;
repeated string nodeLabels = 2;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index a5e53e4..958a2bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -95,9 +96,9 @@
protected final static Map ADMIN_USAGE =
ImmutableMap.builder()
.put("-refreshQueues", new UsageInfo("",
- "Reload the queues' acls, states and scheduler specific " +
- "properties. \n\t\tResourceManager will reload the " +
- "mapred-queues configuration file."))
+ "Reload the queues' acls, states and scheduler specific " +
+ "properties. \n\t\tResourceManager will reload the " +
+ "mapred-queues configuration file."))
.put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]",
"Refresh the hosts information at the ResourceManager. Here "
+ "[-g [timeout in seconds] is optional, if we specify the "
@@ -146,6 +147,10 @@
.put("-updateNodeResource",
new UsageInfo("[NodeID] [MemSize] [vCores] ([OvercommitTimeout])",
"Update resource on specific node."))
+ .put(
+ "-killApplicationsForUser",
+ new UsageInfo("[username]",
+ "Kill the applications of specific user."))
.build();
public RMAdminCLI() {
@@ -242,7 +247,8 @@ private static void printHelp(String cmd, boolean isHAEnabled) {
" [-removeFromClusterNodeLabels ]" +
" [-replaceLabelsOnNode <\"node1[:port]=label1,label2 node2[:port]=label1\">]" +
" [-directlyAccessNodeLabelStore]]" +
- " [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])");
+ " [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])" +
+ " [-killApplicationsForUser [username]]");
if (isHAEnabled) {
appendHAUsage(summary);
}
@@ -648,6 +654,15 @@ private int replaceLabelsOnNodes(Map> map)
}
return 0;
}
+
+ private int killApplicationsForUser(String username) throws IOException,
+ YarnException {
+ ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
+ KillApplicationsForUserRequest request =
+ KillApplicationsForUserRequest.newInstance(username);
+ adminProtocol.killApplicationsForUser(request);
+ return 0;
+ }
@Override
public int run(String[] args) throws Exception {
@@ -787,6 +802,14 @@ public int run(String[] args) throws Exception {
} else {
exitCode = replaceLabelsOnNodes(args[i]);
}
+ } else if ("-killApplicationsForUser".equals(cmd)) {
+ if (args.length != 2) {
+ System.err
+ .println("Usage: yarn rmadmin [-killApplicationsForUser [username]]");
+ exitCode = -1;
+ } else {
+ exitCode = killApplicationsForUser(args[i]);
+ }
} else {
exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
index f01441d..edad5d5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -366,7 +367,15 @@ public void testCheckHealth() throws Exception {
verify(haadmin).monitorHealth();
}
- /**
+ @Test(timeout = 500)
+ public void testKillApplicationsForUser() throws Exception {
+ String[] args = { "-killApplicationsForUser", "user" };
+ assertEquals(0, rmAdminCLI.run(args));
+ verify(admin).killApplicationsForUser(
+ any(KillApplicationsForUserRequest.class));
+ }
+
+ /**
* Test printing of help messages
*/
@Test(timeout=500)
@@ -399,6 +408,7 @@ public void testHelp() throws Exception {
"<\"node1[:port]=label1,label2 node2[:port]=label1\">] " +
"[-directlyAccessNodeLabelStore]] [-updateNodeResource " +
"[NodeID] [MemSize] [vCores] ([OvercommitTimeout]) " +
+ "[-killApplicationsForUser [username]] " +
"[-help [cmd]]"));
assertTrue(dataOut
.toString()
@@ -432,6 +442,9 @@ public void testHelp() throws Exception {
.contains(
"-refreshServiceAcl: Reload the service-level authorization" +
" policy file"));
+ assertTrue(dataOut.toString().contains(
+ "-killApplicationsForUser [username]: Kill the applications of"
+ + " specific user."));
assertTrue(dataOut
.toString()
.contains(
@@ -465,6 +478,9 @@ public void testHelp() throws Exception {
"Usage: yarn rmadmin [-getServiceState ]", dataErr, 0);
testError(new String[] { "-help", "-checkHealth" },
"Usage: yarn rmadmin [-checkHealth ]", dataErr, 0);
+ testError(new String[] { "-help", "-killApplicationsForUser" },
+ "Usage: yarn rmadmin [-killApplicationsForUser [username]]", dataErr,
+ 0);
testError(new String[] { "-help", "-failover" },
"Usage: yarn rmadmin " +
"[-failover [--forcefence] [--forceactive] " +
@@ -489,6 +505,7 @@ public void testHelp() throws Exception {
+ " [-removeFromClusterNodeLabels ] [-replaceLabelsOnNode "
+ "<\"node1[:port]=label1,label2 node2[:port]=label1\">] [-directlyAccessNodeLabelStore]] "
+ "[-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout]) "
+ + "[-killApplicationsForUser [username]] "
+ "[-transitionToActive [--forceactive] ] "
+ "[-transitionToStandby ] "
+ "[-getServiceState ] [-checkHealth ] [-help [cmd]]";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
index 077edf3..d6e1416 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
@@ -50,6 +51,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
@@ -76,6 +79,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl;
@@ -323,4 +328,18 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
return null;
}
}
+
+ @Override
+ public KillApplicationsForUserResponse killApplicationsForUser(
+ KillApplicationsForUserRequest request) throws YarnException, IOException {
+ KillApplicationsForUserRequestProto requestProto =
+ ((KillApplicationsForUserRequestPBImpl) request).getProto();
+ try {
+ return new KillApplicationsForUserResponsePBImpl(
+ proxy.killApplicationsForUser(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
index aafce08..b8a8935 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
@@ -28,6 +28,8 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
@@ -55,6 +57,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
@@ -71,6 +74,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.KillApplicationsForUserResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl;
@@ -336,4 +341,21 @@ public RefreshClusterMaxPriorityResponseProto refreshClusterMaxPriority(
throw new ServiceException(e);
}
}
+
+ @Override
+ public KillApplicationsForUserResponseProto killApplicationsForUser(
+ RpcController controller, KillApplicationsForUserRequestProto proto)
+ throws ServiceException {
+ KillApplicationsForUserRequestPBImpl request =
+ new KillApplicationsForUserRequestPBImpl(proto);
+ try {
+ KillApplicationsForUserResponse response =
+ real.killApplicationsForUser(request);
+ return ((KillApplicationsForUserResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserRequestPBImpl.java
new file mode 100644
index 0000000..63b094f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserRequestPBImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class KillApplicationsForUserRequestPBImpl extends
+ KillApplicationsForUserRequest {
+ KillApplicationsForUserRequestProto proto =
+ KillApplicationsForUserRequestProto.getDefaultInstance();
+ KillApplicationsForUserRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private String user;
+
+ public KillApplicationsForUserRequestPBImpl() {
+ builder = KillApplicationsForUserRequestProto.newBuilder();
+ }
+
+ public KillApplicationsForUserRequestPBImpl(
+ KillApplicationsForUserRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public KillApplicationsForUserRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+  @Override
+  public String getUser() {
+    if (this.user == null) {
+      this.user = viaProto ? proto.getUser() : builder.getUser();
+    }
+    return this.user;
+  }
+
+ @Override
+ public void setUser(String user) {
+ maybeInitBuilder();
+ this.user = user;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (user != null) {
+ builder.setUser(this.user);
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = KillApplicationsForUserRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserResponsePBImpl.java
new file mode 100644
index 0000000..44da4a5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/KillApplicationsForUserResponsePBImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.KillApplicationsForUserResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class KillApplicationsForUserResponsePBImpl extends
+ KillApplicationsForUserResponse {
+ KillApplicationsForUserResponseProto proto =
+ KillApplicationsForUserResponseProto.getDefaultInstance();
+ KillApplicationsForUserResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public KillApplicationsForUserResponsePBImpl() {
+ builder = KillApplicationsForUserResponseProto.newBuilder();
+ }
+
+ public KillApplicationsForUserResponsePBImpl(
+ KillApplicationsForUserResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public KillApplicationsForUserResponseProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = KillApplicationsForUserResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public boolean getIsKillCompleted() {
+ KillApplicationsForUserResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsKillCompleted();
+ }
+
+ @Override
+ public void setIsKillCompleted(boolean isKillCompleted) {
+ maybeInitBuilder();
+ builder.setIsKillCompleted(isKillCompleted);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 353e72d..8faf8b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -21,6 +21,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -47,11 +50,14 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -64,6 +70,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
@@ -87,7 +95,11 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
@@ -831,6 +843,52 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
}
}
+  @SuppressWarnings("unchecked")
+  @Override
+  public KillApplicationsForUserResponse killApplicationsForUser(
+      KillApplicationsForUserRequest request) throws YarnException,
+      IOException {
+    String argName = "killApplicationsForUser";
+    UserGroupInformation user = checkAcls(argName);
+
+    checkRMStatus(user.getShortUserName(), argName,
+        "kill applications for user.");
+
+    // Collect every application owned by the requested user. The admin caller
+    // (not the target user) is the UGI used for the lookup, so ACL checks in
+    // ClientRMService are evaluated against the admin.
+    Set<String> users = new HashSet<String>();
+    users.add(request.getUser());
+    List<ApplicationReport> reports =
+        this.rmContext.getClientRMService().getApplicationReportListOfUsers(
+            user, users);
+    List<ApplicationId> applicationIds = new ArrayList<ApplicationId>();
+
+    if (reports != null && reports.size() > 0) {
+      for (ApplicationReport report : reports) {
+        applicationIds.add(report.getApplicationId());
+      }
+    }
+
+    for (ApplicationId appId : applicationIds) {
+      RMApp application = this.rmContext.getRMApps().get(appId);
+      if (application == null) {
+        // App vanished between listing and killing; audit and abort.
+        RMAuditLogger.logFailure(user.getUserName(),
+            AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "AdminService",
+            "Trying to kill an absent application", appId);
+        throw new ApplicationNotFoundException("Trying to kill an absent"
+            + " application " + appId);
+      }
+
+      // Already terminal: log success and skip the redundant KILL event.
+      if (application.isAppFinalStateStored()) {
+        RMAuditLogger.logSuccess(user.getShortUserName(),
+            AuditConstants.KILL_APP_REQUEST, "AdminService", appId);
+        continue;
+      }
+
+      // Asynchronous kill: dispatch the event; completion is not awaited.
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppEvent(appId, RMAppEventType.KILL));
+    }
+
+    return KillApplicationsForUserResponse.newInstance(true);
+  }
+
public String getHAZookeeperConnectionState() {
if (!rmContext.isHAEnabled()) {
return "ResourceManager HA is not enabled.";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 701cd74..cfab488 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -798,114 +798,9 @@ public GetApplicationsResponse getApplications(
LongRange finish = request.getFinishRange();
ApplicationsRequestScope scope = request.getScope();
- final Map apps = rmContext.getRMApps();
- Iterator appsIter;
- // If the query filters by queues, we can avoid considering apps outside
- // of those queues by asking the scheduler for the apps in those queues.
- if (queues != null && !queues.isEmpty()) {
- // Construct an iterator over apps in given queues
- // Collect list of lists to avoid copying all apps
- final List> queueAppLists =
- new ArrayList>();
- for (String queue : queues) {
- List appsInQueue = scheduler.getAppsInQueue(queue);
- if (appsInQueue != null && !appsInQueue.isEmpty()) {
- queueAppLists.add(appsInQueue);
- }
- }
- appsIter = new Iterator() {
- Iterator> appListIter = queueAppLists.iterator();
- Iterator schedAppsIter;
-
- @Override
- public boolean hasNext() {
- // Because queueAppLists has no empty lists, hasNext is whether the
- // current list hasNext or whether there are any remaining lists
- return (schedAppsIter != null && schedAppsIter.hasNext())
- || appListIter.hasNext();
- }
- @Override
- public RMApp next() {
- if (schedAppsIter == null || !schedAppsIter.hasNext()) {
- schedAppsIter = appListIter.next().iterator();
- }
- return apps.get(schedAppsIter.next().getApplicationId());
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Remove not supported");
- }
- };
- } else {
- appsIter = apps.values().iterator();
- }
-
- List reports = new ArrayList();
- while (appsIter.hasNext() && reports.size() < limit) {
- RMApp application = appsIter.next();
-
- // Check if current application falls under the specified scope
- if (scope == ApplicationsRequestScope.OWN &&
- !callerUGI.getUserName().equals(application.getUser())) {
- continue;
- }
-
- if (applicationTypes != null && !applicationTypes.isEmpty()) {
- String appTypeToMatch = caseSensitive
- ? application.getApplicationType()
- : StringUtils.toLowerCase(application.getApplicationType());
- if (!applicationTypes.contains(appTypeToMatch)) {
- continue;
- }
- }
-
- if (applicationStates != null && !applicationStates.isEmpty()) {
- if (!applicationStates.contains(application
- .createApplicationState())) {
- continue;
- }
- }
-
- if (users != null && !users.isEmpty() &&
- !users.contains(application.getUser())) {
- continue;
- }
-
- if (start != null && !start.containsLong(application.getStartTime())) {
- continue;
- }
-
- if (finish != null && !finish.containsLong(application.getFinishTime())) {
- continue;
- }
-
- if (tags != null && !tags.isEmpty()) {
- Set appTags = application.getApplicationTags();
- if (appTags == null || appTags.isEmpty()) {
- continue;
- }
- boolean match = false;
- for (String tag : tags) {
- if (appTags.contains(tag)) {
- match = true;
- break;
- }
- }
- if (!match) {
- continue;
- }
- }
-
- // checkAccess can grab the scheduler lock so call it last
- boolean allowAccess = checkAccess(callerUGI, application.getUser(),
- ApplicationAccessType.VIEW_APP, application);
- if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) {
- continue;
- }
-
- reports.add(application.createAndGetApplicationReport(
- callerUGI.getUserName(), allowAccess));
- }
+    List<ApplicationReport> reports =
+        getApplicationReportList(callerUGI, caseSensitive, applicationTypes,
+            applicationStates, users, queues, tags, limit, start, finish,
+            scope);
GetApplicationsResponse response =
recordFactory.newRecordInstance(GetApplicationsResponse.class);
@@ -1535,4 +1430,143 @@ public SignalContainerResponse signalContainer(
.newRecordInstance(SignalContainerResponse.class);
}
+  /**
+   * Gets reports for all apps in any of the given states, with no limit and
+   * no other filtering, on behalf of {@code callerUGI}.
+   */
+  public List<ApplicationReport> getApplicationReportListByAppStates(
+      UserGroupInformation callerUGI,
+      EnumSet<YarnApplicationState> applicationStates) throws YarnException {
+    return getApplicationReportList(callerUGI, true, null, applicationStates,
+        null, null, null, Long.MAX_VALUE, null, null, null);
+  }
+
+  /**
+   * Gets reports for all apps owned by any of the given users, with no limit
+   * and no other filtering, on behalf of {@code callerUGI}.
+   */
+  public List<ApplicationReport> getApplicationReportListOfUsers(
+      UserGroupInformation callerUGI, Set<String> users) throws YarnException {
+    return getApplicationReportList(callerUGI, true, null, null, users, null,
+        null, Long.MAX_VALUE, null, null, null);
+  }
+
+  /**
+   * Gets reports for all apps in any of the given queues, with no limit and
+   * no other filtering, on behalf of {@code callerUGI}.
+   */
+  public List<ApplicationReport> getApplicationReportListOfQueues(
+      UserGroupInformation callerUGI, Set<String> queues)
+      throws YarnException {
+    return getApplicationReportList(callerUGI, true, null, null, null, queues,
+        null, Long.MAX_VALUE, null, null, null);
+  }
+
+  /**
+   * Common filtering engine behind getApplications and the admin helpers.
+   * Any null/empty filter is treated as "match everything"; at most
+   * {@code limit} reports are returned.
+   */
+  private List<ApplicationReport> getApplicationReportList(
+      UserGroupInformation callerUGI, boolean caseSensitive,
+      Set<String> applicationTypes,
+      EnumSet<YarnApplicationState> applicationStates, Set<String> users,
+      Set<String> queues, Set<String> tags, long limit, LongRange start,
+      LongRange finish, ApplicationsRequestScope scope) throws YarnException {
+    List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
+    final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
+    Iterator<RMApp> appsIter;
+
+    // If the query filters by queues, we can avoid considering apps outside
+    // of those queues by asking the scheduler for the apps in those queues.
+    if (queues != null && !queues.isEmpty()) {
+      // Construct an iterator over apps in given queues
+      // Collect list of lists to avoid copying all apps
+      final List<List<ApplicationAttemptId>> queueAppLists =
+          new ArrayList<List<ApplicationAttemptId>>();
+      for (String queue : queues) {
+        List<ApplicationAttemptId> appsInQueue =
+            scheduler.getAppsInQueue(queue);
+        if (appsInQueue != null && !appsInQueue.isEmpty()) {
+          queueAppLists.add(appsInQueue);
+        }
+      }
+      appsIter = new Iterator<RMApp>() {
+        Iterator<List<ApplicationAttemptId>> appListIter = queueAppLists
+            .iterator();
+        Iterator<ApplicationAttemptId> schedAppsIter;
+
+        @Override
+        public boolean hasNext() {
+          // Because queueAppLists has no empty lists, hasNext is whether the
+          // current list hasNext or whether there are any remaining lists
+          return (schedAppsIter != null && schedAppsIter.hasNext())
+              || appListIter.hasNext();
+        }
+
+        @Override
+        public RMApp next() {
+          if (schedAppsIter == null || !schedAppsIter.hasNext()) {
+            schedAppsIter = appListIter.next().iterator();
+          }
+          return apps.get(schedAppsIter.next().getApplicationId());
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("Remove not supported");
+        }
+      };
+    } else {
+      appsIter = apps.values().iterator();
+    }
+
+    while (appsIter.hasNext() && reports.size() < limit) {
+      RMApp application = appsIter.next();
+
+      // Check if current application falls under the specified scope
+      if (scope == ApplicationsRequestScope.OWN
+          && !callerUGI.getUserName().equals(application.getUser())) {
+        continue;
+      }
+
+      if (applicationTypes != null && !applicationTypes.isEmpty()) {
+        String appTypeToMatch =
+            caseSensitive ? application.getApplicationType() : StringUtils
+                .toLowerCase(application.getApplicationType());
+        if (!applicationTypes.contains(appTypeToMatch)) {
+          continue;
+        }
+      }
+
+      if (applicationStates != null && !applicationStates.isEmpty()) {
+        if (!applicationStates.contains(application.createApplicationState())) {
+          continue;
+        }
+      }
+
+      if (users != null && !users.isEmpty()
+          && !users.contains(application.getUser())) {
+        continue;
+      }
+
+      if (start != null && !start.containsLong(application.getStartTime())) {
+        continue;
+      }
+
+      if (finish != null && !finish.containsLong(application.getFinishTime())) {
+        continue;
+      }
+
+      if (tags != null && !tags.isEmpty()) {
+        Set<String> appTags = application.getApplicationTags();
+        if (appTags == null || appTags.isEmpty()) {
+          continue;
+        }
+        boolean match = false;
+        for (String tag : tags) {
+          if (appTags.contains(tag)) {
+            match = true;
+            break;
+          }
+        }
+        if (!match) {
+          continue;
+        }
+      }
+
+      // checkAccess can grab the scheduler lock so call it last
+      boolean allowAccess =
+          checkAccess(callerUGI, application.getUser(),
+              ApplicationAccessType.VIEW_APP, application);
+      if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) {
+        continue;
+      }
+
+      reports.add(application.createAndGetApplicationReport(
+          callerUGI.getUserName(), allowAccess));
+    }
+    // BUG FIX: the method must return the accumulated reports; the original
+    // patch fell off the end of a non-void method (compile error).
+    return reports;
+  }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index e61d9fe..19cdf30 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataOutputStream;
@@ -27,6 +28,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.Set;
@@ -46,11 +48,15 @@
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.KillApplicationsForUserResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -62,6 +68,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -947,6 +955,28 @@ public void testAdminRefreshClusterMaxPriority() throws Exception,
}
}
+  @Test
+  public void testKillApplicationsForUser() throws Exception {
+    try {
+      rm = new MockRM(configuration);
+      rm.init(configuration);
+      rm.start();
+    } catch (Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    String user = "user";
+    // Submit one app for the target user and one for a different user; only
+    // the former should be selected by killApplicationsForUser. The returned
+    // RMApp handles are not needed, so do not bind them to unused locals.
+    rm.submitApp(1024, "app1", user);
+    rm.submitApp(1024, "app2", "user2");
+
+    KillApplicationsForUserResponse response =
+        rm.adminService.killApplicationsForUser(KillApplicationsForUserRequest
+            .newInstance(user));
+
+    assertTrue("Kill the applications successfully",
+        response.getIsKillCompleted());
+  }
+
private String writeConfigurationXML(Configuration conf, String confXMLName)
throws IOException {
DataOutputStream output = null;
@@ -958,8 +988,7 @@ private String writeConfigurationXML(Configuration conf, String confXMLName)
if (!confFile.createNewFile()) {
Assert.fail("Can not create " + confXMLName);
}
- output = new DataOutputStream(
- new FileOutputStream(confFile));
+ output = new DataOutputStream(new FileOutputStream(confFile));
conf.writeXml(output);
return confFile.getAbsolutePath();
} finally {