From 2225f3ed7889dbabe979f3a6f53b84678d8054dc Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Wed, 7 Jun 2017 18:24:34 +0530 Subject: [PATCH] YARN 6601: System Service with addService function + aplication cache --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 7 + .../yarn/services/webapp/ApplicationApiWebApp.java | 6 + .../hadoop/yarn/services/webapp/SystemService.java | 288 +++++++++++++++++++++ .../SystemServiceConfigurationException.java | 25 ++ .../hadoop/yarn/services/webapp/package-info.java | 18 ++ .../yarn/services/webapp/TestSystemService.java | 83 ++++++ .../src/main/resources/yarn-default.xml | 19 ++ 7 files changed, 446 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5e4c826..b5ff89e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2826,6 +2826,13 @@ public static boolean areNodeLabelsEnabled( public static final String TIMELINE_XFS_OPTIONS = TIMELINE_XFS_PREFIX + "xframe-options"; + public static final String YARN_NATIVE_SERVICES_SYSTEM_SERVICE_DIRECTORY = + "yarn.native-services.system-service.directory"; + + public static final java.lang.String + YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE = + "yarn.native-services.system-service.queue"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java index 7fc01a1..3a990de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java @@ -27,6 +27,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.services.api.impl.ApplicationApiService; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; @@ -51,10 +52,15 @@ public static void main(String[] args) throws IOException { ApplicationApiWebApp apiWebApp = new ApplicationApiWebApp(); try { apiWebApp.startWebApp(); + new SystemService( + apiWebApp.applicationApiServer.getConnectorAddress(0).getHostName(), + apiWebApp.applicationApiServer.getConnectorAddress(0).getPort()) + .serviceInit(new YarnConfiguration()); } catch (Exception e) { if (apiWebApp != null) { apiWebApp.close(); } + e.printStackTrace(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java new file mode 100644 index 0000000..d319ee6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.webapp; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.slider.api.resource.Application; +import org.apache.slider.api.resource.ApplicationState; +import org.apache.slider.core.persist.JsonSerDeser; +import org.codehaus.jackson.map.PropertyNamingStrategy; +import org.codehaus.jettison.json.JSONObject; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; + +class SystemService extends AbstractService { + + private FileSystem fs; + private static final Log LOG = LogFactory.getLog(SystemService.class); + private String apiServiceHostName; + private int apiServicePort; + private Path systemServiceDir; + private WebResource webResource; + private HashMap appCache; + private JsonSerDeser serDeser; + + class AppInfo { + private Application app; + private ClientResponse response; + private LocatedFileStatus jsonFile; + + private AppInfo(Application app, LocatedFileStatus jsonFile) { + this.app = app; + this.jsonFile = jsonFile; + } + + private void setResponse(ClientResponse response) { + this.response = response; + } + + Application getApp() { + return app; + } + } + + SystemService(String apiServiceHostName, int apiServicePort) { + + super(SystemService.class.getName()); + this.apiServiceHostName = apiServiceHostName; + this.apiServicePort = apiServicePort; + this.appCache = new HashMap<>(); + } + + @Override protected void serviceInit(Configuration conf) { + systemServiceDir = new Path(conf.get( + YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_DIRECTORY, + getDefaultSystemServiceDirectory())); + serDeser = new JsonSerDeser(Application.class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + try { + fs = systemServiceDir.getFileSystem(conf); + } catch (IllegalArgumentException e) { + LOG.error("Error in System Service Directory path", e); + return; + } catch (IOException e) { + LOG.error("Unable to read System Service Directory", e); + return; + } + + URI uri = getApiWebAppURI(); + if (uri == null) { + return; + } + webResource = new Client().resource(uri); + + LOG.info("Starting System Services"); + try { + getServicesList(conf); + } catch (SystemServiceConfigurationException e) { + LOG.error("The System services could not be started", e); + } + } + + private String getDefaultSystemServiceDirectory() { + return "file:///tmp/hadoop/system-service"; + } + + JsonSerDeser getSerDeser() { + return serDeser; + } + + private void addService(LocatedFileStatus file, Configuration conf) { + if (file.isFile() && file.getPath().getName().endsWith(".json")) { + LOG.info("Trying to Submit " + file.getPath().toUri().getPath() + + " for launching system service"); + Application app = getApplicationObject(file, conf); + if (app != null) { + if (appCache.get(app.getName()) == null) { + appCache.put(app.getName(), new AppInfo(app, file)); + } else if (appCache.get(app.getName()).jsonFile.compareTo(file) == 0) { + /*//Case when app is re-submitted in case of failure.It may have + //same file and app name.*/ + appCache.put(app.getName(), new AppInfo(app, file)); + } else { + LOG.warn("AppName in " + file.getPath().toUri().getPath() + + " has already been used by another JSON file" + appCache + .get(app.getName()).jsonFile.getPath().toUri().getPath()); + return; + } + submitServiceApp(appCache.get(app.getName())); + } + } + } + + private void getServicesList(Configuration conf) + throws SystemServiceConfigurationException { + + RemoteIterator fileIter; + + try { + fileIter = fs.listFiles(systemServiceDir, false); + } catch (FileNotFoundException e) { + LOG.error("System Service Directory Not Found", e); + throw new SystemServiceConfigurationException(e); + } catch (IOException e) { + LOG.error("Unable to read System Service Directory", e); + throw new SystemServiceConfigurationException(e); + } + + try { + while (fileIter.hasNext()) { + LocatedFileStatus file = fileIter.next(); + if (file.isFile() && file.getPath().getName().endsWith(".json")) { + LOG.info("Trying to Submit " + file.getPath().toUri().getPath() + + " for launching system service"); + Application app = getApplicationObject(file, conf); + if (app != null) { + if (appCache.get(app.getName()) == null) { + appCache.put(app.getName(), new AppInfo(app, file)); + } else { + /*if(appCache.get(app.getName()).jsonFile.compareTo(file)==0){ + appCache.put(app.getName(), new AppInfo(app, file)); + } + //Case when app is re-submitted in case of failure.It may have + //same file and app name.*/ + LOG.warn("AppName in " + file.getPath().toUri().getPath() + + " has already been used by another JSON file" + appCache + .get(app.getName()).jsonFile.getPath().toUri().getPath()); + } + } + } + } + submitServiceApps(); + } catch (IOException e) { + LOG.error("The system service directory could not be read", e); + throw new SystemServiceConfigurationException(e); + } + } + + private URI getApiWebAppURI() { + URI uri; + try { + uri = new URI("http", null, apiServiceHostName, apiServicePort, + "/services/v1/applications", null, null); + } catch (URISyntaxException e) { + LOG.error("The URI for System Services Web App has invalid syntax", e); + return null; + } + LOG.info(uri.toString() + " used for submitting system services"); + return uri; + } + + private Application getApplicationObject(LocatedFileStatus file, + Configuration conf) { + Application app; + try { + app = (Application) serDeser.load(fs, file.getPath()); + app.setQueue(conf.get( + YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE)); + } catch (IOException e) { + LOG.warn("JSON file " + file.getPath().toUri().getPath() + + " could not be read", e); + return null; + } + return app; + } + + private void submitServiceApps() { + for (AppInfo appInfo : appCache.values()) { + submitServiceApp(appInfo); + } + } + + protected void submitServiceApp(AppInfo appInfo) { + Application app = appInfo.app; + LocatedFileStatus jsonFile = appInfo.jsonFile; + validateAppForResubmission(appInfo); + try { + ClientResponse response = webResource.accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .post(ClientResponse.class, serDeser.toJson(app)); + appInfo.setResponse(response); + LOG.info(jsonFile.getPath().toUri().getPath() + + " submitted to the Services Wep App"); + LOG.info(response.toString()); + LOG.info(response.getEntity(String.class)); + } catch (IOException e) { + LOG.warn("Application could not be converted to json", e); + } catch (RuntimeException e) { + LOG.warn("Json submission lead to RunTime Exception", e); + } + } + + private void prepareAppForResubmission(String appName) { + webResource.path(appName).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).delete(); + } + + void validateAppForResubmission(AppInfo value) { + if (value.response != null && value.response.getStatus() == Status.CREATED + .getStatusCode()) { + Application app = value.app; + LOG.info(app.getName() + + " has already been created. Checking for status and validating the app."); + Application responseApp; + ApplicationState responseAppState; + ClientResponse response; + try { + response = + webResource.path(app.getName()).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + if (response.getStatus() != Status.OK.getStatusCode()) { + LOG.warn("Failed to check status for " + app.getName() + + " . GET request resulted in response status code " + response + .getStatus()); + return; + } + responseApp = (Application) serDeser + .fromJson(response.getEntity(JSONObject.class).toString()); + } catch (IOException e) { + LOG.info( + "Failed to validate the app for resubmission. Error in reading application from json received in GET Request for app: " + + app.getName(), e); + return; + } catch (RuntimeException e) { + LOG.info( + "Failed to validate the app for resubmission. Runtime Exception in sending GET Request for app: " + + app.getName(), e); + return; + } + responseAppState = responseApp.getState(); + if (!(responseAppState != ApplicationState.STOPPED + && responseAppState != ApplicationState.FAILED)) { + prepareAppForResubmission(app.getName()); + } + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java new file mode 100644 index 0000000..85fcb95 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.webapp; + +class SystemServiceConfigurationException extends Exception { + + SystemServiceConfigurationException(Exception cause) { + super(cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/package-info.java new file mode 100644 index 0000000..eb27d4b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.services.webapp; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java new file mode 100644 index 0000000..b20206c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.webapp; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.slider.api.resource.Application; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class TestSystemService { + + private static SystemService systemService; + + private static final Log LOG = LogFactory.getLog(TestSystemService.class); + + @BeforeClass + public static void setup() { + systemService = new SystemService("10.200.5.61", 9191) { + @Override + protected void submitServiceApp(AppInfo appInfo) { + Application appBeforeMarshalling = appInfo.getApp(); + Application appAfterUnmarshalling; + String appInJson; + Configuration conf = new YarnConfiguration(); + validateAppForResubmission(appInfo); + try { + conf.set(YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE, + "system"); + appBeforeMarshalling.setQueue(conf.get( + YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE)); + appInJson = getSerDeser().toJson(appBeforeMarshalling); + } catch (IOException e) { + LOG.warn("JSON file could not be read", e); + return; + } + try { + appAfterUnmarshalling = (Application) getSerDeser().fromJson(appInJson); + assertEquals(appBeforeMarshalling.getQueue(), + appAfterUnmarshalling.getQueue()); + } catch (IOException e) { + LOG.warn("Application could not be converted into the json format", + e); + } + } + }; + } + + @Test + public void testServiceSubmission() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_DIRECTORY, + "file:///Users/ljain/work/hadoop/testbasics/src/main/resources"); + systemService.serviceInit(conf); + } + + @AfterClass + public static void tearDown() { + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e687eef..6985377 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3108,4 +3108,23 @@ false + + + The directory which stores the json files which are submitted to the + Service Api for launching system services. The default value used is + /tmp/hadoop/system-service + + yarn.native-services.system-service.directory + + + + + + The queue in which the system services woud be submitted. If not specified + then default queue is used. + + yarn.native-services.system-service.queue + + + -- 2.10.1 (Apple Git-78)