diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1e55fe383b0..6230e345b9d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -198,7 +198,7 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_RM_EPOCH_RANGE = 0; /** The address of the applications manager interface in the RM.*/ - public static final String RM_ADDRESS = + public static final String RM_ADDRESS = RM_PREFIX + "address"; public static final int DEFAULT_RM_PORT = 8032; public static final String DEFAULT_RM_ADDRESS = @@ -233,9 +233,9 @@ private static void addDeprecatedKeys() { /** The Kerberos principal for the resource manager.*/ public static final String RM_PRINCIPAL = RM_PREFIX + "principal"; - + /** The address of the scheduler interface.*/ - public static final String RM_SCHEDULER_ADDRESS = + public static final String RM_SCHEDULER_ADDRESS = RM_PREFIX + "scheduler.address"; public static final int DEFAULT_RM_SCHEDULER_PORT = 8030; public static final String DEFAULT_RM_SCHEDULER_ADDRESS = "0.0.0.0:" + @@ -263,12 +263,12 @@ private static void addDeprecatedKeys() { public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; /** If the port should be included or not in the node name. The node name - * is used by the scheduler for resource requests allocation location + * is used by the scheduler for resource requests allocation location * matching. Typically this is just the hostname, using the port is needed * when using minicluster and specific NM are required.*/ public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME = YARN_PREFIX + "scheduler.include-port-in-node-name"; - public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = + public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false; /** Configured scheduler queue placement rules. */ @@ -487,17 +487,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false; /** The address of the RM admin interface.*/ - public static final String RM_ADMIN_ADDRESS = + public static final String RM_ADMIN_ADDRESS = RM_PREFIX + "admin.address"; public static final int DEFAULT_RM_ADMIN_PORT = 8033; public static final String DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:" + DEFAULT_RM_ADMIN_PORT; - + /**Number of threads used to handle RM admin interface.*/ public static final String RM_ADMIN_CLIENT_THREAD_COUNT = RM_PREFIX + "admin.client.thread-count"; public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1; - + /** * The maximum number of application attempts. * It's a global setting for all application masters. 
@@ -505,15 +505,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_AM_MAX_ATTEMPTS = RM_PREFIX + "am.max-attempts"; public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2; - + /** The keytab for the resource manager.*/ - public static final String RM_KEYTAB = + public static final String RM_KEYTAB = RM_PREFIX + "keytab"; /**The kerberos principal to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_USER_NAME_KEY = RM_PREFIX + "webapp.spnego-principal"; - + /**The kerberos keytab to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = RM_PREFIX + "webapp.spnego-keytab-file"; @@ -535,14 +535,32 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_RM_WEBAPP_ENABLE_CORS_FILTER = false; /** How long to wait until a container is considered dead.*/ - public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = + public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000; - + /** Path to file with nodes to include.*/ - public static final String RM_NODES_INCLUDE_FILE_PATH = + public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = ""; + + /** Enable submission pre-processor.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED = + RM_PREFIX + "submission-preprocessor.enabled"; + public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED = + false; + + /** Path to file with hosts for the submission processor to handle.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH = + RM_PREFIX + "submission-preprocessor.file-path"; + public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH = + ""; + + /** Submission processor refresh interval.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = + RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms"; + public static final int DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = + 60000; /** Path to file with nodes to exclude.*/ public static final String RM_NODES_EXCLUDE_FILE_PATH = @@ -968,7 +986,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; - + /** URI for FileSystemRMStateStore */ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.state-store.uri"; @@ -1026,7 +1044,7 @@ public static boolean isAclEnabled(Configuration conf) { /** Default application type length */ public static final int APPLICATION_TYPE_LENGTH = 20; - + /** Default queue name */ public static final String DEFAULT_QUEUE_NAME = "default"; @@ -1039,7 +1057,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Default sizes of the runtime metric buckets in minutes. 
*/ - public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = + public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = "60,300,1440"; public static final String RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX @@ -1056,7 +1074,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX + "nm-tokens.master-key-rolling-interval-secs"; - + public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = 24 * 60 * 60; @@ -1135,7 +1153,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// // Node Manager Configs //////////////////////////////// - + /** Prefix for all node manager configs.*/ public static final String NM_PREFIX = "yarn.nodemanager."; @@ -1261,14 +1279,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_LOCALIZER_CACHE_TARGET_SIZE_MB = NM_PREFIX + "localizer.cache.target-size-mb"; public static final long DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB = 10 * 1024; - + /** Number of threads to handle localization requests.*/ public static final String NM_LOCALIZER_CLIENT_THREAD_COUNT = NM_PREFIX + "localizer.client.thread-count"; public static final int DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT = 5; - + /** Number of threads to use for localization fetching.*/ - public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = + public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = NM_PREFIX + "localizer.fetch.thread-count"; public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4; @@ -1323,7 +1341,7 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; - + /** Delegation Token renewer thread count */ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = RM_PREFIX + "delegation-token-renewer.thread-count"; @@ -1437,12 +1455,12 @@ public static boolean isAclEnabled(Configuration conf) { * Number of threads used in log cleanup. 
Only applicable if Log aggregation * is disabled */ - public static final String NM_LOG_DELETION_THREADS_COUNT = + public static final String NM_LOG_DELETION_THREADS_COUNT = NM_PREFIX + "log.deletion-threads-count"; public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4; /** Where to aggregate logs to.*/ - public static final String NM_REMOTE_APP_LOG_DIR = + public static final String NM_REMOTE_APP_LOG_DIR = NM_PREFIX + "remote-app-log-dir"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; @@ -1922,22 +1940,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0; /** Frequency of running node health script.*/ - public static final String NM_HEALTH_CHECK_INTERVAL_MS = + public static final String NM_HEALTH_CHECK_INTERVAL_MS = NM_PREFIX + "health-checker.interval-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000; - /** Health check script time out period.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + /** Health check script time out period.*/ + public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = NM_PREFIX + "health-checker.script.timeout-ms"; - public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; - + /** The health check script to run.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_PATH = + public static final String NM_HEALTH_CHECK_SCRIPT_PATH = NM_PREFIX + "health-checker.script.path"; - + /** The arguments to pass to the health check script.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = + public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = NM_PREFIX + "health-checker.script.opts"; /** The JVM options used on forking ContainerLocalizer process @@ -2187,30 +2205,30 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_NM_NONSECURE_MODE_LOCAL_USER = "nobody"; /** - * The allowed pattern for UNIX user names enforced by - * Linux-container-executor when used in nonsecure mode (use case for this + * The allowed pattern for UNIX user names enforced by + * Linux-container-executor when used in nonsecure mode (use case for this * is using cgroups). The default value is taken from /usr/sbin/adduser */ public static final String NM_NONSECURE_MODE_USER_PATTERN_KEY = NM_PREFIX + "linux-container-executor.nonsecure-mode.user-pattern"; - public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = + public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = "^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$"; /** The type of resource enforcement to use with the * linux container executor. 
*/ - public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = + public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = NM_PREFIX + "linux-container-executor.resources-handler.class"; - + /** The path the linux container executor should use for cgroups */ public static final String NM_LINUX_CONTAINER_CGROUPS_HIERARCHY = NM_PREFIX + "linux-container-executor.cgroups.hierarchy"; - + /** Whether the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT = NM_PREFIX + "linux-container-executor.cgroups.mount"; - + /** Where the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH = NM_PREFIX + "linux-container-executor.cgroups.mount-path"; @@ -2254,7 +2272,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Interval of time the linux container executor should try cleaning up - * cgroups entry when cleaning up a container. This is required due to what + * cgroups entry when cleaning up a container. This is required due to what * it seems a race condition because the SIGTERM/SIGKILL is asynch. */ public static final String NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT = @@ -3010,7 +3028,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = TIMELINE_SERVICE_PREFIX + "handler-thread-count"; public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10; - + /** The address of the timeline service web application.*/ public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS = @@ -3231,7 +3249,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String SHARED_CACHE_NESTED_LEVEL = SHARED_CACHE_PREFIX + "nested-level"; public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; - + // Shared Cache Manager Configs public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store."; @@ -3265,7 +3283,7 @@ public static boolean isAclEnabled(Configuration conf) { "0.0.0.0:" + DEFAULT_SCM_WEBAPP_PORT; // In-memory SCM store configuration - + public static final String IN_MEMORY_STORE_PREFIX = SCM_STORE_PREFIX + "in-memory."; @@ -3286,7 +3304,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String IN_MEMORY_INITIAL_DELAY_MINS = IN_MEMORY_STORE_PREFIX + "initial-delay-mins"; public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10; - + /** * The frequency at which the in-memory store checks to remove dead initial * applications. Specified in minutes. @@ -4076,7 +4094,7 @@ public static boolean areNodeLabelsEnabled( public YarnConfiguration() { super(); } - + public YarnConfiguration(Configuration conf) { super(conf); if (! 
(conf instanceof YarnConfiguration)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 1e54cd6b8d2..f108c04911f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -231,6 +231,7 @@ private Clock clock; private ReservationSystem reservationSystem; private ReservationInputValidator rValidator; + private SubmissionContextPreProcessor contextPreProcessor; private boolean filterAppsByUser = false; @@ -275,13 +276,13 @@ protected void serviceInit(Configuration conf) throws Exception { protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); - this.server = + this.server = rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, conf, this.rmDTSecretManager, - conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, + conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); - + this.server.addTerseExceptions(ApplicationNotFoundException.class, ApplicationAttemptNotFoundException.class, ContainerNotFoundException.class, @@ -289,7 +290,7 @@ protected void serviceStart() throws Exception { // Enable service authorization? if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { InputStream inputStream = this.rmContext.getConfigurationProvider() @@ -313,6 +314,13 @@ protected void serviceStart() throws Exception { this.timelineServiceV2Enabled = YarnConfiguration. timelineServiceV2Enabled(conf); + if (conf.getBoolean( + YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, + YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) { + this.contextPreProcessor = new SubmissionContextPreProcessor(); + this.contextPreProcessor.start(conf); + } + super.serviceStart(); } @@ -321,6 +329,9 @@ protected void serviceStop() throws Exception { if (this.server != null) { this.server.stop(); } + if (this.contextPreProcessor != null) { + this.contextPreProcessor.stop(); + } super.serviceStop(); } @@ -332,6 +343,11 @@ InetSocketAddress getBindAddress(Configuration conf) { YarnConfiguration.DEFAULT_RM_PORT); } + @VisibleForTesting + SubmissionContextPreProcessor getContextPreProcessor() { + return this.contextPreProcessor; + } + @Private public InetSocketAddress getBindAddress() { return clientBindAddress; @@ -371,11 +387,11 @@ public GetNewApplicationResponse getNewApplication( response.setApplicationId(getNewApplicationId()); // Pick up min/max resource from scheduler... response.setMaximumResourceCapability(scheduler - .getMaximumResourceCapability()); - + .getMaximumResourceCapability()); + return response; } - + /** * It gives response which includes application report if the application * present otherwise throws ApplicationNotFoundException. 
@@ -449,7 +465,7 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport( } return response; } - + @Override public GetApplicationAttemptsResponse getApplicationAttempts( GetApplicationAttemptsRequest request) throws YarnException, IOException { @@ -465,7 +481,7 @@ public GetApplicationAttemptsResponse getApplicationAttempts( if (allowAccess) { Map attempts = application .getAppAttempts(); - List listAttempts = + List listAttempts = new ArrayList(); Iterator> iter = attempts .entrySet().iterator(); @@ -480,10 +496,10 @@ public GetApplicationAttemptsResponse getApplicationAttempts( } return response; } - + /* * (non-Javadoc) - * + * * we're going to fix the issue of showing non-running containers of the * running application in YARN-1794 */ @@ -522,10 +538,10 @@ public GetContainerReportResponse getContainerReport( } return response; } - + /* * (non-Javadoc) - * + * * we're going to fix the issue of showing non-running containers of the * running application in YARN-1794" */ @@ -663,12 +679,17 @@ public SubmitApplicationResponse submitApplication( checkReservationACLs(submissionContext.getQueue(), AuditConstants .SUBMIT_RESERVATION_REQUEST, reservationId); + if (this.contextPreProcessor != null) { + this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(), + applicationId, submissionContext); + } + try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user); - LOG.info("Application with id " + applicationId.getId() + + LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId, callerContext, @@ -843,7 +864,7 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) final Map apps = rmContext.getRMApps(); Iterator appsIter = apps.values().iterator(); - + List reports = new ArrayList(); while (appsIter.hasNext() && reports.size() < limit) { RMApp application = appsIter.next(); @@ -944,7 +965,7 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) @Override public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) throws YarnException { - GetClusterNodesResponse response = + GetClusterNodesResponse response = recordFactory.newRecordInstance(GetClusterNodesResponse.class); EnumSet nodeStates = request.getNodeStates(); if (nodeStates == null || nodeStates.isEmpty()) { @@ -952,7 +973,7 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) } Collection nodes = RMServerUtils.queryRMNodes(rmContext, nodeStates); - + List nodeReports = new ArrayList(nodes.size()); for (RMNode nodeInfo : nodes) { nodeReports.add(createNodeReports(nodeInfo)); @@ -982,9 +1003,9 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) String.valueOf(request.getIncludeChildQueues())) .append(Keys.RECURSIVE, String.valueOf(request.getRecursive())); try { - QueueInfo queueInfo = - scheduler.getQueueInfo(request.getQueueName(), - request.getIncludeChildQueues(), + QueueInfo queueInfo = + scheduler.getQueueInfo(request.getQueueName(), + request.getIncludeChildQueues(), request.getRecursive()); List appReports = EMPTY_APPS_REPORT; if (request.getIncludeApplications()) { @@ -1002,7 +1023,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) appReports.add( rmApp.createAndGetApplicationReport( callerUGI.getUserName(), true)); - } + } } } 
queueInfo.setApplications(appReports); @@ -1016,12 +1037,12 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) AuditConstants.GET_QUEUE_INFO_REQUEST, "UNKNOWN", "ClientRMService", ioe.getMessage(), arguments); } - + return response; } private NodeReport createNodeReports(RMNode rmNode) { - SchedulerNodeReport schedulerNodeReport = + SchedulerNodeReport schedulerNodeReport = scheduler.getNodeReport(rmNode.getNodeID()); Resource used = BuilderUtils.newResource(0, 0); int numContainers = 0; @@ -1046,7 +1067,7 @@ private NodeReport createNodeReports(RMNode rmNode) { @Override public GetQueueUserAclsInfoResponse getQueueUserAcls( GetQueueUserAclsInfoRequest request) throws YarnException { - GetQueueUserAclsInfoResponse response = + GetQueueUserAclsInfoResponse response = recordFactory.newRecordInstance(GetQueueUserAclsInfoResponse.class); response.setUserAclsInfoList(scheduler.getQueueUserAclInfo()); return response; @@ -1073,7 +1094,7 @@ public GetDelegationTokenResponse getDelegationToken( realUser = new Text(ugi.getRealUser().getUserName()); } RMDelegationTokenIdentifier tokenIdentifier = - new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), + new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser); Token realRMDToken = new Token(tokenIdentifier, @@ -1099,7 +1120,7 @@ public RenewDelegationTokenResponse renewDelegationToken( throw new IOException( "Delegation Token can be renewed only with kerberos authentication"); } - + org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken(); Token token = new Token( protoToken.getIdentifier().array(), protoToken.getPassword().array(), @@ -1136,7 +1157,7 @@ public CancelDelegationTokenResponse cancelDelegationToken( throw RPCUtil.getRemoteException(e); } } - + @SuppressWarnings("unchecked") @Override public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @@ -1185,7 +1206,7 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( throw ex; } - RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId); return recordFactory .newRecordInstance(MoveApplicationAcrossQueuesResponse.class); @@ -1219,7 +1240,7 @@ private String getRenewerForToken(Token token) : user.getShortUserName(); } - void refreshServiceAcls(Configuration configuration, + void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) { this.server.refreshServiceAclWithLoadedConfiguration(configuration, policyProvider); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java new file mode 100644 index 00000000000..43dde217646 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/SubmissionContextPreProcessor.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SubmissionContextPreProcessor {
+
+  private static Logger LOG = LoggerFactory.getLogger(
+      SubmissionContextPreProcessor.class);
+  private static String DEFAULT_COMMANDS = "*";
+
+  interface ContextProcessor {
+    void process(String host, String value, ApplicationId applicationId,
+        ApplicationSubmissionContext submissionContext);
+  }
+
+  static class NodeLabelProcessor implements ContextProcessor {
+    @Override
+    public void process(String host, String value, ApplicationId applicationId,
+        ApplicationSubmissionContext submissionContext) {
+      submissionContext.setNodeLabelExpression(value);
+    }
+  }
+
+  static class QueueProcessor implements ContextProcessor {
+    @Override
+    public void process(String host, String value, ApplicationId applicationId,
+        ApplicationSubmissionContext submissionContext) {
+      submissionContext.setQueue(value);
+    }
+  }
+
+  static class TagAddProcessor implements ContextProcessor {
+    @Override
+    public void process(String host, String value, ApplicationId applicationId,
+        ApplicationSubmissionContext submissionContext) {
+      Set<String> applicationTags = submissionContext.getApplicationTags();
+      if (applicationTags == null) {
+        applicationTags = new HashSet<>();
+      } else {
+        applicationTags = new HashSet<>(applicationTags);
+      }
+      applicationTags.add(value);
+      submissionContext.setApplicationTags(applicationTags);
+    }
+  }
+
+  private static final int INITIAL_DELAY = 1000;
+
+  enum ContextProp {
+    // Node label Expression
+    NL(new NodeLabelProcessor()),
+    // Queue
+    Q(new QueueProcessor()),
+    // Tag Add
+    TA(new TagAddProcessor());
+
+    private ContextProcessor cp;
+    ContextProp(ContextProcessor cp) {
+      this.cp = cp;
+    }
+  }
+
+  private String hostsFilePath;
+  private volatile long lastModified = -1;
+  private volatile Map<String, Map<ContextProp, String>> hostCommands =
+      new HashMap<>();
+  private ScheduledExecutorService executorService;
+
+  public void start(Configuration conf) {
+    this.hostsFilePath =
+        conf.get(
+            YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
+            YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH);
+    int refreshPeriod =
+        conf.getInt(
+            YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS,
+            YarnConfiguration.
+                DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS);
+
+    LOG.info("Submission Context Preprocessor enabled: file=[{}], interval=[{}]",
+        this.hostsFilePath, refreshPeriod);
+
+    executorService = Executors.newSingleThreadScheduledExecutor();
+    Runnable refreshConf = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          refresh();
+        } catch (Exception ex) {
+          LOG.error("Error while refreshing Submission PreProcessor file [{}]", hostsFilePath, ex);
+        }
+      }
+    };
+    if (refreshPeriod > 0) {
+      executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY, refreshPeriod, TimeUnit.MILLISECONDS);
+    } else {
+      executorService.schedule(refreshConf, INITIAL_DELAY, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public void stop() {
+    if (this.executorService != null) {
+      this.executorService.shutdown();
+    }
+  }
+
+  public void preProcess(String host, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext) {
+    Map<ContextProp, String> cMap = hostCommands.get(host);
+
+    // Try regex match
+    if (cMap == null) {
+      for (String key : hostCommands.keySet()) {
+        if (key.equals(DEFAULT_COMMANDS)) {
+          continue;
+        }
+        try {
+          Pattern p = Pattern.compile(key);
+          Matcher m = p.matcher(host);
+          if (m.find()) {
+            cMap = hostCommands.get(key);
+          }
+        } catch (PatternSyntaxException exception) {
+          LOG.warn("Invalid regex pattern: " + key);
+        }
+      }
+    }
+    // Set to default value
+    if (cMap == null) {
+      cMap = hostCommands.get(DEFAULT_COMMANDS);
+    }
+    if (cMap != null) {
+      for (Map.Entry<ContextProp, String> entry : cMap.entrySet()) {
+        entry.getKey().cp.process(host, entry.getValue(),
+            applicationId, submissionContext);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void refresh() throws Exception {
+    if (null == hostsFilePath || hostsFilePath.isEmpty()) {
+      LOG.warn("Host list file path [{}] is empty or does not exist !!",
+          hostsFilePath);
+    } else {
+      File hostFile = new File(hostsFilePath);
+      if (!hostFile.exists() || !hostFile.isFile()) {
+        LOG.warn("Host list file [{}] does not exist or is not a file !!",
+            hostFile);
+      } else if (hostFile.lastModified() <= lastModified) {
+        LOG.debug("Host list file [{}] has not been modified from last refresh",
+            hostFile);
+      } else {
+        FileInputStream fileInputStream = new FileInputStream(hostFile);
+        BufferedReader reader = null;
+        Map<String, Map<ContextProp, String>> tempHostCommands = new HashMap<>();
+        try {
+          reader = new BufferedReader(new InputStreamReader(fileInputStream, Charsets.UTF_8));
+          String line;
+          while ((line = reader.readLine()) != null) {
+            // Lines should start with hostname and be followed with commands.
+            // Delimiter is any contiguous sequence of space or tab character.
+            // Commands are of the form:
+            //   <KEY>=<VALUE>
+            // where KEY can be 'NL', 'Q' or 'TA' (more can be added later)
+            // (TA stands for 'Tag Add')
+            // Sample lines:
+            // ...
+            // host1 NL=foo Q=b
+            // host2 Q=c NL=bar
+            // ...
+            String[] commands = line.split("[ \t\n\f\r]+");
+            if (commands != null && commands.length > 1) {
+              String host = commands[0].trim();
+              if (host.startsWith("#")) {
+                // All lines starting with # is a comment
+                continue;
+              }
+              Map<ContextProp, String> cMap = null;
+              for (int i = 1; i < commands.length; i++) {
+                String[] cSplit = commands[i].split("=");
+                if (cSplit == null || cSplit.length != 2) {
+                  LOG.error("No commands found for line [{}]", commands[i]);
+                  continue;
+                }
+                if (cMap == null) {
+                  cMap = new HashMap<>();
+                }
+                cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]);
+              }
+              if (cMap != null && cMap.size() > 0) {
+                tempHostCommands.put(host, cMap);
+                LOG.info("Following commands registered for host[{}] : {}",
+                    host, cMap);
+              }
+            }
+          }
+          lastModified = hostFile.lastModified();
+        } catch (Exception ex) {
+          // Do not commit the new map if we have an Exception..
+          tempHostCommands = null;
+          throw ex;
+        } finally {
+          if (tempHostCommands != null && tempHostCommands.size() > 0) {
+            hostCommands = tempHostCommands;
+          }
+          if (reader != null) {
+            reader.close();
+          }
+          fileInputStream.close();
+        }
+      }
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 3eed0be25ba..87a521769d9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -31,11 +31,17 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
 import java.security.AccessControlException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -51,11 +57,19 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
+
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.htrace.core.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@@ -148,6 +162,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import
org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -165,6 +180,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -176,6 +192,8 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.BeforeClass; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -194,11 +212,34 @@ .getRecordFactory(null); private String appType = "MockApp"; - + private static RMDelegationTokenSecretManager dtsm; + private final static String QUEUE_1 = "Q-1"; private final static String QUEUE_2 = "Q-2"; private File resourceTypesFile = null; + private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT"; + static { + KerberosName.setRules(kerberosRule); + } + + @BeforeClass + public static void setupSecretManager() throws IOException { + RMContext rmContext = mock(RMContext.class); + YarnConfiguration conf = new YarnConfiguration(); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + when(rmContext.getResourceManager()).thenReturn(new MockRM(conf)); + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); + dtsm.startThreads(); + } + + @AfterClass + public static void teardownSecretManager() { + if (dtsm != null) { + dtsm.stopThreads(); + } + } + @Test public void testGetDecommissioningClusterNodes() throws Exception { MockRM rm = new MockRM() { @@ -263,7 +304,7 @@ protected ClientRMService createClientRMService() { labelsMgr.replaceLabelsOnNode(map); rm.sendNodeStarted(node); node.nodeHeartbeat(true); - + // Add and lose a node with label = y MockNM lostNode = rm.registerNode("host2:1235", 1024); rm.sendNodeStarted(lostNode); @@ -288,7 +329,7 @@ protected ClientRMService createClientRMService() { Assert.assertEquals(1, nodeReports.size()); Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY, nodeReports.get(0).getNodeState()); - + // Check node's label = x Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); @@ -302,34 +343,34 @@ protected ClientRMService createClientRMService() { nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals("Unhealthy nodes should not show up by default", 0, nodeReports.size()); - + // Change label of host1 to y map = new HashMap>(); map.put(node.getNodeId(), ImmutableSet.of("y")); labelsMgr.replaceLabelsOnNode(map); - + // Now query for UNHEALTHY nodes request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY)); nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(1, 
nodeReports.size()); Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY, nodeReports.get(0).getNodeState()); - + Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); Assert.assertNull(nodeReports.get(0).getNodeUpdateType()); - + // Remove labels of host1 map = new HashMap>(); map.put(node.getNodeId(), ImmutableSet.of("y")); labelsMgr.removeLabelsFromNode(map); - + // Query all states should return all nodes rm.registerNode("host3:1236", 1024); request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(3, nodeReports.size()); - + // All host1-3's label should be empty (instead of null) for (NodeReport report : nodeReports) { Assert.assertTrue(report.getNodeLabels() != null @@ -341,7 +382,7 @@ protected ClientRMService createClientRMService() { rpc.stopProxy(client, conf); rm.close(); } - + @Test public void testNonExistingApplicationReport() throws YarnException { RMContext rmContext = mock(RMContext.class); @@ -384,10 +425,10 @@ public void testGetApplicationReport() throws Exception { GetApplicationReportRequest request = recordFactory .newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(appId1); - GetApplicationReportResponse response = + GetApplicationReportResponse response = rmService.getApplicationReport(request); ApplicationReport report = response.getApplicationReport(); - ApplicationResourceUsageReport usageReport = + ApplicationResourceUsageReport usageReport = report.getApplicationResourceUsageReport(); Assert.assertEquals(10, usageReport.getMemorySeconds()); Assert.assertEquals(3, usageReport.getVcoreSeconds()); @@ -434,7 +475,7 @@ public void testGetApplicationReport() throws Exception { rmService.close(); } } - + @Test public void testGetApplicationAttemptReport() throws YarnException, IOException { @@ -468,7 +509,7 @@ public void testGetApplicationResourceUsageReportDummy() throws YarnException, public void handle(Event event) { } }); - ApplicationSubmissionContext asContext = + ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); YarnConfiguration config = new YarnConfiguration(); RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, @@ -643,7 +684,7 @@ public void testForceKillApplication() throws Exception { assertEquals("Incorrect number of apps in the RM", 2, rmService.getApplications(getRequest).getApplicationList().size()); } - + @Test (expected = ApplicationNotFoundException.class) public void testMoveAbsentApplication() throws YarnException { RMContext rmContext = mock(RMContext.class); @@ -975,7 +1016,163 @@ public void testGetQueueInfo() throws Exception { Assert.assertEquals(0, applications1.size()); } - + @Test (timeout = 30000) + @SuppressWarnings ("rawtypes") + public void testAppSubmitWithSubmissionPreProcessor() throws Exception { + ResourceScheduler scheduler = mockResourceScheduler(); + RMContext rmContext = mock(RMContext.class); + mockRMContext(scheduler, rmContext); + YarnConfiguration yConf = new YarnConfiguration(); + yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, true); + yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + // Override the YARN configuration. 
+ when(rmContext.getYarnConfiguration()).thenReturn(yConf); + RMStateStore stateStore = mock(RMStateStore.class); + when(rmContext.getStateStore()).thenReturn(stateStore); + RMAppManager appManager = new RMAppManager(rmContext, scheduler, + null, mock(ApplicationACLsManager.class), new Configuration()); + when(rmContext.getDispatcher().getEventHandler()).thenReturn( + new EventHandler() { + public void handle(Event event) {} + }); + ApplicationId appId1 = getApplicationId(100); + + ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); + when( + mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), + ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); + + QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); + + when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), + any(QueueACL.class), any(RMApp.class), any(String.class), + any())) + .thenReturn(true); + + ClientRMService rmService = + new ClientRMService(rmContext, scheduler, appManager, + mockAclsManager, mockQueueACLsManager, null); + + + File rulesFile = File.createTempFile("submission_rules", ".tmp"); + rulesFile.deleteOnExit(); + rulesFile.createNewFile(); + + yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH, rulesFile.getAbsolutePath()); + rmService.serviceInit(yConf); + rmService.serviceStart(); + + BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile)); + writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:holdem"); + writer.newLine(); + writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:faro"); + writer.newLine(); + writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); + writer.newLine(); + writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); + writer.newLine(); + writer.write("* TA=cluster:other Q=default NL=barfoo"); + writer.newLine(); + writer.flush(); + writer.close(); + rmService.getContextPreProcessor().refresh(); + + setupCurrentCall("host.cluster1.com"); + SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( + appId1, null, null); + try { + rmService.submitApplication(submitRequest1); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app1 = rmContext.getRMApps().get(appId1); + Assert.assertNotNull("app doesn't exist", app1); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName()); + Assert.assertTrue("custom tag not present", + app1.getApplicationTags().contains("cluster:holdem")); + Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue()); + Assert.assertEquals("app node label doesn't match", + "foo", app1.getApplicationSubmissionContext().getNodeLabelExpression()); + + + setupCurrentCall("host.cluster2.com"); + ApplicationId appId2 = getApplicationId(101); + SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( + appId2, null, null); + submitRequest2.getApplicationSubmissionContext().setApplicationType( + "matchType"); + Set aTags = new HashSet(); + aTags.add("mytag:foo"); + submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags); + try { + rmService.submitApplication(submitRequest2); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app2 = rmContext.getRMApps().get(appId2); + Assert.assertNotNull("app doesn't exist", app2); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName()); + Assert.assertTrue("client tag not present", + 
app2.getApplicationTags().contains("mytag:foo")); + Assert.assertTrue("custom tag not present", + app2.getApplicationTags().contains("cluster:faro")); + Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue()); + Assert.assertEquals("app node label doesn't match", + "zuess", app2.getApplicationSubmissionContext().getNodeLabelExpression()); + + // Test Default commands + setupCurrentCall("host2.cluster3.com"); + ApplicationId appId3 = getApplicationId(102); + SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest( + appId3, null, null); + submitRequest3.getApplicationSubmissionContext().setApplicationType( + "matchType"); + submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags); + try { + rmService.submitApplication(submitRequest3); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app3 = rmContext.getRMApps().get(appId3); + Assert.assertNotNull("app doesn't exist", app3); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName()); + Assert.assertTrue("client tag not present", + app3.getApplicationTags().contains("mytag:foo")); + Assert.assertTrue("custom tag not present", + app3.getApplicationTags().contains("cluster:other")); + Assert.assertEquals("app queue doesn't match", "default", app3.getQueue()); + Assert.assertEquals("app node label doesn't match", + "barfoo", app3.getApplicationSubmissionContext().getNodeLabelExpression()); + + // Test regex + setupCurrentCall("host.cluster100.com"); + ApplicationId appId4 = getApplicationId(103); + SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest( + appId4, null, null); + try { + rmService.submitApplication(submitRequest4); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app4 = rmContext.getRMApps().get(appId4); + Assert.assertTrue("custom tag not present", + app4.getApplicationTags().contains("cluster:reg")); + Assert.assertEquals("app node label doesn't match", + "reg", app4.getApplicationSubmissionContext().getNodeLabelExpression()); + + } + + private void setupCurrentCall(String hostName) throws UnknownHostException { + Server.Call mockCall = mock(Server.Call.class); + when(mockCall.getHostInetAddress()).thenReturn( + InetAddress.getByAddress(hostName, + new byte[]{123, 123, 123, 123})); + Server.getCurCall().set(mockCall); + } + @Test (timeout = 30000) @SuppressWarnings ("rawtypes") public void testAppSubmit() throws Exception { @@ -1034,6 +1231,9 @@ public void handle(Event event) {} appId2, name, queue); submitRequest2.getApplicationSubmissionContext().setApplicationType( "matchType"); + Set aTags = new HashSet(); + aTags.add("mytag:foo"); + submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags); try { rmService.submitApplication(submitRequest2); } catch (YarnException e) { @@ -1123,7 +1323,7 @@ public void handle(Event event) {} ApplicationId[] appIds = {getApplicationId(101), getApplicationId(102), getApplicationId(103)}; List tags = Arrays.asList("Tag1", "Tag2", "Tag3"); - + long[] submitTimeMillis = new long[3]; // Submit applications for (int i = 0; i < appIds.length; i++) { @@ -1150,23 +1350,23 @@ public void handle(Event event) {} request.setLimit(1L); assertEquals("Failed to limit applications", 1, rmService.getApplications(request).getApplicationList().size()); - + // Check start range request = GetApplicationsRequest.newInstance(); request.setStartRange(submitTimeMillis[0] + 1, System.currentTimeMillis()); - + // 2 applications 
are submitted after first timeMills - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 2, rmService.getApplications(request).getApplicationList().size()); - + // 1 application is submitted after the second timeMills request.setStartRange(submitTimeMillis[1] + 1, System.currentTimeMillis()); - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 1, rmService.getApplications(request).getApplicationList().size()); - + // no application is submitted after the third timeMills request.setStartRange(submitTimeMillis[2] + 1, System.currentTimeMillis()); - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 0, rmService.getApplications(request).getApplicationList().size()); // Check queue @@ -1238,7 +1438,7 @@ public void handle(Event event) {} assertEquals("Incorrect number of applications for the scope", 3, rmService.getApplications(request).getApplicationList().size()); } - + @Test(timeout=4000) public void testConcurrentAppSubmit() throws IOException, InterruptedException, BrokenBarrierException, @@ -1257,7 +1457,7 @@ public void testConcurrentAppSubmit() appId1, null, null); final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( appId2, null, null); - + final CyclicBarrier startBarrier = new CyclicBarrier(2); final CyclicBarrier endBarrier = new CyclicBarrier(2); @@ -1299,7 +1499,7 @@ public void run() { } }; t.start(); - + // submit another app, so go through while the first app is blocked startBarrier.await(); rmService.submitApplication(submitRequest2); @@ -1385,7 +1585,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) private ConcurrentHashMap getRMApps( RMContext rmContext, YarnScheduler yarnScheduler) { - ConcurrentHashMap apps = + ConcurrentHashMap apps = new ConcurrentHashMap(); ApplicationId applicationId1 = getApplicationId(1); ApplicationId applicationId2 = getApplicationId(2); @@ -1399,7 +1599,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) config, "testqueue", 40, 5,"high-mem","high-mem")); return apps; } - + private List getSchedulerApps( Map apps) { List schedApps = new ArrayList(); @@ -1412,7 +1612,7 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) private static ApplicationId getApplicationId(int id) { return ApplicationId.newInstance(123456, id); } - + private static ApplicationAttemptId getApplicationAttemptId(int id) { return ApplicationAttemptId.newInstance(getApplicationId(id), 1); } @@ -1437,7 +1637,7 @@ public ApplicationReport createAndGetApplicationReport( String clientUserName, boolean allowAccess) { ApplicationReport report = super.createAndGetApplicationReport( clientUserName, allowAccess); - ApplicationResourceUsageReport usageReport = + ApplicationResourceUsageReport usageReport = report.getApplicationResourceUsageReport(); usageReport.setMemorySeconds(memorySeconds); usageReport.setVcoreSeconds(vcoreSeconds); @@ -1457,7 +1657,7 @@ public ApplicationReport createAndGetApplicationReport( RMContainerImpl containerimpl = spy(new RMContainerImpl(container, SchedulerRequestKey.extractFrom(container), attemptId, null, "", rmContext)); - Map attempts = + Map attempts = new HashMap(); attempts.put(attemptId, rmAppAttemptImpl); when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
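
Usage sketch (illustrative, not part of the patch): the snippet below drives the SubmissionContextPreProcessor added above against a hand-written rules file, roughly the way TestClientRMService#testAppSubmitWithSubmissionPreProcessor does through ClientRMService. The class name SubmissionPreProcessorSketch, the host names, labels, queues, tags, the temp-file path and the expected outputs in the comments are made-up examples; only the classes and configuration keys come from this patch. Inside the RM, ClientRMService#submitApplication calls preProcess() with the submitter's remote host name, so a standalone driver like this only approximates that path.

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.SubmissionContextPreProcessor;
import org.apache.hadoop.yarn.util.Records;

public class SubmissionPreProcessorSketch {
  public static void main(String[] args) throws Exception {
    // One rule per line: an exact host, a regex, or "*" as the fallback,
    // followed by KEY=VALUE commands (NL = node label, Q = queue, TA = add tag).
    File rules = File.createTempFile("submission-rules", ".txt");
    rules.deleteOnExit();
    Files.write(rules.toPath(), (
        "# comment lines start with '#'\n"
        + "gw1.example.com NL=gpu Q=research TA=cluster:gw1\n"
        + "gw.*\\.example\\.com Q=gateways\n"
        + "* Q=default\n").getBytes(StandardCharsets.UTF_8));

    // The enabled flag is what makes ClientRMService create the preprocessor;
    // here we construct it directly, so only the file path really matters.
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, true);
    conf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
        rules.getAbsolutePath());

    SubmissionContextPreProcessor preProcessor = new SubmissionContextPreProcessor();
    preProcessor.start(conf);
    Thread.sleep(2000); // wait out the 1s INITIAL_DELAY so refresh() has run once

    // Mimic what submitApplication() does for a client on gw1.example.com:
    // the exact-host rule rewrites queue, node label and tags before the
    // context would reach RMAppManager.
    ApplicationSubmissionContext context =
        Records.newRecord(ApplicationSubmissionContext.class);
    preProcessor.preProcess("gw1.example.com",
        ApplicationId.newInstance(System.currentTimeMillis(), 1), context);

    System.out.println(context.getQueue());               // research
    System.out.println(context.getNodeLabelExpression()); // gpu
    System.out.println(context.getApplicationTags());     // [cluster:gw1]

    preProcessor.stop();
  }
}

The Thread.sleep only exists so this standalone sketch sees a loaded rules file; in the RM the file is picked up shortly after ClientRMService starts and re-read on the configured refresh interval, so no caller ever waits on it.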