commit fdbc663eaceed2823e1f03f709733ce1de98b971 Author: Eric Yang Date: Fri Dec 8 20:11:25 2017 -0500 YARN-7605. Added doAs validation for submitting application. (Contributed by Eric Yang) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java index fa447d8..094c85c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java @@ -865,6 +865,46 @@ public void addInternalServlet(String name, String pathSpec, } /** + * Add an internal servlet in the server, specifying whether or not to + * protect with Kerberos authentication. + * Note: This method is to be used for adding servlets that facilitate + * internal communication and not for user facing functionality. For + * servlets added using this method, filters (except internal Kerberos + * filters) are not enabled. + * + * @param name The name of the servlet (can be passed as null) + * @param pathSpec The path spec for the servlet + * @param clazz The servlet class + * @param requireAuth Require Kerberos authenticate to access servlet + */ + public void addInternalServlet(String name, String pathSpec, + Class clazz, Map params) { + // Jetty doesn't like the same path spec mapping to different servlets, so + // if there's already a mapping for this pathSpec, remove it and assume that + // the newest one is the one we want + final ServletHolder sh = new ServletHolder(ServletContainer.class); + sh.setName(name); + sh.setInitParameters(params); + final ServletMapping[] servletMappings = + webAppContext.getServletHandler().getServletMappings(); + for (int i = 0; i < servletMappings.length; i++) { + if (servletMappings[i].containsPathSpec(pathSpec)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found existing " + servletMappings[i].getServletName() + + " servlet at path " + pathSpec + "; will replace mapping" + + " with " + sh.getName() + " servlet"); + } + ServletMapping[] newServletMappings = + ArrayUtil.removeFromArray(servletMappings, servletMappings[i]); + webAppContext.getServletHandler() + .setServletMappings(newServletMappings); + break; + } + } + webAppContext.addServlet(sh, pathSpec); + } + + /** * Add the given handler to the front of the list of handlers. * * @param handler The handler to add diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 34ab8f0..f0b628d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -20,6 +20,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -30,10 +31,10 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; import org.apache.hadoop.yarn.service.client.ServiceClient; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -42,10 +43,13 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.Map; @@ -71,7 +75,8 @@ public ApiServer(Configuration conf) { private static final Logger LOG = LoggerFactory.getLogger(ApiServer.class); private static Configuration YARN_CONFIG = new YarnConfiguration(); - private static ServiceClient SERVICE_CLIENT; + private ServiceClient serviceClientUnitTest; + private static boolean unitTest = false; static { init(); @@ -79,9 +84,6 @@ public ApiServer(Configuration conf) { // initialize all the common resources - order is important private static void init() { - SERVICE_CLIENT = new ServiceClient(); - SERVICE_CLIENT.init(YARN_CONFIG); - SERVICE_CLIENT.start(); } @GET @@ -98,11 +100,25 @@ public Response getVersion() { @Path(SERVICE_ROOT_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response createService(Service service) { - LOG.info("POST: createService = {}", service); + public Response createService(@Context HttpServletRequest request, + Service service) { ServiceStatus serviceStatus = new ServiceStatus(); try { - ApplicationId applicationId = SERVICE_CLIENT.actionCreate(service); + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("POST: createService = {} user = {}", service, ugi); + ApplicationId applicationId = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationId run() throws Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + ApplicationId applicationId = sc.actionCreate(service); + sc.close(); + return applicationId; + } + }); LOG.info("Successfully created service " + service.getName() + " applicationId = " + applicationId); serviceStatus.setState(ACCEPTED); @@ -127,11 +143,25 @@ public Response createService(Service service) { @Path(SERVICE_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response getService(@PathParam(SERVICE_NAME) String appName) { - LOG.info("GET: getService for appName = {}", appName); + public Response getService(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName) { ServiceStatus serviceStatus = new ServiceStatus(); try { - Service app = SERVICE_CLIENT.getStatus(appName); + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("GET: getService for appName = {} user = {}", appName, ugi); + Service app = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Service run() throws Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + Service app = sc.getStatus(appName); + sc.close(); + return app; + } + }); return Response.ok(app).build(); } catch (IllegalArgumentException e) { serviceStatus.setDiagnostics(e.getMessage()); @@ -151,33 +181,64 @@ public Response getService(@PathParam(SERVICE_NAME) String appName) { @Path(SERVICE_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response deleteService(@PathParam(SERVICE_NAME) String appName) { - LOG.info("DELETE: deleteService for appName = {}", appName); - return stopService(appName, true); + public Response deleteService(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName) { + try { + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("DELETE: deleteService for appName = {} user = {}", appName, ugi); + return stopService(appName, true, ugi); + } catch (IOException e) { + LOG.info("Fail to authenticate: ", e); + ServiceStatus entity = new ServiceStatus(); + entity.setDiagnostics("Failed to authenticate API service with KDC."); + return Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(entity).build(); + } } - private Response stopService(String appName, boolean destroy) { + private Response stopService(String appName, boolean destroy, final + UserGroupInformation ugi) { try { - SERVICE_CLIENT.actionStop(appName, destroy); - if (destroy) { - SERVICE_CLIENT.actionDestroy(appName); - LOG.info("Successfully deleted service {}", appName); + Integer result = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Integer run() throws IOException, YarnException, + ApplicationNotFoundException { + int result = 0; + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result = sc.actionStop(appName, destroy); + if (destroy) { + result = sc.actionDestroy(appName); + LOG.info("Successfully deleted service {}", appName); + } else { + LOG.info("Successfully stopped service {}", appName); + } + sc.close(); + return new Integer(result); + } + }); + if (result.intValue() == 0) { + return Response.status(Status.OK).build(); } else { - LOG.info("Successfully stopped service {}", appName); + throw new ApplicationNotFoundException("Service " + appName + + " is not found in YARN."); } - return Response.status(Status.OK).build(); - } catch (ApplicationNotFoundException e) { - ServiceStatus serviceStatus = new ServiceStatus(); - serviceStatus.setDiagnostics( - "Service " + appName + " is not found in YARN: " + e.getMessage()); - return Response.status(Status.BAD_REQUEST).entity(serviceStatus) - .build(); - } catch (Exception e) { + } catch (NullPointerException | IOException e) { LOG.error("Fail to stop service:", e); ServiceStatus serviceStatus = new ServiceStatus(); serviceStatus.setDiagnostics(e.getMessage()); return Response.status(Status.INTERNAL_SERVER_ERROR) .entity(serviceStatus).build(); + } catch (InterruptedException | ApplicationNotFoundException | + UndeclaredThrowableException e) { + ServiceStatus serviceStatus = new ServiceStatus(); + serviceStatus.setDiagnostics( + "Service " + appName + " is not found in YARN: " + e.getMessage()); + return Response.status(Status.BAD_REQUEST).entity(serviceStatus) + .build(); } } @@ -185,7 +246,8 @@ private Response stopService(String appName, boolean destroy) { @Path(COMPONENT_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN }) - public Response updateComponent(@PathParam(SERVICE_NAME) String appName, + public Response updateComponent(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName, @PathParam(COMPONENT_NAME) String componentName, Component component) { if (component.getNumberOfContainers() < 0) { @@ -196,9 +258,13 @@ public Response updateComponent(@PathParam(SERVICE_NAME) String appName, } ServiceStatus status = new ServiceStatus(); try { - Map original = SERVICE_CLIENT.flexByRestService(appName, + ServiceClient sc = new ServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + Map original = sc.flexByRestService(appName, Collections.singletonMap(component.getName(), component.getNumberOfContainers())); + sc.close(); status.setDiagnostics( "Updating component (" + componentName + ") size from " + original .get(componentName) + " to " + component.getNumberOfContainers()); @@ -214,44 +280,58 @@ public Response updateComponent(@PathParam(SERVICE_NAME) String appName, @Path(SERVICE_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response updateService(@PathParam(SERVICE_NAME) String appName, + public Response updateService(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName, Service updateServiceData) { - LOG.info("PUT: updateService for app = {} with data = {}", appName, - updateServiceData); - - // Ignore the app name provided in updateServiceData and always use appName - // path param - updateServiceData.setName(appName); - - // For STOP the app should be running. If already stopped then this - // operation will be a no-op. For START it should be in stopped state. - // If already running then this operation will be a no-op. - if (updateServiceData.getState() != null - && updateServiceData.getState() == ServiceState.STOPPED) { - return stopService(appName, false); - } - - // If a START is requested - if (updateServiceData.getState() != null - && updateServiceData.getState() == ServiceState.STARTED) { - return startService(appName); - } - - // If new lifetime value specified then update it - if (updateServiceData.getLifetime() != null - && updateServiceData.getLifetime() > 0) { - return updateLifetime(appName, updateServiceData); + try { + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("PUT: updateService for app = {} with data = {} user = {}", appName, + updateServiceData, ugi); + // Ignore the app name provided in updateServiceData and always use appName + // path param + updateServiceData.setName(appName); + // For STOP the app should be running. If already stopped then this + // operation will be a no-op. For START it should be in stopped state. + // If already running then this operation will be a no-op. + if (updateServiceData.getState() != null + && updateServiceData.getState() == ServiceState.STOPPED) { + return stopService(appName, false, ugi); + } + // If a START is requested + if (updateServiceData.getState() != null + && updateServiceData.getState() == ServiceState.STARTED) { + return startService(appName, ugi); + } + // If new lifetime value specified then update it + if (updateServiceData.getLifetime() != null + && updateServiceData.getLifetime() > 0) { + return updateLifetime(appName, updateServiceData, ugi); + } + } catch(IOException e) { + return Response.status(Status.FORBIDDEN).build(); } - // If nothing happens consider it a no-op return Response.status(Status.NO_CONTENT).build(); } - private Response updateLifetime(String appName, Service updateAppData) { + private Response updateLifetime(String appName, Service updateAppData, + final UserGroupInformation ugi) { ServiceStatus status = new ServiceStatus(); try { - String newLifeTime = - SERVICE_CLIENT.updateLifetime(appName, updateAppData.getLifetime()); + String newLifeTime = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public String run() throws Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + String newLifeTime = + sc.updateLifetime(appName, updateAppData.getLifetime()); + sc.close(); + return newLifeTime; + } + }); status.setDiagnostics( "Service (" + appName + ")'s lifeTime is updated to " + newLifeTime + ", " + updateAppData.getLifetime() @@ -268,10 +348,20 @@ private Response updateLifetime(String appName, Service updateAppData) { } } - private Response startService(String appName) { + private Response startService(String appName, final UserGroupInformation ugi) { ServiceStatus status = new ServiceStatus(); try { - SERVICE_CLIENT.actionStart(appName); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws ApplicationNotFoundException, Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + sc.actionStart(appName); + sc.close(); + return null; + } + }); LOG.info("Successfully started service " + appName); status.setDiagnostics("Service " + appName + " is successfully started."); status.setState(ServiceState.ACCEPTED); @@ -290,10 +380,16 @@ private Response startService(String appName) { * * @param mockServerClient - A mocked version of ServiceClient */ - public static void setServiceClient(ServiceClient mockServerClient) { - SERVICE_CLIENT = mockServerClient; - SERVICE_CLIENT.init(YARN_CONFIG); - SERVICE_CLIENT.start(); + public void setServiceClient(ServiceClient mockServerClient) { + serviceClientUnitTest = mockServerClient; + unitTest = true; } + private ServiceClient getServiceClient() { + if (unitTest) { + return serviceClientUnitTest; + } else { + return new ServiceClient(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java index 896b2f6..9aa38a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java @@ -26,12 +26,15 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.webapp.ApiServer; + +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Path; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.List; @@ -44,15 +47,18 @@ */ public class TestApiServer { private ApiServer apiServer; + private HttpServletRequest request; @Before public void setup() throws Exception { + request = Mockito.mock(HttpServletRequest.class); + Mockito.when(request.getRemoteUser()).thenReturn(System.getProperty("user.name")); ServiceClient mockServerClient = new ServiceClientTest(); Configuration conf = new Configuration(); conf.set("yarn.api-service.service.client.class", ServiceClientTest.class.getName()); - ApiServer.setServiceClient(mockServerClient); - this.apiServer = new ApiServer(conf); + apiServer = new ApiServer(conf); + apiServer.setServiceClient(mockServerClient); } @Test @@ -77,7 +83,7 @@ public void testGetVersion() { public void testBadCreateService() { Service service = new Service(); // Test for invalid argument - final Response actual = apiServer.createService(service); + final Response actual = apiServer.createService(request, service); assertEquals("Create service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); } @@ -101,21 +107,21 @@ public void testGoodCreateService() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.createService(service); + final Response actual = apiServer.createService(request, service); assertEquals("Create service is ", actual.getStatus(), Response.status(Status.ACCEPTED).build().getStatus()); } @Test public void testBadGetService() { - final Response actual = apiServer.getService("no-jenkins"); + final Response actual = apiServer.getService(request, "no-jenkins"); assertEquals("Get service is ", actual.getStatus(), Response.status(Status.NOT_FOUND).build().getStatus()); } @Test public void testBadGetService2() { - final Response actual = apiServer.getService(null); + final Response actual = apiServer.getService(request, null); assertEquals("Get service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR) .build().getStatus()); @@ -123,21 +129,21 @@ public void testBadGetService2() { @Test public void testGoodGetService() { - final Response actual = apiServer.getService("jenkins"); + final Response actual = apiServer.getService(request, "jenkins"); assertEquals("Get service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); } @Test public void testBadDeleteService() { - final Response actual = apiServer.deleteService("no-jenkins"); + final Response actual = apiServer.deleteService(request, "no-jenkins"); assertEquals("Delete service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); } @Test public void testBadDeleteService2() { - final Response actual = apiServer.deleteService(null); + final Response actual = apiServer.deleteService(request, null); assertEquals("Delete service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR) .build().getStatus()); @@ -145,7 +151,7 @@ public void testBadDeleteService2() { @Test public void testGoodDeleteService() { - final Response actual = apiServer.deleteService("jenkins"); + final Response actual = apiServer.deleteService(request, "jenkins"); assertEquals("Delete service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); } @@ -170,7 +176,7 @@ public void testDecreaseContainerAndStop() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("update service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -197,7 +203,7 @@ public void testBadDecreaseContainerAndStop() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("flex service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); @@ -223,7 +229,7 @@ public void testIncreaseContainersAndStart() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("flex service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -249,7 +255,7 @@ public void testBadStartServices() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("start service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR).build() @@ -276,7 +282,7 @@ public void testGoodStartServices() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("start service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -303,7 +309,7 @@ public void testBadStopServices() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("stop service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); @@ -330,7 +336,7 @@ public void testGoodStopServices() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("stop service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -357,7 +363,7 @@ public void testUpdateService() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("update service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java index ffd9328..c5c2057 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java @@ -27,6 +27,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -36,6 +37,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; + import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*; /** @@ -189,7 +191,8 @@ public void testSave() { long lifetime = 3600L; String queue = "default"; try { - int result = asc.actionSave(fileName, appName, lifetime, queue); + int result = asc.actionSave(fileName, appName, lifetime, + queue); assertEquals(EXIT_SUCCESS, result); } catch (IOException | YarnException e) { fail(); @@ -203,7 +206,8 @@ public void testBadSave() { long lifetime = 3600L; String queue = "default"; try { - int result = badAsc.actionSave(fileName, appName, lifetime, queue); + int result = badAsc.actionSave(fileName, appName, lifetime, + queue); assertEquals(EXIT_EXCEPTION_THROWN, result); } catch (IOException | YarnException e) { fail(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 81c56d2..4cd2eb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -186,7 +186,8 @@ public Service loadAppJsonFromLocalFS(String fileName, String serviceName, } public int actionSave(String fileName, String serviceName, Long lifetime, - String queue) throws IOException, YarnException { + String queue) + throws IOException, YarnException { return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName, lifetime, queue)); } @@ -216,7 +217,12 @@ public ApplicationId actionCreate(Service service) // Write the definition first and then submit - AM will read the definition createDirAndPersistApp(appDir, service); - ApplicationId appId = submitApp(service); + ApplicationId appId; + try { + appId = submitApp(service); + } catch (InterruptedException e) { + throw new YarnException(e); + } cachedAppInfo.put(serviceName, new AppInfo(appId, service .getKerberosPrincipal().getPrincipalName())); service.setId(appId.toString()); @@ -530,7 +536,7 @@ private void verifyNoLiveAppInRM(String serviceName, String action) } private ApplicationId submitApp(Service app) - throws IOException, YarnException { + throws IOException, YarnException, InterruptedException { String serviceName = app.getName(); Configuration conf = getConfig(); Path appRootDir = fs.buildClusterDirPath(app.getName()); @@ -550,7 +556,7 @@ private ApplicationId submitApp(Service app) } submissionContext.setMaxAppAttempts(YarnServiceConf .getInt(YarnServiceConf.AM_RESTART_MAX, 20, app.getConfiguration(), - conf)); + conf)); setLogAggregationContext(app, conf, submissionContext); @@ -564,7 +570,7 @@ private ApplicationId submitApp(Service app) // add keytab if in secure env addKeytabResourceIfSecure(fs, localResources, app); if (LOG.isDebugEnabled()) { - printLocalResources(localResources); + printLocalResources(localResources); } Map env = addAMEnv(); @@ -572,11 +578,11 @@ private ApplicationId submitApp(Service app) String cmdStr = buildCommandLine(app, conf, appRootDir, hasAMLog4j); submissionContext.setResource(Resource.newInstance(YarnServiceConf .getLong(YarnServiceConf.AM_RESOURCE_MEM, - YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM, app.getConfiguration(), - conf), 1)); + YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM, app.getConfiguration(), + conf), 1)); String queue = app.getQueue(); if (StringUtils.isEmpty(queue)) { - queue = conf.get(YARN_QUEUE, "default"); + queue = conf.get(YARN_QUEUE, "default"); } submissionContext.setQueue(queue); submissionContext.setApplicationName(serviceName); @@ -584,7 +590,7 @@ private ApplicationId submitApp(Service app) Set appTags = AbstractClientProvider.createApplicationTags(serviceName, null, null); if (!appTags.isEmpty()) { - submissionContext.setApplicationTags(appTags); + submissionContext.setApplicationTags(appTags); } ContainerLaunchContext amLaunchContext = Records.newRecord(ContainerLaunchContext.class); @@ -593,7 +599,7 @@ private ApplicationId submitApp(Service app) amLaunchContext.setLocalResources(localResources); addHdfsDelegationTokenIfSecure(amLaunchContext); submissionContext.setAMContainerSpec(amLaunchContext); - yarnClient.submitApplication(submissionContext); + yarnClient.submitApplication(submissionContext); return submissionContext.getApplicationId(); } @@ -738,14 +744,20 @@ private boolean addAMLog4jResource(String serviceName, Configuration conf, return hasAMLog4j; } - public int actionStart(String serviceName) throws YarnException, IOException { + public int actionStart(String serviceName) + throws YarnException, IOException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); Path appDir = checkAppExistOnHdfs(serviceName); Service service = ServiceApiUtil.loadService(fs, serviceName); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); // see if it is actually running and bail out; verifyNoLiveAppInRM(serviceName, "thaw"); - ApplicationId appId = submitApp(service); + ApplicationId appId; + try { + appId = submitApp(service); + } catch (InterruptedException e) { + throw new YarnException(e); + } service.setId(appId.toString()); // write app definition on to hdfs Path appJson = persistAppDef(appDir, service); @@ -770,7 +782,7 @@ private Path checkAppExistOnHdfs(String serviceName) private void createDirAndPersistApp(Path appDir, Service service) throws IOException, SliderException { - FsPermission appDirPermission = new FsPermission("750"); + FsPermission appDirPermission = new FsPermission("777"); fs.createWithPermissions(appDir, appDirPermission); Path appJson = persistAppDef(appDir, service); LOG.info("Persisted service " + service.getName() + " at " + appJson); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 1c517d9..7b77b2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -271,10 +271,6 @@ private void checkCompInstancesInOrder(ServiceClient client, } } - private void checkRegistryAndCompDirDeleted() { - - } - private void checkEachCompInstancesInOrder(Component component) { long expectedNumInstances = component.getNumberOfContainers(); Assert.assertEquals(expectedNumInstances, component.getContainers().size()); @@ -290,32 +286,6 @@ private void checkEachCompInstancesInOrder(Component component) { } } - private void waitForOneCompToBeReady(ServiceClient client, - Service exampleApp, String readyComp) - throws TimeoutException, InterruptedException { - long numExpectedContainers = - exampleApp.getComponent(readyComp).getNumberOfContainers(); - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - Component retrievedComp = retrievedApp.getComponent(readyComp); - - if (retrievedComp.getContainers() != null - && retrievedComp.getContainers().size() == numExpectedContainers) { - LOG.info(readyComp + " found " + numExpectedContainers - + " containers running"); - return true; - } else { - LOG.info(" Waiting for " + readyComp + "'s containers to be running"); - return false; - } - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, 200000); - } - /** * Wait until all the containers for all components become ready state. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index dd7515f..385f258 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -58,6 +58,7 @@ protected AppAdminClient() { * @param conf configuration * @return app admin client */ + @SuppressWarnings("unchecked") @Public @Unstable public static AppAdminClient createAppAdminClient(String appType, @@ -106,7 +107,8 @@ public static AppAdminClient createAppAdminClient(String appType, @Public @Unstable public abstract int actionLaunch(String fileName, String appName, Long - lifetime, String queue) throws IOException, YarnException; + lifetime, String queue) + throws IOException, YarnException; /** *

@@ -122,8 +124,8 @@ public abstract int actionLaunch(String fileName, String appName, Long */ @Public @Unstable - public abstract int actionStop(String appName) throws IOException, - YarnException; + public abstract int actionStop(String appName) + throws IOException, YarnException; /** *

@@ -139,8 +141,8 @@ public abstract int actionStop(String appName) throws IOException, */ @Public @Unstable - public abstract int actionStart(String appName) throws IOException, - YarnException; + public abstract int actionStart(String appName) + throws IOException, YarnException; /** *

@@ -159,7 +161,8 @@ public abstract int actionStart(String appName) throws IOException, @Public @Unstable public abstract int actionSave(String fileName, String appName, Long - lifetime, String queue) throws IOException, YarnException; + lifetime, String queue) + throws IOException, YarnException; /** *

@@ -174,8 +177,8 @@ public abstract int actionSave(String fileName, String appName, Long */ @Public @Unstable - public abstract int actionDestroy(String appName) throws IOException, - YarnException; + public abstract int actionDestroy(String appName) + throws IOException, YarnException; /** *

@@ -223,6 +226,6 @@ public abstract int enableFastLaunch() throws IOException, */ @Public @Unstable - public abstract String getStatusString(String applicationId) throws - IOException, YarnException; + public abstract String getStatusString(String applicationId) + throws IOException, YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index fb08fcd..6d736e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -33,6 +33,7 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java index d3ad53e..5b15068 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java @@ -42,7 +42,9 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.http.RestCsrfPreventionFilter; import org.apache.hadoop.security.http.XFrameOptionsFilter; +import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticationHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.eclipse.jetty.webapp.WebAppContext; import org.slf4j.Logger; @@ -82,6 +84,7 @@ public Class clazz; public String name; public String spec; + public Map params; } final String name; @@ -147,7 +150,19 @@ servlets.add(struct); return this; } - + + public Builder withServlet(String name, String pathSpec, + Class servlet, + Map params) { + ServletStruct struct = new ServletStruct(); + struct.clazz = servlet; + struct.name = name; + struct.spec = pathSpec; + struct.params = params; + servlets.add(struct); + return this; + } + public Builder with(Configuration conf) { this.conf = conf; return this; @@ -243,6 +258,11 @@ public void setup() { pathList.add("/" + wsName + "/*"); } } + for (ServletStruct s : servlets) { + if (!pathList.contains(s.spec)) { + pathList.add(s.spec); + } + } if (conf == null) { conf = new Configuration(); } @@ -315,7 +335,12 @@ public void setup() { HttpServer2 server = builder.build(); for(ServletStruct struct: servlets) { - server.addServlet(struct.name, struct.spec, struct.clazz); + if (struct.params != null) { + server.addInternalServlet(struct.name, struct.spec, + struct.clazz, struct.params); + } else { + server.addServlet(struct.name, struct.spec, struct.clazz); + } } for(Map.Entry entry : attributes.entrySet()) { server.setAttribute(entry.getKey(), entry.getValue()); @@ -405,11 +430,6 @@ public WebApp start(WebApp webapp, WebAppContext ui2Context, addFiltersForNewContext(ui2Context); httpServer.addHandlerAtFront(ui2Context); } - if (services!=null) { - String packageName = services.get("PackageName"); - String pathSpec = services.get("PathSpec"); - httpServer.addJerseyResourcePackage(packageName, pathSpec); - } try { httpServer.start(); LOG.info("Web app " + name + " started at " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a0317f6..b372b1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.annotations.VisibleForTesting; +import com.sun.jersey.spi.container.servlet.ServletContainer; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.AuthInfo; @@ -1041,11 +1043,22 @@ protected void startWepApp() { RMWebAppUtil.setupSecurityAndFilters(conf, getClientRMService().rmDTSecretManager); + Map params = new HashMap(); + if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, + false)) { + String apiPackages = "org.apache.hadoop.yarn.service.webapp;" + + "org.apache.hadoop.yarn.webapp"; + params.put("com.sun.jersey.config.property.resourceConfigClass", + "com.sun.jersey.api.core.PackagesResourceConfig"); + params.put("com.sun.jersey.config.property.packages", apiPackages); + } + Builder builder = WebApps .$for("cluster", ApplicationMasterService.class, masterService, "ws") .with(conf) + .withServlet("API-Service", "/app/*", ServletContainer.class, params) .withHttpSpnegoPrincipalKey( YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) .withHttpSpnegoKeytabKey( @@ -1101,15 +1114,7 @@ protected void startWepApp() { } } - if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, - false)) { - serviceConfig = new HashMap(); - String apiPackages = "org.apache.hadoop.yarn.service.webapp;" + - "org.apache.hadoop.yarn.webapp"; - serviceConfig.put("PackageName", apiPackages); - serviceConfig.put("PathSpec", "/app/*"); - } - webApp = builder.start(new RMWebApp(this), uiWebAppContext, serviceConfig); + webApp = builder.start(new RMWebApp(this), uiWebAppContext, null); } private String getWebAppsPath(String appName) {