diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1e55fe383b0..489b68fa81b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -543,6 +543,24 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = ""; + + /** Enable submission pre-processor.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED = + RM_PREFIX + "submission-preprocessor.enabled"; + public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED = + false; + + /** Path to file with hosts for the submission processor to handle.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH = + RM_PREFIX + "submission-preprocessor.file-path"; + public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH = + ""; + + /** Submission processor refresh interval.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = + RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms"; + public static final int DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = + 0; /** Path to file with nodes to exclude.*/ public static final String RM_NODES_EXCLUDE_FILE_PATH = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7a672dee6c6..2a0ed4d963d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -4250,4 +4250,25 @@ yarn.nodemanager.containers-launcher.class org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher + + + + Enable the Pre processing of Application Submission context with server side configuration + + yarn.resourcemanager.submission-preprocessor.enabled + false + + + + Path to file with hosts for the submission processor to handle. + yarn.resourcemanager.submission-preprocessor.file-path + + + + + Submission processor refresh interval + yarn.resourcemanager.submission-preprocessor.file-refresh-interval-ms + 60000 + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 1e54cd6b8d2..55b7058ddf7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -231,6 +231,7 @@ private Clock clock; private ReservationSystem reservationSystem; private ReservationInputValidator rValidator; + private SubmissionContextPreProcessor contextPreProcessor; private boolean filterAppsByUser = false; @@ -313,6 +314,13 @@ protected void serviceStart() throws Exception { this.timelineServiceV2Enabled = YarnConfiguration. timelineServiceV2Enabled(conf); + if (conf.getBoolean( + YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, + YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) { + this.contextPreProcessor = new SubmissionContextPreProcessor(); + this.contextPreProcessor.start(conf); + } + super.serviceStart(); } @@ -321,6 +329,9 @@ protected void serviceStop() throws Exception { if (this.server != null) { this.server.stop(); } + if (this.contextPreProcessor != null) { + this.contextPreProcessor.stop(); + } super.serviceStop(); } @@ -332,6 +343,11 @@ InetSocketAddress getBindAddress(Configuration conf) { YarnConfiguration.DEFAULT_RM_PORT); } + @VisibleForTesting + SubmissionContextPreProcessor getContextPreProcessor() { + return this.contextPreProcessor; + } + @Private public InetSocketAddress getBindAddress() { return clientBindAddress; @@ -663,6 +679,11 @@ public SubmitApplicationResponse submitApplication( checkReservationACLs(submissionContext.getQueue(), AuditConstants .SUBMIT_RESERVATION_REQUEST, reservationId); + if (this.contextPreProcessor != null) { + this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(), + applicationId, submissionContext); + } + try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, @@ -1096,8 +1117,7 @@ public RenewDelegationTokenResponse renewDelegationToken( RenewDelegationTokenRequest request) throws YarnException { try { if (!isAllowedDelegationTokenOp()) { - throw new IOException( - "Delegation Token can be renewed only with kerberos authentication"); + throw new IOException("Delegation Token can be renewed only with kerberos authentication"); } org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java new file mode 100644 index 00000000000..f134328b652 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.annotations.VisibleForTesting; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Pre process the ApplicationSubmissionContext with server side + * information + */ +public class SubmissionContextPreProcessor { + + private static Logger LOG = LoggerFactory.getLogger( + SubmissionContextPreProcessor.class); + private static String DEFAULT_COMMANDS = "*"; + + interface ContextProcessor { + void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext); + } + + static class NodeLabelProcessor implements ContextProcessor { + @Override + public void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + submissionContext.setNodeLabelExpression(value); + } + } + + static class QueueProcessor implements ContextProcessor { + @Override + public void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + submissionContext.setQueue(value); + } + } + + static class TagAddProcessor implements ContextProcessor { + @Override + public void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + Set applicationTags = submissionContext.getApplicationTags(); + if (applicationTags == null) { + applicationTags = new HashSet<>(); + } else { + applicationTags = new HashSet<>(applicationTags); + } + applicationTags.add(value); + submissionContext.setApplicationTags(applicationTags); + } + } + + private static final int INITIAL_DELAY = 1000; + + enum ContextProp { + // Node label Expression + NL(new NodeLabelProcessor()), + // Queue + Q(new QueueProcessor()), + // Tag Add + TA(new TagAddProcessor()); + + private ContextProcessor cp; + ContextProp(ContextProcessor cp) { + this.cp = cp; + } + } + + private String hostsFilePath; + private volatile long lastModified = -1; + private volatile Map> hostCommands = + new HashMap<>(); + private ScheduledExecutorService executorService; + + public void start(Configuration conf) { + this.hostsFilePath = + conf.get( + YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH, + YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH); + int refreshPeriod = + conf.getInt( + YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS, + YarnConfiguration. + DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS); + + LOG.info("Submission Context Preprocessor enabled: file=[{}], " + + "interval=[{}]", this.hostsFilePath, refreshPeriod); + + executorService = Executors.newSingleThreadScheduledExecutor(); + Runnable refreshConf = new Runnable() { + @Override + public void run() { + try { + refresh(); + } catch (Exception ex) { + LOG.error("Error while refreshing Submission PreProcessor file [{}]", hostsFilePath, ex); + } + } + }; + if (refreshPeriod > 0) { + executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY, refreshPeriod, TimeUnit.MILLISECONDS); + } else { + executorService.schedule(refreshConf, INITIAL_DELAY, TimeUnit.MILLISECONDS); + } + } + + public void stop() { + if (this.executorService != null) { + this.executorService.shutdown(); + } + } + + public void preProcess(String host, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + Map cMap = hostCommands.get(host); + + // Try regex match + if (cMap == null) { + for (Map.Entry> entry : hostCommands.entrySet()) { + if (entry.getKey().equals(DEFAULT_COMMANDS)) { + continue; + } + try { + Pattern p = Pattern.compile(entry.getKey()); + Matcher m = p.matcher(host); + if (m.find()) { + cMap = hostCommands.get(entry.getKey()); + } + } catch (PatternSyntaxException exception) { + LOG.warn("Invalid regex pattern: " + entry.getKey()); + } + } + } + // Set to default value + if (cMap == null) { + cMap = hostCommands.get(DEFAULT_COMMANDS); + } + if (cMap != null) { + for (Map.Entry entry : cMap.entrySet()) { + entry.getKey().cp.process(host, entry.getValue(), + applicationId, submissionContext); + } + } + } + + @VisibleForTesting + void refresh() throws Exception { + if (null == hostsFilePath || hostsFilePath.isEmpty()) { + LOG.warn("Host list file path [{}] is empty or does not exist !!", + hostsFilePath); + } else { + File hostFile = new File(hostsFilePath); + if (!hostFile.exists() || !hostFile.isFile()) { + LOG.warn("Host list file [{}] does not exist or is not a file !!", + hostFile); + } else if (hostFile.lastModified() <= lastModified) { + LOG.debug("Host list file [{}] has not been modified from last refresh", + hostFile); + } else { + FileInputStream fileInputStream = new FileInputStream(hostFile); + BufferedReader reader = null; + Map> tempHostCommands = new HashMap<>(); + try { + reader = new BufferedReader(new InputStreamReader(fileInputStream, StandardCharsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + // Lines should start with hostname and be followed with commands. + // Delimiter is any contiguous sequence of space or tab character. + // Commands are of the form: + // = + // where KEY can be 'NL', 'Q' or 'TA' (more can be added later) + // (TA stands for 'Tag Add') + // Sample lines: + // ... + // host1 NL=foo Q=b + // host2 Q=c NL=bar + // ... + String[] commands = line.split("[ \t\n\f\r]+"); + if (commands != null && commands.length > 1) { + String host = commands[0].trim(); + if (host.startsWith("#")) { + // All lines starting with # is a comment + continue; + } + Map cMap = null; + for (int i = 1; i < commands.length; i++) { + String[] cSplit = commands[i].split("="); + if (cSplit == null || cSplit.length != 2) { + LOG.error("No commands found for line [{}]", commands[i]); + continue; + } + if (cMap == null) { + cMap = new HashMap<>(); + } + cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]); + } + if (cMap != null && cMap.size() > 0) { + tempHostCommands.put(host, cMap); + LOG.info("Following commands registered for host[{}] : {}", + host, cMap); + } + } + } + lastModified = hostFile.lastModified(); + } catch (Exception ex) { + // Do not commit the new map if we have an Exception.. + tempHostCommands = null; + throw ex; + } finally { + if (tempHostCommands != null && tempHostCommands.size() > 0) { + hostCommands = tempHostCommands; + } + if (reader != null) { + reader.close(); + } + fileInputStream.close(); + } + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 3eed0be25ba..417df2357bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -31,11 +31,15 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.security.AccessControlException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -51,11 +55,14 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; + import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; @@ -148,6 +155,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -165,6 +173,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -176,6 +185,8 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.BeforeClass; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -194,11 +205,34 @@ .getRecordFactory(null); private String appType = "MockApp"; - + private static RMDelegationTokenSecretManager dtsm; + private final static String QUEUE_1 = "Q-1"; private final static String QUEUE_2 = "Q-2"; private File resourceTypesFile = null; + private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT"; + static { + KerberosName.setRules(kerberosRule); + } + + @BeforeClass + public static void setupSecretManager() throws IOException { + RMContext rmContext = mock(RMContext.class); + YarnConfiguration conf = new YarnConfiguration(); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + when(rmContext.getResourceManager()).thenReturn(new MockRM(conf)); + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); + dtsm.startThreads(); + } + + @AfterClass + public static void teardownSecretManager() { + if (dtsm != null) { + dtsm.stopThreads(); + } + } + @Test public void testGetDecommissioningClusterNodes() throws Exception { MockRM rm = new MockRM() { @@ -263,7 +297,7 @@ protected ClientRMService createClientRMService() { labelsMgr.replaceLabelsOnNode(map); rm.sendNodeStarted(node); node.nodeHeartbeat(true); - + // Add and lose a node with label = y MockNM lostNode = rm.registerNode("host2:1235", 1024); rm.sendNodeStarted(lostNode); @@ -288,7 +322,7 @@ protected ClientRMService createClientRMService() { Assert.assertEquals(1, nodeReports.size()); Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY, nodeReports.get(0).getNodeState()); - + // Check node's label = x Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); @@ -302,34 +336,34 @@ protected ClientRMService createClientRMService() { nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals("Unhealthy nodes should not show up by default", 0, nodeReports.size()); - + // Change label of host1 to y map = new HashMap>(); map.put(node.getNodeId(), ImmutableSet.of("y")); labelsMgr.replaceLabelsOnNode(map); - + // Now query for UNHEALTHY nodes request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY)); nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(1, nodeReports.size()); Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY, nodeReports.get(0).getNodeState()); - + Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); Assert.assertNull(nodeReports.get(0).getNodeUpdateType()); - + // Remove labels of host1 map = new HashMap>(); map.put(node.getNodeId(), ImmutableSet.of("y")); labelsMgr.removeLabelsFromNode(map); - + // Query all states should return all nodes rm.registerNode("host3:1236", 1024); request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(3, nodeReports.size()); - + // All host1-3's label should be empty (instead of null) for (NodeReport report : nodeReports) { Assert.assertTrue(report.getNodeLabels() != null @@ -341,7 +375,7 @@ protected ClientRMService createClientRMService() { rpc.stopProxy(client, conf); rm.close(); } - + @Test public void testNonExistingApplicationReport() throws YarnException { RMContext rmContext = mock(RMContext.class); @@ -384,10 +418,10 @@ public void testGetApplicationReport() throws Exception { GetApplicationReportRequest request = recordFactory .newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(appId1); - GetApplicationReportResponse response = + GetApplicationReportResponse response = rmService.getApplicationReport(request); ApplicationReport report = response.getApplicationReport(); - ApplicationResourceUsageReport usageReport = + ApplicationResourceUsageReport usageReport = report.getApplicationResourceUsageReport(); Assert.assertEquals(10, usageReport.getMemorySeconds()); Assert.assertEquals(3, usageReport.getVcoreSeconds()); @@ -434,7 +468,7 @@ public void testGetApplicationReport() throws Exception { rmService.close(); } } - + @Test public void testGetApplicationAttemptReport() throws YarnException, IOException { @@ -468,7 +502,7 @@ public void testGetApplicationResourceUsageReportDummy() throws YarnException, public void handle(Event event) { } }); - ApplicationSubmissionContext asContext = + ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); YarnConfiguration config = new YarnConfiguration(); RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, @@ -643,7 +677,7 @@ public void testForceKillApplication() throws Exception { assertEquals("Incorrect number of apps in the RM", 2, rmService.getApplications(getRequest).getApplicationList().size()); } - + @Test (expected = ApplicationNotFoundException.class) public void testMoveAbsentApplication() throws YarnException { RMContext rmContext = mock(RMContext.class); @@ -975,7 +1009,163 @@ public void testGetQueueInfo() throws Exception { Assert.assertEquals(0, applications1.size()); } - + @Test (timeout = 30000) + @SuppressWarnings ("rawtypes") + public void testAppSubmitWithSubmissionPreProcessor() throws Exception { + ResourceScheduler scheduler = mockResourceScheduler(); + RMContext rmContext = mock(RMContext.class); + mockRMContext(scheduler, rmContext); + YarnConfiguration yConf = new YarnConfiguration(); + yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, true); + yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + // Override the YARN configuration. + when(rmContext.getYarnConfiguration()).thenReturn(yConf); + RMStateStore stateStore = mock(RMStateStore.class); + when(rmContext.getStateStore()).thenReturn(stateStore); + RMAppManager appManager = new RMAppManager(rmContext, scheduler, + null, mock(ApplicationACLsManager.class), new Configuration()); + when(rmContext.getDispatcher().getEventHandler()).thenReturn( + new EventHandler() { + public void handle(Event event) {} + }); + ApplicationId appId1 = getApplicationId(100); + + ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); + when( + mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), + ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); + + QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); + + when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), + any(QueueACL.class), any(RMApp.class), any(String.class), + any())) + .thenReturn(true); + + ClientRMService rmService = + new ClientRMService(rmContext, scheduler, appManager, + mockAclsManager, mockQueueACLsManager, null); + + + File rulesFile = File.createTempFile("submission_rules", ".tmp"); + rulesFile.deleteOnExit(); + rulesFile.createNewFile(); + + yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH, rulesFile.getAbsolutePath()); + rmService.serviceInit(yConf); + rmService.serviceStart(); + + BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile)); + writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1"); + writer.newLine(); + writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2"); + writer.newLine(); + writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); + writer.newLine(); + writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); + writer.newLine(); + writer.write("* TA=cluster:other Q=default NL=barfoo"); + writer.newLine(); + writer.flush(); + writer.close(); + rmService.getContextPreProcessor().refresh(); + + setupCurrentCall("host.cluster1.com"); + SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( + appId1, null, null); + try { + rmService.submitApplication(submitRequest1); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app1 = rmContext.getRMApps().get(appId1); + Assert.assertNotNull("app doesn't exist", app1); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName()); + Assert.assertTrue("custom tag not present", + app1.getApplicationTags().contains("cluster:cluster1")); + Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue()); + Assert.assertEquals("app node label doesn't match", + "foo", app1.getApplicationSubmissionContext().getNodeLabelExpression()); + + + setupCurrentCall("host.cluster2.com"); + ApplicationId appId2 = getApplicationId(101); + SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( + appId2, null, null); + submitRequest2.getApplicationSubmissionContext().setApplicationType( + "matchType"); + Set aTags = new HashSet(); + aTags.add("mytag:foo"); + submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags); + try { + rmService.submitApplication(submitRequest2); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app2 = rmContext.getRMApps().get(appId2); + Assert.assertNotNull("app doesn't exist", app2); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName()); + Assert.assertTrue("client tag not present", + app2.getApplicationTags().contains("mytag:foo")); + Assert.assertTrue("custom tag not present", + app2.getApplicationTags().contains("cluster:cluster2")); + Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue()); + Assert.assertEquals("app node label doesn't match", + "zuess", app2.getApplicationSubmissionContext().getNodeLabelExpression()); + + // Test Default commands + setupCurrentCall("host2.cluster3.com"); + ApplicationId appId3 = getApplicationId(102); + SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest( + appId3, null, null); + submitRequest3.getApplicationSubmissionContext().setApplicationType( + "matchType"); + submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags); + try { + rmService.submitApplication(submitRequest3); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app3 = rmContext.getRMApps().get(appId3); + Assert.assertNotNull("app doesn't exist", app3); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName()); + Assert.assertTrue("client tag not present", + app3.getApplicationTags().contains("mytag:foo")); + Assert.assertTrue("custom tag not present", + app3.getApplicationTags().contains("cluster:other")); + Assert.assertEquals("app queue doesn't match", "default", app3.getQueue()); + Assert.assertEquals("app node label doesn't match", + "barfoo", app3.getApplicationSubmissionContext().getNodeLabelExpression()); + + // Test regex + setupCurrentCall("host.cluster100.com"); + ApplicationId appId4 = getApplicationId(103); + SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest( + appId4, null, null); + try { + rmService.submitApplication(submitRequest4); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app4 = rmContext.getRMApps().get(appId4); + Assert.assertTrue("custom tag not present", + app4.getApplicationTags().contains("cluster:reg")); + Assert.assertEquals("app node label doesn't match", + "reg", app4.getApplicationSubmissionContext().getNodeLabelExpression()); + rmService.serviceStop(); + } + + private void setupCurrentCall(String hostName) throws UnknownHostException { + Server.Call mockCall = mock(Server.Call.class); + when(mockCall.getHostInetAddress()).thenReturn( + InetAddress.getByAddress(hostName, + new byte[]{123, 123, 123, 123})); + Server.getCurCall().set(mockCall); + } + @Test (timeout = 30000) @SuppressWarnings ("rawtypes") public void testAppSubmit() throws Exception { @@ -1034,6 +1224,9 @@ public void handle(Event event) {} appId2, name, queue); submitRequest2.getApplicationSubmissionContext().setApplicationType( "matchType"); + Set aTags = new HashSet(); + aTags.add("mytag:foo"); + submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags); try { rmService.submitApplication(submitRequest2); } catch (YarnException e) { @@ -1123,7 +1316,7 @@ public void handle(Event event) {} ApplicationId[] appIds = {getApplicationId(101), getApplicationId(102), getApplicationId(103)}; List tags = Arrays.asList("Tag1", "Tag2", "Tag3"); - + long[] submitTimeMillis = new long[3]; // Submit applications for (int i = 0; i < appIds.length; i++) { @@ -1150,23 +1343,23 @@ public void handle(Event event) {} request.setLimit(1L); assertEquals("Failed to limit applications", 1, rmService.getApplications(request).getApplicationList().size()); - + // Check start range request = GetApplicationsRequest.newInstance(); request.setStartRange(submitTimeMillis[0] + 1, System.currentTimeMillis()); - + // 2 applications are submitted after first timeMills - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 2, rmService.getApplications(request).getApplicationList().size()); - + // 1 application is submitted after the second timeMills request.setStartRange(submitTimeMillis[1] + 1, System.currentTimeMillis()); - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 1, rmService.getApplications(request).getApplicationList().size()); - + // no application is submitted after the third timeMills request.setStartRange(submitTimeMillis[2] + 1, System.currentTimeMillis()); - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 0, rmService.getApplications(request).getApplicationList().size()); // Check queue @@ -1238,7 +1431,7 @@ public void handle(Event event) {} assertEquals("Incorrect number of applications for the scope", 3, rmService.getApplications(request).getApplicationList().size()); } - + @Test(timeout=4000) public void testConcurrentAppSubmit() throws IOException, InterruptedException, BrokenBarrierException, @@ -1257,7 +1450,7 @@ public void testConcurrentAppSubmit() appId1, null, null); final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( appId2, null, null); - + final CyclicBarrier startBarrier = new CyclicBarrier(2); final CyclicBarrier endBarrier = new CyclicBarrier(2); @@ -1299,7 +1492,7 @@ public void run() { } }; t.start(); - + // submit another app, so go through while the first app is blocked startBarrier.await(); rmService.submitApplication(submitRequest2); @@ -1385,7 +1578,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) private ConcurrentHashMap getRMApps( RMContext rmContext, YarnScheduler yarnScheduler) { - ConcurrentHashMap apps = + ConcurrentHashMap apps = new ConcurrentHashMap(); ApplicationId applicationId1 = getApplicationId(1); ApplicationId applicationId2 = getApplicationId(2); @@ -1399,7 +1592,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) config, "testqueue", 40, 5,"high-mem","high-mem")); return apps; } - + private List getSchedulerApps( Map apps) { List schedApps = new ArrayList(); @@ -1412,7 +1605,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) private static ApplicationId getApplicationId(int id) { return ApplicationId.newInstance(123456, id); } - + private static ApplicationAttemptId getApplicationAttemptId(int id) { return ApplicationAttemptId.newInstance(getApplicationId(id), 1); } @@ -1437,7 +1630,7 @@ public ApplicationReport createAndGetApplicationReport( String clientUserName, boolean allowAccess) { ApplicationReport report = super.createAndGetApplicationReport( clientUserName, allowAccess); - ApplicationResourceUsageReport usageReport = + ApplicationResourceUsageReport usageReport = report.getApplicationResourceUsageReport(); usageReport.setMemorySeconds(memorySeconds); usageReport.setVcoreSeconds(vcoreSeconds); @@ -1457,7 +1650,7 @@ public ApplicationReport createAndGetApplicationReport( RMContainerImpl containerimpl = spy(new RMContainerImpl(container, SchedulerRequestKey.extractFrom(container), attemptId, null, "", rmContext)); - Map attempts = + Map attempts = new HashMap(); attempts.put(attemptId, rmAppAttemptImpl); when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);