From a950e9bdb02dfcaa28304a26985e3a3799ab5fdb Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Tue, 27 Mar 2018 12:29:25 +0530 Subject: [PATCH] YARN-8048 --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 8 + .../yarn/conf/TestYarnConfigurationFields.java | 3 + .../hadoop-yarn-services-api/pom.xml | 4 + .../yarn/service/client/SystemServiceManager.java | 312 +++++++++++++++++++++ .../yarn/service/client/TestSystemServiceImpl.java | 177 ++++++++++++ .../test/resources/users/user1/example-app1.json | 16 ++ .../test/resources/users/user1/example-app2.json | 16 ++ .../test/resources/users/user2/example-app1.json | 16 ++ .../test/resources/users/user2/example-app2.json | 16 ++ .../hadoop/yarn/service/conf/YarnServiceConf.java | 5 + .../hadoop/yarn/server/service/ServiceManager.java | 24 ++ .../hadoop/yarn/server/service/package-info.java | 27 ++ .../server/resourcemanager/ResourceManager.java | 30 +- 13 files changed, 653 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 04b28985c71..8e99d245728 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -343,6 +343,14 @@ private static void addDeprecatedKeys() { public static final String YARN_API_SERVICES_ENABLE = "yarn." + "webapp.api-service.enable"; + @Private + public static final String YARN_API_SYSTEM_SERVICES_CLASS = + "yarn.service.system-service-manager.class"; + + @Private + public static final String DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS = + "org.apache.hadoop.yarn.service.client.SystemServiceManager"; + public static final String RM_RESOURCE_TRACKER_ADDRESS = RM_PREFIX + "resource-tracker.address"; public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 9fe4f884899..428bb9dbb8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -147,6 +147,9 @@ public void initializeMemberVariables() { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_SOURCE_FILE); + configurationPropsToSkipCompare + .add(YarnConfiguration.YARN_API_SYSTEM_SERVICES_CLASS); + // Ignore NodeManager "work in progress" variables configurationPrefixToSkipCompare .add(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml index 7fe2ef6fc10..49336873603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml @@ -96,6 +96,10 @@ org.apache.hadoop + hadoop-yarn-server-common + + + org.apache.hadoop hadoop-common diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java new file mode 100644 index 00000000000..1765991f0d9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.service.ServiceManager; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; + +/** + * ServiceManager implementation. + */ +public class SystemServiceManager extends AbstractService implements + ServiceManager { + + private static final Logger LOG = LoggerFactory + .getLogger(SystemServiceManager.class); + + private static final FsPermission SYSTEM_SERVICE_DIR_PERMISSION = + new FsPermission((short) 01777); + + private static final Joiner JOINER = Joiner.on(":"); + private FileSystem fs; + private Path systemServiceDir; + private AtomicBoolean stopExecutors = new AtomicBoolean(false); + private Map> userServices =new HashMap<>(); + @VisibleForTesting + private Map ignoredUserServices =new HashMap<>(); + private UserGroupInformation loginUGI; + private Thread serviceLaucher; + + + public SystemServiceManager() { + super(SystemServiceManager.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + systemServiceDir = new Path( + conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY, + YarnServiceConf.DEFAULT_YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY)); + LOG.info("System Service Directory is configured to {}", systemServiceDir); + fs = systemServiceDir.getFileSystem(conf); + + this.loginUGI = UserGroupInformation.isSecurityEnabled() ? + UserGroupInformation.getLoginUser() : + UserGroupInformation.getCurrentUser(); + LOG.info("UserGroupInformation initialized to {}", loginUGI); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (!fs.exists(systemServiceDir)) { + fs.mkdirs(systemServiceDir); + fs.setPermission(systemServiceDir, SYSTEM_SERVICE_DIR_PERMISSION); + } + // Create a thread and submit services in background otherwise it + // block RM switch time. + serviceLaucher = new Thread(createRunnable()); + serviceLaucher.setName("System service launcher"); + serviceLaucher.start(); + } + + private Runnable createRunnable() { + return new Runnable() { + @Override + public void run() { + try { + scanSystemDirectoryForUsers(); + launchUserService(); + } catch (Exception e) { + LOG.error("Error while launching services.", e); + } + } + }; + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping {}", getName()); + stopExecutors.set(true); + if (serviceLaucher != null) { + serviceLaucher.interrupt(); + } + } + + void launchUserService() { + for (Map.Entry> entry : userServices.entrySet()) { + String user = entry.getKey(); + Set services = entry.getValue(); + if (services.isEmpty()) { + continue; + } + ServiceClient serviceClient = null; + try { + UserGroupInformation userUgi = getProxyUser(user); + serviceClient = createServiceClient(userUgi); + for (Service service : services) { + LOG.info("POST: createService = {} user = {}", service, userUgi); + try { + launchServices(userUgi, serviceClient, service); + } catch (IOException | UndeclaredThrowableException e) { + String message = + "Failed to create service " + service.getName() + ": {}"; + LOG.error(message, e); + } + } + } catch (Exception e) { + LOG.error("Error while submitting services for user " + user, e); + } finally { + if (serviceClient != null) { + try { + serviceClient.close(); + } catch (IOException e) { + LOG.warn("Error while closing serviceClient for user {}", user); + } + } + } + } + } + + private ServiceClient createServiceClient(UserGroupInformation userUgi) + throws IOException, InterruptedException { + ServiceClient serviceClient = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public ServiceClient run() + throws IOException, YarnException { + ServiceClient sc = getServiceClient(); + sc.init(getConfig()); + sc.start(); + return sc; + } + }); + return serviceClient; + } + + private void launchServices(UserGroupInformation userUgi, + ServiceClient serviceClient, Service service) + throws IOException, InterruptedException { + if (service.getState() == ServiceState.STOPPED) { + + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException, YarnException { + serviceClient.actionBuild(service); + return null; + } + }); + LOG.info("Service {} version {} saved.", service.getName(), + service.getVersion()); + } else { + ApplicationId applicationId = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationId run() + throws IOException, YarnException { + ApplicationId applicationId = serviceClient.actionCreate(service); + return applicationId; + } + }); + LOG.info("Service {} submitted with Application ID: {}", + service.getName(), applicationId); + } + } + + ServiceClient getServiceClient() { + return new ServiceClient(); + } + + private UserGroupInformation getProxyUser(String user) { + UserGroupInformation ugi; + if (UserGroupInformation.isSecurityEnabled()) { + ugi = UserGroupInformation.createProxyUser(user, loginUGI); + } else { + ugi = UserGroupInformation.createRemoteUser(user); + } + return ugi; + } + + void scanSystemDirectoryForUsers() throws IOException { + scanForUserServiceDefinition(systemServiceDir, false); + } + + // Files are under systemServiceDir/. Scan for 2 levels + // 1st level for users + // 2nd level for service definitions under user + void scanForUserServiceDefinition(Path dir, boolean isScanningUserDir) + throws IOException { + RemoteIterator iter = list(dir); + while (iter.hasNext()) { + FileStatus stat = iter.next(); + if (!isScanningUserDir) { + if (stat.isDirectory()) { + String userName = stat.getPath().getName(); + LOG.info("Scanning service definitions for user {} ", userName); + scanForUserServiceDefinition(stat.getPath(), true); + } else { + LOG.info( + "Service definition {} doesn't belong to any user. Ignoring.. ", + stat.getPath().getName()); + } + } else { + String userName=dir.getName(); + Service service = getServiceDefinition(stat.getPath()); + if (service != null) { + Set services = userServices.get(userName); + if (services == null) { + services = new HashSet<>(); + userServices.put(userName, services); + } + if (!services.add(service)) { + int count = ignoredUserServices.containsKey(userName) ? + ignoredUserServices.get(userName) : 0; + ignoredUserServices.put(userName, count + 1); + LOG.warn( + "Ignoring service {} for the user {} as it is already present.", + service.getName(), dir.getName()); + } + LOG.info("Adding service {} for the user {}", service.getName(), + userName); + } + } + } + } + + private Service getServiceDefinition(Path filePath) { + Service service = null; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Loading service definition from FS: " + filePath); + } + service = jsonSerDeser.load(fs, filePath); + } catch (IOException e) { + LOG.info("Error while loading service definition from FS: {}", e); + } + return service; + } + + + private RemoteIterator list(Path path) throws IOException { + return new StoppableRemoteIterator(fs.listStatusIterator(path)); + } + + @VisibleForTesting + Map getIgnoredUserServices() { + return ignoredUserServices; + } + + private class StoppableRemoteIterator implements RemoteIterator { + private final RemoteIterator remote; + + StoppableRemoteIterator(RemoteIterator remote) { + this.remote = remote; + } + + @Override + public boolean hasNext() throws IOException { + return !stopExecutors.get() && remote.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return remote.next(); + } + } + + @VisibleForTesting + Map> getUserServices() { + return userServices; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java new file mode 100644 index 00000000000..b043dae948e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * Test class for system service manager. + */ +public class TestSystemServiceImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSystemServiceImpl.class); + private SystemServiceManager systemService; + private Configuration conf; + private String resourcePath = "users"; + private String address = "localhost:9191"; + + private String[] users = new String[] {"user1", "user2"}; + private static Map> loadedServices = new HashMap<>(); + private static Map> submittedServices = new HashMap<>(); + + @Before + public void setup() { + File file = new File( + getClass().getClassLoader().getResource(resourcePath).getFile()); + conf = new Configuration(); + conf.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY, + file.getAbsolutePath()); + systemService = new SystemServiceManager() { + @Override + ServiceClient getServiceClient() { + return new TestServiceClient(); + } + }; + systemService.init(conf); // do not call explicit start + + constructUserService(users[0], "example-app1"); + constructUserService(users[1], "example-app1", "example-app2"); + } + + @After + public void teadDown() { + systemService.stop(); + } + + @Test + public void testSystemServiceSubmission() throws Exception { + systemService.scanSystemDirectoryForUsers(); + Map> userServices = systemService.getUserServices(); + Assert.assertEquals(loadedServices.size(), userServices.size()); + verifyUserServices(userServices); + /* verify for ignored sevices count */ + Map ignoredUserServices = + systemService.getIgnoredUserServices(); + Assert.assertEquals(1, ignoredUserServices.size()); + Assert.assertTrue("User user1 doesn't exist.", + ignoredUserServices.containsKey(users[0])); + int count = ignoredUserServices.get(users[0]); + Assert.assertEquals(1, count); + + systemService.launchUserService(); + verifyUserServices(); + + // 2nd time launch service to handle if service exist scenario + systemService.launchUserService(); + verifyUserServices(); + } + + private void verifyUserServices(Map> userServices) { + for (String user : users) { + Set services = userServices.get(user); + Set serviceNames = loadedServices.get(user); + Assert.assertEquals(serviceNames.size(), services.size()); + Iterator iterator = services.iterator(); + while (iterator.hasNext()) { + Service next = iterator.next(); + Assert.assertTrue("Service name doesn't exist in expected userService " + + serviceNames, serviceNames.contains(next.getName())); + } + } + } + + public void constructUserService(String user, String... serviceNames) { + Set service = loadedServices.get(user); + if (service == null) { + service = new HashSet<>(); + for (String serviceName : serviceNames) { + service.add(serviceName); + } + loadedServices.put(user, service); + } + } + + class TestServiceClient extends ServiceClient { + @Override + protected void serviceStart() throws Exception { + // do nothing + } + + @Override + protected void serviceStop() throws Exception { + // do nothing + } + + @Override + protected void serviceInit(Configuration configuration) + throws Exception { + // do nothing + } + + @Override + public ApplicationId actionCreate(Service service) + throws YarnException, IOException { + String userName = + UserGroupInformation.getCurrentUser().getShortUserName(); + Set services = submittedServices.get(userName); + if (services == null) { + services = new HashSet<>(); + submittedServices.put(userName, services); + } + if (services.contains(service.getName())) { + String message = "Failed to create service " + service.getName() + + ", because it already exists."; + throw new YarnException(message); + } + services.add(service.getName()); + return ApplicationId.newInstance(System.currentTimeMillis(), 1); + } + } + + private void verifyUserServices() { + Assert.assertEquals(loadedServices.size(), submittedServices.size()); + for (Map.Entry> entry : submittedServices.entrySet()) { + String user = entry.getKey(); + Set serviceSet = entry.getValue(); + Assert.assertTrue(loadedServices.containsKey(user)); + Set services = loadedServices.get(user); + Assert.assertEquals(services.size(), serviceSet.size()); + Assert.assertTrue(services.containsAll(serviceSet)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json new file mode 100644 index 00000000000..d8fd1d12fca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json @@ -0,0 +1,16 @@ +{ + "name": "example-app2", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java index b9a759438e0..edf18a1dba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -50,6 +50,11 @@ public static final String ROLLING_LOG_INCLUSION_PATTERN = "yarn.service.rolling-log.include-pattern"; public static final String ROLLING_LOG_EXCLUSION_PATTERN = "yarn.service.rolling-log.exclude-pattern"; + public static final String YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY = + YARN_SERVICE_PREFIX + "system-service.directory"; + + public static final String DEFAULT_YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY = + "/tmp/yarn-system-service"; /** * The yarn service base path: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java new file mode 100644 index 00000000000..301efd54a26 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.service; + +/** + * Marker interface for starting services from RM. + */ +public interface ServiceManager { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java new file mode 100644 index 00000000000..c448bab134d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.service contains service related + * classes. + */ +@InterfaceAudience.Private @InterfaceStability.Unstable + +package org.apache.hadoop.yarn.server.service; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 38da7f5e5c3..7efa6f0e08f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.service.ServiceManager; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -317,7 +318,7 @@ protected void serviceInit(Configuration conf) throws Exception { } rmContext.setYarnConfiguration(conf); - + createAndInitActiveServices(false); webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, @@ -493,6 +494,27 @@ protected ReservationSystem createReservationSystem() { } } + protected ServiceManager createServiceManager() { + String schedulerClassName = + conf.get(YarnConfiguration.YARN_API_SYSTEM_SERVICES_CLASS, + YarnConfiguration.DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS); + LOG.info("Using ServiceManager: " + schedulerClassName); + try { + Class schedulerClazz = Class.forName(schedulerClassName); + if (ServiceManager.class.isAssignableFrom(schedulerClazz)) { + return (ServiceManager) ReflectionUtils + .newInstance(schedulerClazz, this.conf); + } else { + throw new YarnRuntimeException( + "Class: " + schedulerClassName + " not instance of " + + ServiceManager.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ServiceManager: " + schedulerClassName, e); + } + } + protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(this.rmContext); } @@ -796,6 +818,12 @@ protected void serviceInit(Configuration configuration) throws Exception { new RMNMInfo(rmContext, scheduler); + if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, + false)) { + ServiceManager serviceManager = createServiceManager(); + addIfService(serviceManager); + } + super.serviceInit(conf); } -- 2.13.6 (Apple Git-96)