From 9a8b7b83c800c8ff68c41d578eb3c121979c26fd Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Tue, 16 May 2017 12:31:58 +0530 Subject: [PATCH] System Services launched with Native Services API --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 4 + .../yarn/services/webapp/ApplicationApiWebApp.java | 8 +- .../hadoop/yarn/services/webapp/SystemService.java | 158 +++++++++++++++++++++ .../SystemServiceConfigurationException.java | 25 ++++ .../yarn/services/webapp/TestSystemService.java | 87 ++++++++++++ 5 files changed, 281 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fa4d2e3..4a7bdc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2818,6 +2818,10 @@ public static boolean areNodeLabelsEnabled( public static final String TIMELINE_XFS_OPTIONS = TIMELINE_XFS_PREFIX + "xframe-options"; + public static final String YARN_NATIVE_SERVICES_SYSTEM_SERVICE_DIRECTORY = "yarn.native-services.system-service.directory"; + + public static final java.lang.String YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE = "yarn.native-services.system-service.queue"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java index 7fc01a1..2a15668 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/ApplicationApiWebApp.java @@ -27,6 +27,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.services.api.impl.ApplicationApiService; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; @@ -45,16 +46,21 @@ private static final String SEP = ";"; // REST API server for YARN native services - private HttpServer2 applicationApiServer; + HttpServer2 applicationApiServer; public static void main(String[] args) throws IOException { ApplicationApiWebApp apiWebApp = new ApplicationApiWebApp(); try { apiWebApp.startWebApp(); + new SystemService( + apiWebApp.applicationApiServer.getConnectorAddress(0).getHostName(), + apiWebApp.applicationApiServer.getConnectorAddress(0).getPort()) + .serviceInit(new YarnConfiguration()); } catch (Exception e) { if (apiWebApp != null) { apiWebApp.close(); } + e.printStackTrace(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java new file mode 100644 index 0000000..5212354 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemService.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.webapp; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.slider.api.resource.Application; +import org.apache.slider.core.persist.JsonSerDeser; +import org.codehaus.jackson.map.PropertyNamingStrategy; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import javax.ws.rs.core.MediaType; + +class SystemService extends AbstractService { + + protected FileSystem fs; + private static final Log LOG = LogFactory.getLog(SystemService.class); + private String apiServiceHostName; + private int apiServicePort; + private Path systemServiceDir; + private WebResource webResource; + + public SystemService(String apiServiceHostName, int apiServicePort) { + + super(SystemService.class.getName()); + this.apiServiceHostName = apiServiceHostName; + this.apiServicePort = apiServicePort; + } + + @Override protected void serviceInit(Configuration conf) { + systemServiceDir = new Path(conf.get( + YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_DIRECTORY)); + try { + fs = systemServiceDir.getFileSystem(conf); + } catch (IllegalArgumentException e) { + LOG.error("Error in System Service Directory path", e); + return; + } catch (IOException e) { + LOG.error("Unable to read System Service Directory", e); + return; + } + + try { + URI uri = getApiWebAppURI(apiServiceHostName, apiServicePort); + webResource = new Client().resource(uri); + } catch (URISyntaxException e) { + return; + } + + LOG.info("Starting System Services"); + try { + getServicesList(conf); + } catch (SystemServiceConfigurationException e) { + LOG.error("The System services could not be started", e); + } + } + + private void getServicesList(Configuration conf) + throws SystemServiceConfigurationException { + + RemoteIterator fileIter; + + try { + fileIter = fs.listFiles(systemServiceDir, false); + } catch (FileNotFoundException e) { + LOG.error("System Service Directory Not Found", e); + throw new SystemServiceConfigurationException(e); + } catch (IOException e) { + LOG.error("Unable to read System Service Directory", e); + throw new SystemServiceConfigurationException(e); + } + + try { + while (fileIter.hasNext()) { + LocatedFileStatus file = fileIter.next(); + if (file.isFile() && file.getPath().getName().endsWith(".json")) { + LOG.info("Submitting " + file.getPath().toUri().getPath() + + " for launching system service"); + submitJSON(file, webResource, conf); + } + } + } catch (IOException e) { + LOG.error("The system service directory could not be read", e); + throw new SystemServiceConfigurationException(e); + } + } + + private URI getApiWebAppURI(String apiServiceHostName, int apiServicePort) + throws URISyntaxException { + URI uri; + try { + uri = new URI("http", null, apiServiceHostName, apiServicePort, + "/services/v1/applications", null, null); + } catch (URISyntaxException e) { + LOG.error("The URI for System Services Web App has invalid syntax", e); + throw e; + } + LOG.info(uri.toString() + " used for submitting system services"); + return uri; + } + + protected void submitJSON(LocatedFileStatus file, WebResource webResource, + Configuration conf) { + JsonSerDeser serDeser = new JsonSerDeser(Application.class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + Application app = null; + try { + app = (Application) serDeser.load(fs, file.getPath()); + app.setQueue(conf.get( + YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE)); + } catch (IOException e) { + LOG.warn("JSON file could not be read", e); + return; + } + + try { + ClientResponse response = webResource.accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .post(ClientResponse.class, serDeser.toJson(app)); + LOG.info(file.getPath().toUri().getPath() + + " submitted to the Services Wep App"); + LOG.info(response.toString()); + LOG.info(response.getEntity(String.class)); + } catch (IOException e) { + LOG.warn("Application could not be converted to json", e); + } catch (RuntimeException e) { + LOG.warn("Json submission lead to RunTime Exception", e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java new file mode 100644 index 0000000..3695f0b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/services/webapp/SystemServiceConfigurationException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.webapp; + +class SystemServiceConfigurationException extends Exception { + + public SystemServiceConfigurationException(Exception cause) { + super(cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java new file mode 100644 index 0000000..f2a1bf4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/services/webapp/TestSystemService.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.services.webapp; + +import com.sun.jersey.api.client.WebResource; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.slider.api.resource.Application; +import org.apache.slider.core.persist.JsonSerDeser; +import org.codehaus.jackson.map.PropertyNamingStrategy; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class TestSystemService { + + private static SystemService systemService; + + private static final Log LOG = LogFactory.getLog(TestSystemService.class); + + @BeforeClass public static void setup() { + systemService = new SystemService("10.200.5.61", 9191) { + @Override + protected void submitJSON(LocatedFileStatus file, WebResource webResource, + Configuration conf) { + Application appBeforeMarshalling, appAfterUnmarshalling; + String appInJson; + JsonSerDeser serDeser = new JsonSerDeser(Application.class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + try { + appBeforeMarshalling = + (Application) serDeser.load(fs, file.getPath()); + conf.set(YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE, + "system"); + appBeforeMarshalling.setQueue(conf.get( + YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_QUEUE)); + appInJson = serDeser.toJson(appBeforeMarshalling); + } catch (IOException e) { + LOG.warn("JSON file could not be read", e); + return; + } + try { + appAfterUnmarshalling = (Application) serDeser.fromJson(appInJson); + assertEquals(appBeforeMarshalling.getQueue(), + appAfterUnmarshalling.getQueue()); + } catch (IOException e) { + LOG.warn("Application could not be converted into the json format", + e); + } + } + }; + } + + @Test public void testServiceSubmission() { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.YARN_NATIVE_SERVICES_SYSTEM_SERVICE_DIRECTORY, + "file:///Users/ljain/work/hadoop/testbasics/src/main/resources"); + systemService.serviceInit(conf); + } + + @AfterClass public static void tearDown() { + } + +} \ No newline at end of file -- 2.10.1 (Apple Git-78)