From 9e642d6196be98f2b60bba607dfbc36aad33df1c Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Wed, 21 Mar 2018 23:13:08 +0530 Subject: [PATCH] YARN-8048 --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 8 + .../yarn/conf/TestYarnConfigurationFields.java | 3 + .../hadoop-yarn-services-api/pom.xml | 4 + .../yarn/service/client/ApiServiceClient.java | 100 +------- .../yarn/service/client/ServiceConnector.java | 202 ++++++++++++++++ .../yarn/service/client/ServiceWebClient.java | 151 ++++++++++++ .../yarn/service/client/SystemServiceManager.java | 257 +++++++++++++++++++++ .../hadoop/yarn/service/utils/ApiServiceUtils.java | 120 ++++++++++ .../hadoop/yarn/service/utils/package-info.java | 27 +++ .../yarn/service/webapp/ApiServerWebApp.java | 9 + .../yarn/service/client/TestSystemServiceImpl.java | 190 +++++++++++++++ .../test/resources/users/user1/example-app1.json | 16 ++ .../test/resources/users/user1/example-app2.json | 16 ++ .../test/resources/users/user2/example-app1.json | 16 ++ .../test/resources/users/user2/example-app2.json | 16 ++ .../hadoop/yarn/service/conf/YarnServiceConf.java | 5 + .../hadoop/yarn/server/service/ServiceManager.java | 25 ++ .../hadoop/yarn/server/service/package-info.java | 27 +++ .../server/resourcemanager/ResourceManager.java | 33 ++- 19 files changed, 1132 insertions(+), 93 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceConnector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceWebClient.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/ApiServiceUtils.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 04b28985c71..8e99d245728 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -343,6 +343,14 @@ private static void addDeprecatedKeys() { public static final String YARN_API_SERVICES_ENABLE = "yarn." + "webapp.api-service.enable"; + @Private + public static final String YARN_API_SYSTEM_SERVICES_CLASS = + "yarn.service.system-service-manager.class"; + + @Private + public static final String DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS = + "org.apache.hadoop.yarn.service.client.SystemServiceManager"; + public static final String RM_RESOURCE_TRACKER_ADDRESS = RM_PREFIX + "resource-tracker.address"; public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 9fe4f884899..428bb9dbb8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -147,6 +147,9 @@ public void initializeMemberVariables() { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_SOURCE_FILE); + configurationPropsToSkipCompare + .add(YarnConfiguration.YARN_API_SYSTEM_SERVICES_CLASS); + // Ignore NodeManager "work in progress" variables configurationPrefixToSkipCompare .add(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml index 7fe2ef6fc10..49336873603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml @@ -96,6 +96,10 @@ org.apache.hadoop + hadoop-yarn-server-common + + + org.apache.hadoop hadoop-common diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 49702e3455c..6a8a13ac27f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -30,28 +30,24 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; -import org.apache.hadoop.yarn.service.api.records.ServiceStatus; +import org.apache.hadoop.yarn.service.utils.ApiServiceUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; -import org.apache.hadoop.yarn.util.RMHAUtils; import org.eclipse.jetty.util.UrlEncoded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource.Builder; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; @@ -73,56 +69,7 @@ super.serviceInit(configuration); } - /** - * Calculate Resource Manager address base on working REST API. - */ - private String getRMWebAddress() { - Configuration conf = getConfig(); - String scheme = "http://"; - String path = "/app/v1/services/version"; - String rmAddress = conf - .get("yarn.resourcemanager.webapp.address"); - if (YarnConfiguration.useHttps(conf)) { - scheme = "https://"; - rmAddress = conf - .get("yarn.resourcemanager.webapp.https.address"); - } - boolean useKerberos = UserGroupInformation.isSecurityEnabled(); - List rmServers = RMHAUtils - .getRMHAWebappAddresses(new YarnConfiguration(conf)); - for (String host : rmServers) { - try { - Client client = Client.create(); - StringBuilder sb = new StringBuilder(); - sb.append(scheme); - sb.append(host); - sb.append(path); - if (!useKerberos) { - try { - String username = UserGroupInformation.getCurrentUser().getShortUserName(); - sb.append("?user.name="); - sb.append(username); - } catch (IOException e) { - LOG.debug("Fail to resolve username: {}", e); - } - } - WebResource webResource = client - .resource(sb.toString()); - if (useKerberos) { - AuthenticatedURL.Token token = new AuthenticatedURL.Token(); - webResource.header("WWW-Authenticate", token); - } - ClientResponse test = webResource.get(ClientResponse.class); - if (test.getStatus() == 200) { - rmAddress = host; - break; - } - } catch (Exception e) { - LOG.debug("Fail to connect to: "+host, e); - } - } - return scheme+rmAddress; - } + /** * Compute active resource manager API service location. @@ -132,7 +79,7 @@ private String getRMWebAddress() { * @throws IOException */ private String getApiUrl(String appName) throws IOException { - String url = getRMWebAddress(); + String url = ApiServiceUtils.getRMWebAddress(getConfig()); StringBuilder api = new StringBuilder(); api.append(url); api.append("/app/v1/services"); @@ -182,35 +129,6 @@ private ClientConfig getClientConfig() { return config; } - private int processResponse(ClientResponse response) { - response.bufferEntity(); - String output; - if (response.getStatus() == 401) { - LOG.error("Authentication required"); - return EXIT_EXCEPTION_THROWN; - } - if (response.getStatus() == 503) { - LOG.error("YARN Service is unavailable or disabled."); - return EXIT_EXCEPTION_THROWN; - } - try { - ServiceStatus ss = response.getEntity(ServiceStatus.class); - output = ss.getDiagnostics(); - } catch (Throwable t) { - output = response.getEntity(String.class); - } - if (output==null) { - output = response.getEntity(String.class); - } - if (response.getStatus() <= 299) { - LOG.info(output); - return EXIT_SUCCESS; - } else { - LOG.error(output); - return EXIT_EXCEPTION_THROWN; - } - } - /** * Utility method to load Service json from disk or from * YARN examples. @@ -291,7 +209,7 @@ public int actionLaunch(String fileName, String appName, Long lifetime, String buffer = jsonSerDeser.toJson(service); ClientResponse response = getApiClient() .post(ClientResponse.class, buffer); - result = processResponse(response); + result = ApiServiceUtils.processResponse(response); } catch (Exception e) { LOG.error("Fail to launch application: ", e); result = EXIT_EXCEPTION_THROWN; @@ -314,7 +232,7 @@ public int actionStop(String appName) throws IOException, YarnException { String buffer = jsonSerDeser.toJson(service); ClientResponse response = getApiClient(appName) .put(ClientResponse.class, buffer); - result = processResponse(response); + result = ApiServiceUtils.processResponse(response); } catch (Exception e) { LOG.error("Fail to stop application: ", e); result = EXIT_EXCEPTION_THROWN; @@ -337,7 +255,7 @@ public int actionStart(String appName) throws IOException, YarnException { String buffer = jsonSerDeser.toJson(service); ClientResponse response = getApiClient(appName) .put(ClientResponse.class, buffer); - result = processResponse(response); + result = ApiServiceUtils.processResponse(response); } catch (Exception e) { LOG.error("Fail to start application: ", e); result = EXIT_EXCEPTION_THROWN; @@ -364,7 +282,7 @@ public int actionSave(String fileName, String appName, Long lifetime, String buffer = jsonSerDeser.toJson(service); ClientResponse response = getApiClient() .post(ClientResponse.class, buffer); - result = processResponse(response); + result = ApiServiceUtils.processResponse(response); } catch (Exception e) { LOG.error("Fail to save application: ", e); result = EXIT_EXCEPTION_THROWN; @@ -383,7 +301,7 @@ public int actionDestroy(String appName) throws IOException, YarnException { try { ClientResponse response = getApiClient(appName) .delete(ClientResponse.class); - result = processResponse(response); + result = ApiServiceUtils.processResponse(response); } catch (Exception e) { LOG.error("Fail to destroy application: ", e); result = EXIT_EXCEPTION_THROWN; @@ -415,7 +333,7 @@ public int actionFlex(String appName, Map componentCounts) String buffer = jsonSerDeser.toJson(service); ClientResponse response = getApiClient(appName) .put(ClientResponse.class, buffer); - result = processResponse(response); + result = ApiServiceUtils.processResponse(response); } catch (Exception e) { LOG.error("Fail to flex application: ", e); result = EXIT_EXCEPTION_THROWN; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceConnector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceConnector.java new file mode 100644 index 00000000000..a56f1e3b4f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceConnector.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.client; + +import com.google.common.base.Joiner; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.client.api.impl.TimelineConnector; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.net.URLConnection; +import java.security.GeneralSecurityException; + +public class ServiceConnector extends AbstractService { + private static final Joiner JOINER = Joiner.on(""); + private static final Logger LOG = + LoggerFactory.getLogger(TimelineConnector.class); + public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute + + private SSLFactory sslFactory; + private Client client; + private ConnectionConfigurator connConfigurator; + private DelegationTokenAuthenticator authenticator; + private DelegationTokenAuthenticatedURL.Token token; + private UserGroupInformation authUgi; + private String doAsUser; + + public ServiceConnector(UserGroupInformation authUgi, String doAsUser, + DelegationTokenAuthenticatedURL.Token token) { + super("ServiceConnector"); + this.authUgi = authUgi; + this.doAsUser = doAsUser; + this.token = token; + } + + protected void serviceInit(Configuration conf) throws Exception { + ClientConfig cc = new DefaultClientConfig(); + cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); + + if (YarnConfiguration.useHttps(conf)) { + // If https is chosen, configures SSL client. + sslFactory = getSSLFactory(conf); + connConfigurator = getConnConfigurator(sslFactory); + } else { + connConfigurator = DEFAULT_TIMEOUT_CONN_CONFIGURATOR; + } + + if (UserGroupInformation.isSecurityEnabled()) { + authenticator = new KerberosDelegationTokenAuthenticator(); + } else { + authenticator = new PseudoDelegationTokenAuthenticator(); + } + authenticator.setConnectionConfigurator(connConfigurator); + + client = new Client(new URLConnectionClientHandler( + new ServiceConnector.ServiceClientURLConnectionFactory(authUgi, + authenticator, connConfigurator, token, doAsUser)), cc); + } + + protected void serviceStop() { + if (this.sslFactory != null) { + this.sslFactory.destroy(); + } + } + + public Client getClient() { + return client; + } + + protected SSLFactory getSSLFactory(Configuration conf) + throws GeneralSecurityException, IOException { + SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); + newSSLFactory.init(); + return newSSLFactory; + } + + private static final ConnectionConfigurator + DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT); + return conn; + } + }; + + private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) { + try { + return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj); + } catch (Exception e) { + LOG.debug("Cannot load customized ssl related configuration. " + + "Fallback to system-generic settings.", e); + return DEFAULT_TIMEOUT_CONN_CONFIGURATOR; + } + } + + private static void setTimeouts(URLConnection connection, int socketTimeout) { + connection.setConnectTimeout(socketTimeout); + connection.setReadTimeout(socketTimeout); + } + + public static URI constructResURI(Configuration conf, String address, + String uri) { + return URI.create(JOINER + .join(YarnConfiguration.useHttps(conf) ? "https://" : "http://", + address, uri)); + } + + private static ConnectionConfigurator initSslConnConfigurator( + final int timeout, SSLFactory sslFactory) + throws IOException, GeneralSecurityException { + final SSLSocketFactory sf; + final HostnameVerifier hv; + + sf = sslFactory.createSSLSocketFactory(); + hv = sslFactory.getHostnameVerifier(); + + return new ConnectionConfigurator() { + @Override public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + if (conn instanceof HttpsURLConnection) { + HttpsURLConnection c = (HttpsURLConnection) conn; + c.setSSLSocketFactory(sf); + c.setHostnameVerifier(hv); + } + setTimeouts(conn, timeout); + return conn; + } + }; + } + + private static class ServiceClientURLConnectionFactory + implements HttpURLConnectionFactory { + private DelegationTokenAuthenticator authenticator; + private UserGroupInformation authUgi; + private ConnectionConfigurator connConfigurator; + private DelegationTokenAuthenticatedURL.Token token; + private String doAsUser; + + ServiceClientURLConnectionFactory(UserGroupInformation authUgi, + DelegationTokenAuthenticator authenticator, + ConnectionConfigurator connConfigurator, + DelegationTokenAuthenticatedURL.Token token, String doAsUser) { + this.authUgi = authUgi; + this.authenticator = authenticator; + this.connConfigurator = connConfigurator; + this.token = token; + this.doAsUser = doAsUser; + } + + @Override public HttpURLConnection getHttpURLConnection(final URL url) + throws IOException { + authUgi.checkTGTAndReloginFromKeytab(); + try { + return new DelegationTokenAuthenticatedURL(authenticator, + connConfigurator).openConnection(url, token, doAsUser); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } catch (AuthenticationException ae) { + throw new IOException(ae); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceWebClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceWebClient.java new file mode 100644 index 00000000000..f523051e289 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ServiceWebClient.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.client; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.conf.RestApiConstants; +import org.apache.hadoop.yarn.service.utils.ApiServiceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.Set; + +/** + * Service web client for launching services. + */ +public class ServiceWebClient extends CompositeService { + private static final Logger LOG = LoggerFactory + .getLogger(ServiceWebClient.class); + + private static final String RESOURCE_URI_STR = "/app/v1"; + + private ServiceConnector connector; + + private UserGroupInformation authUgi; + + private String address; + @VisibleForTesting + String userName; + + ServiceWebClient(String address) { + super(ServiceClient.class.getName()); + this.address = address; + } + + protected void serviceInit(Configuration conf) throws Exception { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + UserGroupInformation realUgi = ugi.getRealUser(); + String doAsUser = null; + if (realUgi != null) { + authUgi = realUgi; + doAsUser = ugi.getShortUserName(); + userName = doAsUser; + } else { + authUgi = ugi; + doAsUser = null; + userName = authUgi.getShortUserName(); + } + + LOG.info("UserGroupInformation initialized: authUgi={}, doAsUser={}, " + + "userName={} address={}", authUgi, doAsUser, userName, address); + + DelegationTokenAuthenticatedURL.Token token = + new DelegationTokenAuthenticatedURL.Token(); + connector = new ServiceConnector(authUgi, doAsUser, token); + addIfService(connector); + super.serviceInit(conf); + } + + public void launch(Set services) { + for (Service service : services) { + LOG.info("Submitting service {} for the user {}", service.getName(), + userName); + launchService(service); + } + } + + private void launchService(Service service) { + try { + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("user.name", userName); + putObjects(RestApiConstants.SERVICE_ROOT_PATH, params, service); + } catch (Exception e) { + LOG.error("Error while submitting service {} for user {}", + service.getName(), userName); + LOG.error("Exception is ", e); + } + } + + ClientResponse doPutObjects(URI base, String path, + MultivaluedMap params, Object obj) { + return connector.getClient().resource(base).path(path).queryParams(params) + .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON) + .post(ClientResponse.class, obj); + } + + private void putObjects(String path, MultivaluedMap params, + Object obj) throws IOException, YarnException { + URI uri = ServiceConnector + .constructResURI(getConfig(), address, RESOURCE_URI_STR); + putObjects(uri, path, params, obj); + } + + private void putObjects(URI base, String path, + MultivaluedMap params, Object obj) + throws IOException, YarnException { + ClientResponse resp = null; + try { + resp = authUgi.doAs(new PrivilegedExceptionAction() { + @Override + public ClientResponse run() throws Exception { + return doPutObjects(base, path, params, obj); + } + }); + } catch (UndeclaredThrowableException ue) { + Throwable cause = ue.getCause(); + if (cause instanceof IOException) { + throw (IOException)cause; + } else { + throw new IOException(cause); + } + } catch (InterruptedException ie) { + throw (IOException) new InterruptedIOException().initCause(ie); + } + if (resp == null) { + String msg = "Response from the api server is null"; + LOG.error(msg); + throw new YarnException(msg); + } else { + ApiServiceUtils.processResponse(resp); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java new file mode 100644 index 00000000000..c92428eadae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManager.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.server.service.ServiceManager; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; + +public class SystemServiceManager extends AbstractService implements + ServiceManager { + + private static final Logger LOG = LoggerFactory + .getLogger(SystemServiceManager.class); + + private static final FsPermission SYSTEM_SERVICE_DIR_PERMISSION = + new FsPermission((short) 01777); + + private static final Joiner JOINER = Joiner.on(":"); + private FileSystem fs; + private Path systemServiceDir; + private String address; + private AtomicBoolean stopExecutors = new AtomicBoolean(false); + private Map> userServices =new HashMap<>(); + @VisibleForTesting + private Map ignoredUserServices =new HashMap<>(); + private UserGroupInformation loginUGI; + private Thread serviceLaucher; + + + public SystemServiceManager(String apiServiceHostName, int apiServicePort) { + super(SystemServiceManager.class.getName()); + this.address = JOINER.join(apiServiceHostName, apiServicePort); + } + + public SystemServiceManager() { + super(SystemServiceManager.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + systemServiceDir = new Path( + conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY, + YarnServiceConf.DEFAULT_YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY)); + LOG.info("System Service Directory is configured to {}", systemServiceDir); + fs = systemServiceDir.getFileSystem(conf); + + this.loginUGI = UserGroupInformation.isSecurityEnabled() ? + UserGroupInformation.getLoginUser() : + UserGroupInformation.getCurrentUser(); + LOG.info("UserGroupInformation initialized to {}", loginUGI); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (!fs.exists(systemServiceDir)) { + fs.mkdirs(systemServiceDir); + fs.setPermission(systemServiceDir, SYSTEM_SERVICE_DIR_PERMISSION); + } + // Create a thread and submit services in background otherwise it + // block RM switch time. + serviceLaucher = new Thread(createRunnable()); + serviceLaucher.setName("System service launcher"); + serviceLaucher.start(); + } + + private Runnable createRunnable() { + return new Runnable() { + @Override public void run() { + try { + scanSystemDirectoryForUsers(); + launchUserService(); + } catch (Exception e) { + LOG.error("Error while launching services. {}", e); + } + } + }; + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping {}", getName()); + stopExecutors.set(true); + if (serviceLaucher != null) { + serviceLaucher.interrupt(); + } + } + + void launchUserService() { + for (Map.Entry> entry : userServices.entrySet()) { + String user = entry.getKey(); + Set services = entry.getValue(); + if (services.isEmpty()) { + continue; + } + UserGroupInformation userUgi = + UserGroupInformation.createProxyUser(user, loginUGI); + ServiceWebClient serviceWebClient = null; + try { + serviceWebClient = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override public ServiceWebClient run() throws Exception { + return createServiceWebClient(); + } + }); + } catch (IOException | InterruptedException | RuntimeException | Error e) { + LOG.warn( + "Unable to create service web client for user {}. Exception is {}", + user, e); + } + + // Submit and return. + if (serviceWebClient != null) { + serviceWebClient.launch(services); + } + } + } + + @VisibleForTesting + ServiceWebClient createServiceWebClient() { + ServiceWebClient serviceClient = new ServiceWebClient(address); + serviceClient.init(getConfig()); + serviceClient.start(); + return serviceClient; + } + + void scanSystemDirectoryForUsers() throws IOException { + scanForUserServiceDefinition(systemServiceDir, false); + } + + // Files are under systemServiceDir/. Scan for 2 levels + // 1st level for users + // 2nd level for service definitions under user + void scanForUserServiceDefinition(Path dir, boolean isScanningUserDir) + throws IOException { + RemoteIterator iter = list(dir); + while (iter.hasNext()) { + FileStatus stat = iter.next(); + if (!isScanningUserDir) { + if (stat.isDirectory()) { + String userName = stat.getPath().getName(); + LOG.info("Scanning service definitions for user {} ", userName); + scanForUserServiceDefinition(stat.getPath(), true); + } else { + LOG.info( + "Service definition {} doesn't belong to any user. Ignoring.. ", + stat.getPath().getName()); + } + } else { + String userName=dir.getName(); + Service service = getServiceDefinition(stat.getPath()); + if (service != null) { + Set services = userServices.get(userName); + if (services == null) { + services = new HashSet<>(); + userServices.put(userName, services); + } + LOG.info("Adding service {} for the user {}", service.getName(), + userName); + if (!services.add(service)) { + int count = ignoredUserServices.containsKey(userName) ? + ignoredUserServices.get(userName) : 0; + ignoredUserServices.put(userName, count + 1); + LOG.warn( + "Ignoring service {} for the user {} as it is already present.", + service.getName(), dir.getName()); + } + } + } + } + } + + private Service getServiceDefinition(Path filePath) { + Service service = null; + try { + LOG.info("Loading service definition from FS: " + filePath); + service = jsonSerDeser.load(fs, filePath); + } catch (IOException e) { + LOG.info("Error while loading service definition from FS: {}", e); + } + return service; + } + + + private RemoteIterator list(Path path) throws IOException { + return new StoppableRemoteIterator(fs.listStatusIterator(path)); + } + + @Override + public void setWebAppAddress(String webAppAddress) { + this.address = webAppAddress; + } + + @VisibleForTesting + Map getIgnoredUserServices() { + return ignoredUserServices; + } + + private class StoppableRemoteIterator implements RemoteIterator { + private final RemoteIterator remote; + + StoppableRemoteIterator(RemoteIterator remote) { + this.remote = remote; + } + + @Override + public boolean hasNext() throws IOException { + return !stopExecutors.get() && remote.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return remote.next(); + } + } + + @VisibleForTesting + Map> getUserServices() { + return userServices; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/ApiServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/ApiServiceUtils.java new file mode 100644 index 00000000000..d4a77fd6904 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/ApiServiceUtils.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.utils; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.service.api.records.ServiceStatus; +import org.apache.hadoop.yarn.util.RMHAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_EXCEPTION_THROWN; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; + +/** + * Api Service Utils class + */ +public final class ApiServiceUtils { + private static final Logger LOG = + LoggerFactory.getLogger(ApiServiceUtils.class); + + /** + * Calculate Resource Manager address base on working REST API. + */ + public static String getRMWebAddress(Configuration conf) { + String scheme = "http://"; + String path = "/app/v1/services/version"; + String rmAddress = conf.get("yarn.resourcemanager.webapp.address"); + if (YarnConfiguration.useHttps(conf)) { + scheme = "https://"; + rmAddress = conf.get("yarn.resourcemanager.webapp.https.address"); + } + boolean useKerberos = UserGroupInformation.isSecurityEnabled(); + List rmServers = + RMHAUtils.getRMHAWebappAddresses(new YarnConfiguration(conf)); + for (String host : rmServers) { + try { + Client client = Client.create(); + StringBuilder sb = new StringBuilder(); + sb.append(scheme); + sb.append(host); + sb.append(path); + if (!useKerberos) { + try { + String username = + UserGroupInformation.getCurrentUser().getShortUserName(); + sb.append("?user.name="); + sb.append(username); + } catch (IOException e) { + LOG.debug("Fail to resolve username: {}", e); + } + } + WebResource webResource = client.resource(sb.toString()); + if (useKerberos) { + AuthenticatedURL.Token token = new AuthenticatedURL.Token(); + webResource.header("WWW-Authenticate", token); + } + ClientResponse test = webResource.get(ClientResponse.class); + if (test.getStatus() == 200) { + rmAddress = host; + break; + } + } catch (Exception e) { + LOG.debug("Fail to connect to: " + host, e); + } + } + return scheme + rmAddress; + } + + public static int processResponse(ClientResponse response) { + response.bufferEntity(); + String output; + if (response.getStatus() == 401) { + LOG.error("Authentication required"); + return EXIT_EXCEPTION_THROWN; + } + if (response.getStatus() == 503) { + LOG.error("YARN Service is unavailable or disabled."); + return EXIT_EXCEPTION_THROWN; + } + try { + ServiceStatus ss = response.getEntity(ServiceStatus.class); + output = ss.getDiagnostics(); + } catch (Throwable t) { + output = response.getEntity(String.class); + } + if (output == null) { + output = response.getEntity(String.class); + } + if (response.getStatus() <= 299) { + LOG.info(output); + return EXIT_SUCCESS; + } else { + LOG.error(output); + return EXIT_EXCEPTION_THROWN; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/package-info.java new file mode 100644 index 00000000000..8162f9da3ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/utils/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.service.utils contains utils classes to + * be used for YARN Services API. + */ +@InterfaceAudience.Private @InterfaceStability.Unstable +package org.apache.hadoop.yarn.service.utils; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServerWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServerWebApp.java index f4acd942cc9..ca33c8b19fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServerWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServerWebApp.java @@ -25,6 +25,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.service.client.SystemServiceManager; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.eclipse.jetty.webapp.Configuration; @@ -80,6 +81,14 @@ protected void serviceStart() throws Exception { doSecureLogin(getConfig()); } startWebApp(); + + // start only after webapp started. + SystemServiceManager serviceManager = + new SystemServiceManager(bindAddress.getHostName(), + bindAddress.getPort()); + serviceManager.init(getConfig()); + serviceManager.start(); + super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java new file mode 100644 index 00000000000..5009025f9bb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.client; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceStatus; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedMap; +import java.io.File; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; +import static org.apache.hadoop.yarn.service.conf.RestApiConstants.CONTEXT_ROOT; +import static org.apache.hadoop.yarn.service.conf.RestApiConstants.SERVICE_ROOT_PATH; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test class for system service manager. + */ +public class TestSystemServiceImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSystemServiceImpl.class); + private SystemServiceManager systemService; + private Configuration conf; + private String resourcePath = "users"; + private String address = "localhost:9191"; + + private String[] users = new String[] {"user1","user2"}; + private static Map> loadedServices = new HashMap<>(); + private static Map> submittedServices = new HashMap<>(); + + @Before + public void setup() { + File file = new File( + getClass().getClassLoader().getResource(resourcePath).getFile()); + conf = new Configuration(); + conf.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY, + file.getAbsolutePath()); + systemService = new SystemServiceManager() { + @Override ServiceWebClient createServiceWebClient() { + ServiceWebClient client = new TestServiceWebClient(address); + client.init(conf); + client.start(); + return client; + } + }; + systemService.setWebAppAddress(address); + systemService.init(conf); // do not call explicit start + + constructUserService(users[0], "example-app1"); + constructUserService(users[1], "example-app1", "example-app2"); + } + + @After + public void teadDown() { + systemService.stop(); + } + + @Test + public void testSystemServiceSubmission() throws Exception { + systemService.scanSystemDirectoryForUsers(); + Map> userServices = systemService.getUserServices(); + Assert.assertEquals(loadedServices.size(), userServices.size()); + verifyUserServices(userServices); + /* verify for ignored sevices count */ + Map ignoredUserServices = + systemService.getIgnoredUserServices(); + Assert.assertEquals(1, ignoredUserServices.size()); + Assert.assertTrue("User user1 doesn't exist.", + ignoredUserServices.containsKey(users[0])); + int count = ignoredUserServices.get(users[0]); + Assert.assertEquals(1, count); + + systemService.launchUserService(); + verifyUserServices(); + + // 2nd time launch service to handle if service exist scenario + systemService.launchUserService(); + verifyUserServices(); + } + + private void verifyUserServices(Map> userServices) { + for (String user : users) { + Set services = userServices.get(user); + Set serviceNames = loadedServices.get(user); + Assert.assertEquals(serviceNames.size(), services.size()); + Iterator iterator = services.iterator(); + while (iterator.hasNext()) { + Service next = iterator.next(); + Assert.assertTrue("Service name doesn't exist in expected userService " + + serviceNames, serviceNames.contains(next.getName())); + } + } + } + + public void constructUserService(String user, String... serviceNames) { + Set service = loadedServices.get(user); + if (service == null) { + service = new HashSet<>(); + for (String serviceName : serviceNames) { + service.add(serviceName); + } + loadedServices.put(user, service); + } + } + + class TestServiceWebClient extends ServiceWebClient { + + TestServiceWebClient(String address) { + super(address); + } + + @Override + protected ClientResponse doPutObjects(URI base, String path, + MultivaluedMap params, Object obj) { + Service service = (Service) obj; + ServiceStatus serviceStatus = new ServiceStatus(); + + Set services = submittedServices.get(userName); + if (services == null) { + services = new HashSet<>(); + submittedServices.put(userName, services); + } + + if (services.contains(service.getName())) { + serviceStatus.setDiagnostics( + "Failed to create service " + service.getName() + + ", because it already exists."); + return createClientResponse(500, serviceStatus, ""); + } + services.add(service.getName()); + serviceStatus.setState(ACCEPTED); + serviceStatus + .setUri(CONTEXT_ROOT + SERVICE_ROOT_PATH + "/" + service.getName()); + return createClientResponse(202, serviceStatus, ""); + } + + ClientResponse createClientResponse(int status, ServiceStatus serviceStatus, + String message) { + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(status); + when(response.getEntity(ServiceStatus.class)).thenReturn(serviceStatus); + when(response.getEntity(String.class)).thenReturn(message); + return response; + } + } + + private void verifyUserServices() { + Assert.assertEquals(loadedServices.size(), submittedServices.size()); + for (Map.Entry> entry : submittedServices.entrySet()) { + String user = entry.getKey(); + Set serviceSet = entry.getValue(); + Assert.assertTrue(loadedServices.containsKey(user)); + Set services = loadedServices.get(user); + Assert.assertEquals(services.size(), serviceSet.size()); + Assert.assertTrue(services.containsAll(serviceSet)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app1.json @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user1/example-app2.json @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json new file mode 100644 index 00000000000..823561d8598 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app1.json @@ -0,0 +1,16 @@ +{ + "name": "example-app1", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json new file mode 100644 index 00000000000..d8fd1d12fca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/user2/example-app2.json @@ -0,0 +1,16 @@ +{ + "name": "example-app2", + "version": "1.0.0", + "components" : + [ + { + "name": "simple", + "number_of_containers": 1, + "launch_command": "sleep 2", + "resource": { + "cpus": 1, + "memory": "128" + } + } + ] +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java index b9a759438e0..edf18a1dba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -50,6 +50,11 @@ public static final String ROLLING_LOG_INCLUSION_PATTERN = "yarn.service.rolling-log.include-pattern"; public static final String ROLLING_LOG_EXCLUSION_PATTERN = "yarn.service.rolling-log.exclude-pattern"; + public static final String YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY = + YARN_SERVICE_PREFIX + "system-service.directory"; + + public static final String DEFAULT_YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY = + "/tmp/yarn-system-service"; /** * The yarn service base path: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java new file mode 100644 index 00000000000..30175fa39ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/ServiceManager.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.service; + +/** + * Interface for starting system services. + */ +public interface ServiceManager { + + void setWebAppAddress(String address); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java new file mode 100644 index 00000000000..c448bab134d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.service contains service related + * classes. + */ +@InterfaceAudience.Private @InterfaceStability.Unstable + +package org.apache.hadoop.yarn.server.service; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 5140c9fa558..04002c1ae17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.service.ServiceManager; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -319,12 +320,12 @@ protected void serviceInit(Configuration conf) throws Exception { rmContext.setYarnConfiguration(conf); - createAndInitActiveServices(false); - webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, YarnConfiguration.RM_BIND_HOST, WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); + createAndInitActiveServices(false); + RMApplicationHistoryWriter rmApplicationHistoryWriter = createRMApplicationHistoryWriter(); addService(rmApplicationHistoryWriter); @@ -479,6 +480,27 @@ protected ReservationSystem createReservationSystem() { } } + protected ServiceManager createServiceManager() { + String schedulerClassName = + conf.get(YarnConfiguration.YARN_API_SYSTEM_SERVICES_CLASS, + YarnConfiguration.DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS); + LOG.info("Using ServiceManager: " + schedulerClassName); + try { + Class schedulerClazz = Class.forName(schedulerClassName); + if (ServiceManager.class.isAssignableFrom(schedulerClazz)) { + return (ServiceManager) ReflectionUtils + .newInstance(schedulerClazz, this.conf); + } else { + throw new YarnRuntimeException( + "Class: " + schedulerClassName + " not instance of " + + ServiceManager.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ServiceManager: " + schedulerClassName, e); + } + } + protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(this.rmContext); } @@ -782,6 +804,13 @@ protected void serviceInit(Configuration configuration) throws Exception { new RMNMInfo(rmContext, scheduler); + if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, + false)) { + ServiceManager serviceManager = createServiceManager(); + serviceManager.setWebAppAddress(webAppAddress); + addIfService(serviceManager); + } + super.serviceInit(conf); } -- 2.13.6 (Apple Git-96)