diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1e55fe383b0..489b68fa81b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -543,6 +543,24 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_NODES_INCLUDE_FILE_PATH =
RM_PREFIX + "nodes.include-path";
public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = "";
+
+ /** Enable submission pre-processor.*/
+ public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED =
+ RM_PREFIX + "submission-preprocessor.enabled";
+ public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED =
+ false;
+
+ /** Path to file with hosts for the submission processor to handle.*/
+ public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
+ RM_PREFIX + "submission-preprocessor.file-path";
+ public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
+ "";
+
+ /** Submission processor refresh interval.*/
+ public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS =
+ RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
+ public static final int DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS =
+ 0;
/** Path to file with nodes to exclude.*/
public static final String RM_NODES_EXCLUDE_FILE_PATH =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7a672dee6c6..2a0ed4d963d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4250,4 +4250,25 @@
yarn.nodemanager.containers-launcher.class
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher
+
+
+
+ Enable the Pre processing of Application Submission context with server side configuration
+
+ yarn.resourcemanager.submission-preprocessor.enabled
+ false
+
+
+
+ Path to file with hosts for the submission processor to handle.
+ yarn.resourcemanager.submission-preprocessor.file-path
+
+
+
+
+ Submission processor refresh interval
+ yarn.resourcemanager.submission-preprocessor.file-refresh-interval-ms
+ 60000
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 1e54cd6b8d2..55b7058ddf7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -231,6 +231,7 @@
private Clock clock;
private ReservationSystem reservationSystem;
private ReservationInputValidator rValidator;
+ private SubmissionContextPreProcessor contextPreProcessor;
private boolean filterAppsByUser = false;
@@ -313,6 +314,13 @@ protected void serviceStart() throws Exception {
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
+ if (conf.getBoolean(
+ YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
+ YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) {
+ this.contextPreProcessor = new SubmissionContextPreProcessor();
+ this.contextPreProcessor.start(conf);
+ }
+
super.serviceStart();
}
@@ -321,6 +329,9 @@ protected void serviceStop() throws Exception {
if (this.server != null) {
this.server.stop();
}
+ if (this.contextPreProcessor != null) {
+ this.contextPreProcessor.stop();
+ }
super.serviceStop();
}
@@ -332,6 +343,11 @@ InetSocketAddress getBindAddress(Configuration conf) {
YarnConfiguration.DEFAULT_RM_PORT);
}
+ @VisibleForTesting
+ SubmissionContextPreProcessor getContextPreProcessor() {
+ return this.contextPreProcessor;
+ }
+
@Private
public InetSocketAddress getBindAddress() {
return clientBindAddress;
@@ -663,6 +679,11 @@ public SubmitApplicationResponse submitApplication(
checkReservationACLs(submissionContext.getQueue(), AuditConstants
.SUBMIT_RESERVATION_REQUEST, reservationId);
+ if (this.contextPreProcessor != null) {
+ this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(),
+ applicationId, submissionContext);
+ }
+
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
@@ -1096,8 +1117,7 @@ public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException {
try {
if (!isAllowedDelegationTokenOp()) {
- throw new IOException(
- "Delegation Token can be renewed only with kerberos authentication");
+ throw new IOException("Delegation Token can be renewed only with kerberos authentication");
}
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java
new file mode 100644
index 00000000000..f134328b652
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Pre process the ApplicationSubmissionContext with server side
+ * information
+ */
+public class SubmissionContextPreProcessor {
+
+ private static Logger LOG = LoggerFactory.getLogger(
+ SubmissionContextPreProcessor.class);
+ private static String DEFAULT_COMMANDS = "*";
+
+ interface ContextProcessor {
+ void process(String host, String value, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext);
+ }
+
+ static class NodeLabelProcessor implements ContextProcessor {
+ @Override
+ public void process(String host, String value, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext) {
+ submissionContext.setNodeLabelExpression(value);
+ }
+ }
+
+ static class QueueProcessor implements ContextProcessor {
+ @Override
+ public void process(String host, String value, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext) {
+ submissionContext.setQueue(value);
+ }
+ }
+
+ static class TagAddProcessor implements ContextProcessor {
+ @Override
+ public void process(String host, String value, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext) {
+ Set applicationTags = submissionContext.getApplicationTags();
+ if (applicationTags == null) {
+ applicationTags = new HashSet<>();
+ } else {
+ applicationTags = new HashSet<>(applicationTags);
+ }
+ applicationTags.add(value);
+ submissionContext.setApplicationTags(applicationTags);
+ }
+ }
+
+ private static final int INITIAL_DELAY = 1000;
+
+ enum ContextProp {
+ // Node label Expression
+ NL(new NodeLabelProcessor()),
+ // Queue
+ Q(new QueueProcessor()),
+ // Tag Add
+ TA(new TagAddProcessor());
+
+ private ContextProcessor cp;
+ ContextProp(ContextProcessor cp) {
+ this.cp = cp;
+ }
+ }
+
+ private String hostsFilePath;
+ private volatile long lastModified = -1;
+ private volatile Map> hostCommands =
+ new HashMap<>();
+ private ScheduledExecutorService executorService;
+
+ public void start(Configuration conf) {
+ this.hostsFilePath =
+ conf.get(
+ YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
+ YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH);
+ int refreshPeriod =
+ conf.getInt(
+ YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS,
+ YarnConfiguration.
+ DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS);
+
+ LOG.info("Submission Context Preprocessor enabled: file=[{}], "
+ + "interval=[{}]", this.hostsFilePath, refreshPeriod);
+
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ Runnable refreshConf = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ refresh();
+ } catch (Exception ex) {
+ LOG.error("Error while refreshing Submission PreProcessor file [{}]", hostsFilePath, ex);
+ }
+ }
+ };
+ if (refreshPeriod > 0) {
+ executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY, refreshPeriod, TimeUnit.MILLISECONDS);
+ } else {
+ executorService.schedule(refreshConf, INITIAL_DELAY, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void stop() {
+ if (this.executorService != null) {
+ this.executorService.shutdown();
+ }
+ }
+
+ public void preProcess(String host, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext) {
+ Map cMap = hostCommands.get(host);
+
+ // Try regex match
+ if (cMap == null) {
+ for (Map.Entry> entry : hostCommands.entrySet()) {
+ if (entry.getKey().equals(DEFAULT_COMMANDS)) {
+ continue;
+ }
+ try {
+ Pattern p = Pattern.compile(entry.getKey());
+ Matcher m = p.matcher(host);
+ if (m.find()) {
+ cMap = hostCommands.get(entry.getKey());
+ }
+ } catch (PatternSyntaxException exception) {
+ LOG.warn("Invalid regex pattern: " + entry.getKey());
+ }
+ }
+ }
+ // Set to default value
+ if (cMap == null) {
+ cMap = hostCommands.get(DEFAULT_COMMANDS);
+ }
+ if (cMap != null) {
+ for (Map.Entry entry : cMap.entrySet()) {
+ entry.getKey().cp.process(host, entry.getValue(),
+ applicationId, submissionContext);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void refresh() throws Exception {
+ if (null == hostsFilePath || hostsFilePath.isEmpty()) {
+ LOG.warn("Host list file path [{}] is empty or does not exist !!",
+ hostsFilePath);
+ } else {
+ File hostFile = new File(hostsFilePath);
+ if (!hostFile.exists() || !hostFile.isFile()) {
+ LOG.warn("Host list file [{}] does not exist or is not a file !!",
+ hostFile);
+ } else if (hostFile.lastModified() <= lastModified) {
+ LOG.debug("Host list file [{}] has not been modified from last refresh",
+ hostFile);
+ } else {
+ FileInputStream fileInputStream = new FileInputStream(hostFile);
+ BufferedReader reader = null;
+ Map> tempHostCommands = new HashMap<>();
+ try {
+ reader = new BufferedReader(new InputStreamReader(fileInputStream, StandardCharsets.UTF_8));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ // Lines should start with hostname and be followed with commands.
+ // Delimiter is any contiguous sequence of space or tab character.
+ // Commands are of the form:
+ // =
+ // where KEY can be 'NL', 'Q' or 'TA' (more can be added later)
+ // (TA stands for 'Tag Add')
+ // Sample lines:
+ // ...
+ // host1 NL=foo Q=b
+ // host2 Q=c NL=bar
+ // ...
+ String[] commands = line.split("[ \t\n\f\r]+");
+ if (commands != null && commands.length > 1) {
+ String host = commands[0].trim();
+ if (host.startsWith("#")) {
+ // All lines starting with # is a comment
+ continue;
+ }
+ Map cMap = null;
+ for (int i = 1; i < commands.length; i++) {
+ String[] cSplit = commands[i].split("=");
+ if (cSplit == null || cSplit.length != 2) {
+ LOG.error("No commands found for line [{}]", commands[i]);
+ continue;
+ }
+ if (cMap == null) {
+ cMap = new HashMap<>();
+ }
+ cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]);
+ }
+ if (cMap != null && cMap.size() > 0) {
+ tempHostCommands.put(host, cMap);
+ LOG.info("Following commands registered for host[{}] : {}",
+ host, cMap);
+ }
+ }
+ }
+ lastModified = hostFile.lastModified();
+ } catch (Exception ex) {
+ // Do not commit the new map if we have an Exception..
+ tempHostCommands = null;
+ throw ex;
+ } finally {
+ if (tempHostCommands != null && tempHostCommands.size() > 0) {
+ hostCommands = tempHostCommands;
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ fileInputStream.close();
+ }
+ }
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 3eed0be25ba..417df2357bd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -31,11 +31,15 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -51,11 +55,14 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
+
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@@ -148,6 +155,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -165,6 +173,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -176,6 +185,8 @@
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -194,11 +205,34 @@
.getRecordFactory(null);
private String appType = "MockApp";
-
+ private static RMDelegationTokenSecretManager dtsm;
+
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
private File resourceTypesFile = null;
+ private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT";
+ static {
+ KerberosName.setRules(kerberosRule);
+ }
+
+ @BeforeClass
+ public static void setupSecretManager() throws IOException {
+ RMContext rmContext = mock(RMContext.class);
+ YarnConfiguration conf = new YarnConfiguration();
+ when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
+ when(rmContext.getResourceManager()).thenReturn(new MockRM(conf));
+ dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
+ dtsm.startThreads();
+ }
+
+ @AfterClass
+ public static void teardownSecretManager() {
+ if (dtsm != null) {
+ dtsm.stopThreads();
+ }
+ }
+
@Test
public void testGetDecommissioningClusterNodes() throws Exception {
MockRM rm = new MockRM() {
@@ -263,7 +297,7 @@ protected ClientRMService createClientRMService() {
labelsMgr.replaceLabelsOnNode(map);
rm.sendNodeStarted(node);
node.nodeHeartbeat(true);
-
+
// Add and lose a node with label = y
MockNM lostNode = rm.registerNode("host2:1235", 1024);
rm.sendNodeStarted(lostNode);
@@ -288,7 +322,7 @@ protected ClientRMService createClientRMService() {
Assert.assertEquals(1, nodeReports.size());
Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY,
nodeReports.get(0).getNodeState());
-
+
// Check node's label = x
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
@@ -302,34 +336,34 @@ protected ClientRMService createClientRMService() {
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals("Unhealthy nodes should not show up by default", 0,
nodeReports.size());
-
+
// Change label of host1 to y
map = new HashMap>();
map.put(node.getNodeId(), ImmutableSet.of("y"));
labelsMgr.replaceLabelsOnNode(map);
-
+
// Now query for UNHEALTHY nodes
request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY));
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size());
Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY,
nodeReports.get(0).getNodeState());
-
+
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
-
+
// Remove labels of host1
map = new HashMap>();
map.put(node.getNodeId(), ImmutableSet.of("y"));
labelsMgr.removeLabelsFromNode(map);
-
+
// Query all states should return all nodes
rm.registerNode("host3:1236", 1024);
request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(3, nodeReports.size());
-
+
// All host1-3's label should be empty (instead of null)
for (NodeReport report : nodeReports) {
Assert.assertTrue(report.getNodeLabels() != null
@@ -341,7 +375,7 @@ protected ClientRMService createClientRMService() {
rpc.stopProxy(client, conf);
rm.close();
}
-
+
@Test
public void testNonExistingApplicationReport() throws YarnException {
RMContext rmContext = mock(RMContext.class);
@@ -384,10 +418,10 @@ public void testGetApplicationReport() throws Exception {
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId1);
- GetApplicationReportResponse response =
+ GetApplicationReportResponse response =
rmService.getApplicationReport(request);
ApplicationReport report = response.getApplicationReport();
- ApplicationResourceUsageReport usageReport =
+ ApplicationResourceUsageReport usageReport =
report.getApplicationResourceUsageReport();
Assert.assertEquals(10, usageReport.getMemorySeconds());
Assert.assertEquals(3, usageReport.getVcoreSeconds());
@@ -434,7 +468,7 @@ public void testGetApplicationReport() throws Exception {
rmService.close();
}
}
-
+
@Test
public void testGetApplicationAttemptReport() throws YarnException,
IOException {
@@ -468,7 +502,7 @@ public void testGetApplicationResourceUsageReportDummy() throws YarnException,
public void handle(Event event) {
}
});
- ApplicationSubmissionContext asContext =
+ ApplicationSubmissionContext asContext =
mock(ApplicationSubmissionContext.class);
YarnConfiguration config = new YarnConfiguration();
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
@@ -643,7 +677,7 @@ public void testForceKillApplication() throws Exception {
assertEquals("Incorrect number of apps in the RM", 2,
rmService.getApplications(getRequest).getApplicationList().size());
}
-
+
@Test (expected = ApplicationNotFoundException.class)
public void testMoveAbsentApplication() throws YarnException {
RMContext rmContext = mock(RMContext.class);
@@ -975,7 +1009,163 @@ public void testGetQueueInfo() throws Exception {
Assert.assertEquals(0, applications1.size());
}
-
+ @Test (timeout = 30000)
+ @SuppressWarnings ("rawtypes")
+ public void testAppSubmitWithSubmissionPreProcessor() throws Exception {
+ ResourceScheduler scheduler = mockResourceScheduler();
+ RMContext rmContext = mock(RMContext.class);
+ mockRMContext(scheduler, rmContext);
+ YarnConfiguration yConf = new YarnConfiguration();
+ yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, true);
+ yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+ // Override the YARN configuration.
+ when(rmContext.getYarnConfiguration()).thenReturn(yConf);
+ RMStateStore stateStore = mock(RMStateStore.class);
+ when(rmContext.getStateStore()).thenReturn(stateStore);
+ RMAppManager appManager = new RMAppManager(rmContext, scheduler,
+ null, mock(ApplicationACLsManager.class), new Configuration());
+ when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+ new EventHandler() {
+ public void handle(Event event) {}
+ });
+ ApplicationId appId1 = getApplicationId(100);
+
+ ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
+ when(
+ mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
+ ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
+
+ QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
+
+ when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
+ any(QueueACL.class), any(RMApp.class), any(String.class),
+ any()))
+ .thenReturn(true);
+
+ ClientRMService rmService =
+ new ClientRMService(rmContext, scheduler, appManager,
+ mockAclsManager, mockQueueACLsManager, null);
+
+
+ File rulesFile = File.createTempFile("submission_rules", ".tmp");
+ rulesFile.deleteOnExit();
+ rulesFile.createNewFile();
+
+ yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH, rulesFile.getAbsolutePath());
+ rmService.serviceInit(yConf);
+ rmService.serviceStart();
+
+ BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile));
+ writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1");
+ writer.newLine();
+ writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2");
+ writer.newLine();
+ writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
+ writer.newLine();
+ writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
+ writer.newLine();
+ writer.write("* TA=cluster:other Q=default NL=barfoo");
+ writer.newLine();
+ writer.flush();
+ writer.close();
+ rmService.getContextPreProcessor().refresh();
+
+ setupCurrentCall("host.cluster1.com");
+ SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+ appId1, null, null);
+ try {
+ rmService.submitApplication(submitRequest1);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app1 = rmContext.getRMApps().get(appId1);
+ Assert.assertNotNull("app doesn't exist", app1);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
+ Assert.assertTrue("custom tag not present",
+ app1.getApplicationTags().contains("cluster:cluster1"));
+ Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue());
+ Assert.assertEquals("app node label doesn't match",
+ "foo", app1.getApplicationSubmissionContext().getNodeLabelExpression());
+
+
+ setupCurrentCall("host.cluster2.com");
+ ApplicationId appId2 = getApplicationId(101);
+ SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+ appId2, null, null);
+ submitRequest2.getApplicationSubmissionContext().setApplicationType(
+ "matchType");
+ Set aTags = new HashSet();
+ aTags.add("mytag:foo");
+ submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags);
+ try {
+ rmService.submitApplication(submitRequest2);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app2 = rmContext.getRMApps().get(appId2);
+ Assert.assertNotNull("app doesn't exist", app2);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName());
+ Assert.assertTrue("client tag not present",
+ app2.getApplicationTags().contains("mytag:foo"));
+ Assert.assertTrue("custom tag not present",
+ app2.getApplicationTags().contains("cluster:cluster2"));
+ Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue());
+ Assert.assertEquals("app node label doesn't match",
+ "zuess", app2.getApplicationSubmissionContext().getNodeLabelExpression());
+
+ // Test Default commands
+ setupCurrentCall("host2.cluster3.com");
+ ApplicationId appId3 = getApplicationId(102);
+ SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest(
+ appId3, null, null);
+ submitRequest3.getApplicationSubmissionContext().setApplicationType(
+ "matchType");
+ submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags);
+ try {
+ rmService.submitApplication(submitRequest3);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app3 = rmContext.getRMApps().get(appId3);
+ Assert.assertNotNull("app doesn't exist", app3);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName());
+ Assert.assertTrue("client tag not present",
+ app3.getApplicationTags().contains("mytag:foo"));
+ Assert.assertTrue("custom tag not present",
+ app3.getApplicationTags().contains("cluster:other"));
+ Assert.assertEquals("app queue doesn't match", "default", app3.getQueue());
+ Assert.assertEquals("app node label doesn't match",
+ "barfoo", app3.getApplicationSubmissionContext().getNodeLabelExpression());
+
+ // Test regex
+ setupCurrentCall("host.cluster100.com");
+ ApplicationId appId4 = getApplicationId(103);
+ SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest(
+ appId4, null, null);
+ try {
+ rmService.submitApplication(submitRequest4);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app4 = rmContext.getRMApps().get(appId4);
+ Assert.assertTrue("custom tag not present",
+ app4.getApplicationTags().contains("cluster:reg"));
+ Assert.assertEquals("app node label doesn't match",
+ "reg", app4.getApplicationSubmissionContext().getNodeLabelExpression());
+ rmService.serviceStop();
+ }
+
+ private void setupCurrentCall(String hostName) throws UnknownHostException {
+ Server.Call mockCall = mock(Server.Call.class);
+ when(mockCall.getHostInetAddress()).thenReturn(
+ InetAddress.getByAddress(hostName,
+ new byte[]{123, 123, 123, 123}));
+ Server.getCurCall().set(mockCall);
+ }
+
@Test (timeout = 30000)
@SuppressWarnings ("rawtypes")
public void testAppSubmit() throws Exception {
@@ -1034,6 +1224,9 @@ public void handle(Event event) {}
appId2, name, queue);
submitRequest2.getApplicationSubmissionContext().setApplicationType(
"matchType");
+ Set aTags = new HashSet();
+ aTags.add("mytag:foo");
+ submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags);
try {
rmService.submitApplication(submitRequest2);
} catch (YarnException e) {
@@ -1123,7 +1316,7 @@ public void handle(Event event) {}
ApplicationId[] appIds =
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
List tags = Arrays.asList("Tag1", "Tag2", "Tag3");
-
+
long[] submitTimeMillis = new long[3];
// Submit applications
for (int i = 0; i < appIds.length; i++) {
@@ -1150,23 +1343,23 @@ public void handle(Event event) {}
request.setLimit(1L);
assertEquals("Failed to limit applications", 1,
rmService.getApplications(request).getApplicationList().size());
-
+
// Check start range
request = GetApplicationsRequest.newInstance();
request.setStartRange(submitTimeMillis[0] + 1, System.currentTimeMillis());
-
+
// 2 applications are submitted after first timeMills
- assertEquals("Incorrect number of matching start range",
+ assertEquals("Incorrect number of matching start range",
2, rmService.getApplications(request).getApplicationList().size());
-
+
// 1 application is submitted after the second timeMills
request.setStartRange(submitTimeMillis[1] + 1, System.currentTimeMillis());
- assertEquals("Incorrect number of matching start range",
+ assertEquals("Incorrect number of matching start range",
1, rmService.getApplications(request).getApplicationList().size());
-
+
// no application is submitted after the third timeMills
request.setStartRange(submitTimeMillis[2] + 1, System.currentTimeMillis());
- assertEquals("Incorrect number of matching start range",
+ assertEquals("Incorrect number of matching start range",
0, rmService.getApplications(request).getApplicationList().size());
// Check queue
@@ -1238,7 +1431,7 @@ public void handle(Event event) {}
assertEquals("Incorrect number of applications for the scope", 3,
rmService.getApplications(request).getApplicationList().size());
}
-
+
@Test(timeout=4000)
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException,
@@ -1257,7 +1450,7 @@ public void testConcurrentAppSubmit()
appId1, null, null);
final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
appId2, null, null);
-
+
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
@@ -1299,7 +1492,7 @@ public void run() {
}
};
t.start();
-
+
// submit another app, so go through while the first app is blocked
startBarrier.await();
rmService.submitApplication(submitRequest2);
@@ -1385,7 +1578,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext)
private ConcurrentHashMap getRMApps(
RMContext rmContext, YarnScheduler yarnScheduler) {
- ConcurrentHashMap apps =
+ ConcurrentHashMap apps =
new ConcurrentHashMap();
ApplicationId applicationId1 = getApplicationId(1);
ApplicationId applicationId2 = getApplicationId(2);
@@ -1399,7 +1592,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext)
config, "testqueue", 40, 5,"high-mem","high-mem"));
return apps;
}
-
+
private List getSchedulerApps(
Map apps) {
List schedApps = new ArrayList();
@@ -1412,7 +1605,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext)
private static ApplicationId getApplicationId(int id) {
return ApplicationId.newInstance(123456, id);
}
-
+
private static ApplicationAttemptId getApplicationAttemptId(int id) {
return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
}
@@ -1437,7 +1630,7 @@ public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
ApplicationReport report = super.createAndGetApplicationReport(
clientUserName, allowAccess);
- ApplicationResourceUsageReport usageReport =
+ ApplicationResourceUsageReport usageReport =
report.getApplicationResourceUsageReport();
usageReport.setMemorySeconds(memorySeconds);
usageReport.setVcoreSeconds(vcoreSeconds);
@@ -1457,7 +1650,7 @@ public ApplicationReport createAndGetApplicationReport(
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), attemptId, null, "",
rmContext));
- Map attempts =
+ Map attempts =
new HashMap();
attempts.put(attemptId, rmAppAttemptImpl);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);