commit 6cedc6bb60edc58e0e74e8c950d0609f107a391c Author: Eric Yang Date: Wed Dec 13 17:35:53 2017 -0500 YARN-7605. Added doAs validation for submitting application. (Contributed by Eric Yang) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java index fa447d8..e31e210 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java @@ -865,6 +865,45 @@ public void addInternalServlet(String name, String pathSpec, } /** + * Add an internal servlet in the server, with initialization parameters. + * Note: This method is to be used for adding servlets that facilitate + * internal communication and not for user facing functionality. For + * servlets added using this method, filters (except internal Kerberos + * filters) are not enabled. + * + * @param name The name of the servlet (can be passed as null) + * @param pathSpec The path spec for the servlet + * @param clazz The servlet class + * @param params init parameters + */ + public void addInternalServlet(String name, String pathSpec, + Class clazz, Map params) { + // Jetty doesn't like the same path spec mapping to different servlets, so + // if there's already a mapping for this pathSpec, remove it and assume that + // the newest one is the one we want + final ServletHolder sh = new ServletHolder(ServletContainer.class); + sh.setName(name); + sh.setInitParameters(params); + final ServletMapping[] servletMappings = + webAppContext.getServletHandler().getServletMappings(); + for (int i = 0; i < servletMappings.length; i++) { + if (servletMappings[i].containsPathSpec(pathSpec)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found existing " + servletMappings[i].getServletName() + + " servlet at path " + pathSpec + "; will replace mapping" + + " with " + sh.getName() + " servlet"); + } + ServletMapping[] newServletMappings = + ArrayUtil.removeFromArray(servletMappings, servletMappings[i]); + webAppContext.getServletHandler() + .setServletMappings(newServletMappings); + break; + } + } + webAppContext.addServlet(sh, pathSpec); + } + + /** * Add the given handler to the front of the list of handlers. * * @param handler The handler to add diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index f4133a5..b93ad13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -27,6 +27,7 @@ import javax.ws.rs.core.MediaType; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -79,7 +80,7 @@ private String getRMWebAddress() { String path = "/app/v1/services/version"; String rmAddress = conf .get("yarn.resourcemanager.webapp.address"); - if(conf.getBoolean("hadoop.ssl.enabled", false)) { + if (YarnConfiguration.useHttps(conf)) { scheme = "https://"; rmAddress = conf .get("yarn.resourcemanager.webapp.https.address"); @@ -159,19 +160,23 @@ private ClientConfig getClientConfig() { private int processResponse(ClientResponse response) { response.bufferEntity(); - if (response.getStatus() >= 299) { - String error = ""; - try { - ServiceStatus ss = response.getEntity(ServiceStatus.class); - error = ss.getDiagnostics(); - } catch (Throwable t) { - error = response.getEntity(String.class); - } - LOG.error(error); + String output; + try { + ServiceStatus ss = response.getEntity(ServiceStatus.class); + output = ss.getDiagnostics(); + } catch (Throwable t) { + output = response.getEntity(String.class); + } + if (output==null) { + output = response.getEntity(String.class); + } + if (response.getStatus() <= 299) { + LOG.info(output); + return EXIT_SUCCESS; + } else { + LOG.error(output); return EXIT_EXCEPTION_THROWN; } - LOG.info(response.toString()); - return EXIT_SUCCESS; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 84c3905..99696a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -19,7 +19,11 @@ import com.google.inject.Inject; import com.google.inject.Singleton; + +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -33,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -41,10 +46,15 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; + +import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -72,7 +82,8 @@ public ApiServer(Configuration conf) { private static final Logger LOG = LoggerFactory.getLogger(ApiServer.class); private static Configuration YARN_CONFIG = new YarnConfiguration(); - private static ServiceClient SERVICE_CLIENT; + private ServiceClient serviceClientUnitTest; + private boolean unitTest = false; static { init(); @@ -80,9 +91,6 @@ public ApiServer(Configuration conf) { // initialize all the common resources - order is important private static void init() { - SERVICE_CLIENT = new ServiceClient(); - SERVICE_CLIENT.init(YARN_CONFIG); - SERVICE_CLIENT.start(); } @GET @@ -99,16 +107,34 @@ public Response getVersion() { @Path(SERVICE_ROOT_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response createService(Service service) { + public Response createService(@Context HttpServletRequest request, + Service service) { LOG.info("POST: createService = {}", service); ServiceStatus serviceStatus = new ServiceStatus(); try { + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("POST: createService = {} user = {}", service, ugi); if(service.getState()==ServiceState.STOPPED) { - SERVICE_CLIENT.actionBuild(service); + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + sc.actionBuild(service); } else { - ApplicationId applicationId = SERVICE_CLIENT.actionCreate(service); - LOG.info("Successfully created service " + service.getName() - + " applicationId = " + applicationId); + ApplicationId applicationId = ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public ApplicationId run() throws Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + ApplicationId applicationId = sc.actionCreate(service); + sc.close(); + return applicationId; + } + }); + serviceStatus.setDiagnostics("Application ID: " + applicationId); } serviceStatus.setState(ACCEPTED); serviceStatus.setUri( @@ -120,9 +146,9 @@ public Response createService(Service service) { return Response.status(Status.BAD_REQUEST).entity(serviceStatus) .build(); } catch (Exception e) { - String message = "Failed to create service " + service.getName(); + String message = "Failed to create service " + service.getName() + "."; LOG.error(message, e); - serviceStatus.setDiagnostics(message + ": " + e.getMessage()); + serviceStatus.setDiagnostics(message); return Response.status(Status.INTERNAL_SERVER_ERROR) .entity(serviceStatus).build(); } @@ -132,17 +158,35 @@ public Response createService(Service service) { @Path(SERVICE_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response getService(@PathParam(SERVICE_NAME) String appName) { - LOG.info("GET: getService for appName = {}", appName); + public Response getService(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName) { ServiceStatus serviceStatus = new ServiceStatus(); try { - Service app = SERVICE_CLIENT.getStatus(appName); + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("GET: getService for appName = {} user = {}", appName, ugi); + Service app = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Service run() throws Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + Service app = sc.getStatus(appName); + sc.close(); + return app; + } + }); return Response.ok(app).build(); - } catch (IllegalArgumentException e) { + } catch (IllegalArgumentException | FileNotFoundException e) { serviceStatus.setDiagnostics(e.getMessage()); serviceStatus.setCode(ERROR_CODE_APP_NAME_INVALID); return Response.status(Status.NOT_FOUND).entity(serviceStatus) .build(); + } catch (AccessControlException e) { + serviceStatus.setDiagnostics(e.getMessage()); + return Response.status(Status.FORBIDDEN).entity(serviceStatus) + .build(); } catch (Exception e) { LOG.error("Get service failed", e); serviceStatus @@ -156,33 +200,73 @@ public Response getService(@PathParam(SERVICE_NAME) String appName) { @Path(SERVICE_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response deleteService(@PathParam(SERVICE_NAME) String appName) { - LOG.info("DELETE: deleteService for appName = {}", appName); - return stopService(appName, true); + public Response deleteService(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName) { + try { + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("DELETE: deleteService for appName = {} user = {}", + appName, ugi); + return stopService(appName, true, ugi); + } catch (IOException e) { + LOG.info("Fail to authenticate: ", e); + ServiceStatus entity = new ServiceStatus(); + entity.setDiagnostics("Failed to authenticate API service with KDC."); + return Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(entity).build(); + } catch (IllegalArgumentException e) { + ServiceStatus entity = new ServiceStatus(); + entity.setDiagnostics(e.getMessage()); + return Response.status(Status.BAD_REQUEST).entity(entity) + .build(); + } } - private Response stopService(String appName, boolean destroy) { + private Response stopService(String appName, boolean destroy, + final UserGroupInformation ugi) { try { - SERVICE_CLIENT.actionStop(appName, destroy); - if (destroy) { - SERVICE_CLIENT.actionDestroy(appName); - LOG.info("Successfully deleted service {}", appName); + Integer result = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Integer run() throws IOException, YarnException, + ApplicationNotFoundException { + int result = 0; + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result = sc.actionStop(appName, destroy); + if (destroy) { + result = sc.actionDestroy(appName); + LOG.info("Successfully deleted service {}", appName); + } else { + LOG.info("Successfully stopped service {}", appName); + } + sc.close(); + return Integer.valueOf(result); + } + }); + if (result.intValue() == 0) { + ServiceStatus serviceStatus = new ServiceStatus(); + serviceStatus.setDiagnostics("Successfully stopped service " + + appName); + return Response.status(Status.OK).entity(serviceStatus).build(); } else { - LOG.info("Successfully stopped service {}", appName); + throw new ApplicationNotFoundException("Service " + appName + + " is not found in YARN."); } - return Response.status(Status.OK).build(); - } catch (ApplicationNotFoundException e) { - ServiceStatus serviceStatus = new ServiceStatus(); - serviceStatus.setDiagnostics( - "Service " + appName + " is not found in YARN: " + e.getMessage()); - return Response.status(Status.BAD_REQUEST).entity(serviceStatus) - .build(); - } catch (Exception e) { + } catch (NullPointerException | IOException e) { LOG.error("Fail to stop service:", e); ServiceStatus serviceStatus = new ServiceStatus(); - serviceStatus.setDiagnostics(e.getMessage()); + serviceStatus.setDiagnostics(ExceptionUtils.getFullStackTrace(e)); return Response.status(Status.INTERNAL_SERVER_ERROR) .entity(serviceStatus).build(); + } catch (InterruptedException | ApplicationNotFoundException | + UndeclaredThrowableException e) { + ServiceStatus serviceStatus = new ServiceStatus(); + serviceStatus.setDiagnostics("Service " + appName + + " is not found in YARN."); + return Response.status(Status.BAD_REQUEST).entity(serviceStatus) + .build(); } } @@ -190,25 +274,41 @@ private Response stopService(String appName, boolean destroy) { @Path(COMPONENT_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN }) - public Response updateComponent(@PathParam(SERVICE_NAME) String appName, + public Response updateComponent(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName, @PathParam(COMPONENT_NAME) String componentName, Component component) { - if (component.getNumberOfContainers() < 0) { - return Response.status(Status.BAD_REQUEST).entity( + ServiceStatus status = new ServiceStatus(); + try { + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + + if (component.getNumberOfContainers() < 0) { + return Response.status(Status.BAD_REQUEST).entity( "Service = " + appName + ", Component = " + component.getName() + ": Invalid number of containers specified " + component .getNumberOfContainers()).build(); - } - ServiceStatus status = new ServiceStatus(); - try { - Map original = SERVICE_CLIENT.flexByRestService(appName, - Collections.singletonMap(component.getName(), - component.getNumberOfContainers())); + } + Map original = ugi.doAs( + new PrivilegedExceptionAction>() { + @Override + public Map run() throws Exception { + ServiceClient sc = new ServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + Map original = sc.flexByRestService(appName, + Collections.singletonMap(component.getName(), + component.getNumberOfContainers())); + sc.close(); + return original; + } + }); status.setDiagnostics( "Updating component (" + componentName + ") size from " + original .get(componentName) + " to " + component.getNumberOfContainers()); return Response.ok().entity(status).build(); - } catch (YarnException | IOException e) { + } catch (InterruptedException | IOException e) { status.setDiagnostics(e.getMessage()); return Response.status(Status.INTERNAL_SERVER_ERROR).entity(status) .build(); @@ -219,67 +319,109 @@ public Response updateComponent(@PathParam(SERVICE_NAME) String appName, @Path(SERVICE_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @Produces({ MediaType.APPLICATION_JSON }) - public Response updateService(@PathParam(SERVICE_NAME) String appName, + public Response updateService(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String appName, Service updateServiceData) { - LOG.info("PUT: updateService for app = {} with data = {}", appName, - updateServiceData); - - // Ignore the app name provided in updateServiceData and always use appName - // path param - updateServiceData.setName(appName); + try { + UserGroupInformation proxyUser = UserGroupInformation.getLoginUser(); + UserGroupInformation ugi = UserGroupInformation + .createProxyUser(request.getRemoteUser(), proxyUser); + LOG.info("PUT: updateService for app = {} with data = {} user = {}", + appName, updateServiceData, ugi); + // Ignore the app name provided in updateServiceData and always use + // appName path param + updateServiceData.setName(appName); - if (updateServiceData.getState() == ServiceState.FLEX) { - Map componentCountStrings = new HashMap(); - for (Component c : updateServiceData.getComponents()) { - componentCountStrings.put(c.getName(), c.getNumberOfContainers().toString()); + if (updateServiceData.getState() != null + && updateServiceData.getState() == ServiceState.FLEX) { + return flexService(updateServiceData, ugi); } - ServiceStatus status = new ServiceStatus(); - try { - int result = SERVICE_CLIENT - .actionFlex(appName, componentCountStrings); - if (result == EXIT_SUCCESS) { - LOG.info("Successfully flex service " + appName); - status.setDiagnostics("Service " + appName + - " is successfully flexed."); - status.setState(ServiceState.ACCEPTED); - } - } catch (YarnException | IOException e) { - String message = "Failed to flex service " + appName; - LOG.info(message, e); - status.setDiagnostics(message); - return Response.status(Status.INTERNAL_SERVER_ERROR) - .entity(status).build(); + // For STOP the app should be running. If already stopped then this + // operation will be a no-op. For START it should be in stopped state. + // If already running then this operation will be a no-op. + if (updateServiceData.getState() != null + && updateServiceData.getState() == ServiceState.STOPPED) { + return stopService(appName, false, ugi); } - } - // For STOP the app should be running. If already stopped then this - // operation will be a no-op. For START it should be in stopped state. - // If already running then this operation will be a no-op. - if (updateServiceData.getState() != null - && updateServiceData.getState() == ServiceState.STOPPED) { - return stopService(appName, false); - } - // If a START is requested - if (updateServiceData.getState() != null - && updateServiceData.getState() == ServiceState.STARTED) { - return startService(appName); - } + // If a START is requested + if (updateServiceData.getState() != null + && updateServiceData.getState() == ServiceState.STARTED) { + return startService(appName, ugi); + } - // If new lifetime value specified then update it - if (updateServiceData.getLifetime() != null - && updateServiceData.getLifetime() > 0) { - return updateLifetime(appName, updateServiceData); + // If new lifetime value specified then update it + if (updateServiceData.getLifetime() != null + && updateServiceData.getLifetime() > 0) { + return updateLifetime(appName, updateServiceData, ugi); + } + } catch (IOException e) { + LOG.error(ExceptionUtils.getFullStackTrace(e)); + ServiceStatus ss = new ServiceStatus(); + ss.setDiagnostics(ExceptionUtils.getFullStackTrace(e)); + return Response.status(Status.FORBIDDEN).entity(ss).build(); } // If nothing happens consider it a no-op return Response.status(Status.NO_CONTENT).build(); } - private Response updateLifetime(String appName, Service updateAppData) { + private Response flexService(Service service, UserGroupInformation ugi) { + ServiceStatus status = new ServiceStatus(); + String appName = service.getName(); + Response response = Response.status(Status.INTERNAL_SERVER_ERROR).build(); + try { + Map componentCountStrings = new HashMap(); + for (Component c : service.getComponents()) { + componentCountStrings.put(c.getName(), c.getNumberOfContainers().toString()); + } + Integer result = ugi.doAs(new PrivilegedExceptionAction() { + + @Override + public Integer run() throws YarnException, IOException { + int result = 0; + ServiceClient sc = new ServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result = sc + .actionFlex(appName, componentCountStrings); + sc.close(); + return Integer.valueOf(result); + } + }); + if (result == EXIT_SUCCESS) { + String message = "Service " + appName + " is successfully flexed."; + LOG.info(message); + status.setDiagnostics(message); + status.setState(ServiceState.ACCEPTED); + response = Response.status(Status.ACCEPTED).entity(status).build(); + } + } catch (Exception e) { + String message = "Failed to flex service " + appName; + LOG.error(message, e); + status.setDiagnostics(message + "\n" + ExceptionUtils.getStackTrace(e)); + response = Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(status).build(); + } + return response; + } + + private Response updateLifetime(String appName, Service updateAppData, + final UserGroupInformation ugi) { ServiceStatus status = new ServiceStatus(); try { - String newLifeTime = - SERVICE_CLIENT.updateLifetime(appName, updateAppData.getLifetime()); + String newLifeTime = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public String run() throws Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + String newLifeTime = + sc.updateLifetime(appName, updateAppData.getLifetime()); + sc.close(); + return newLifeTime; + } + }); status.setDiagnostics( "Service (" + appName + ")'s lifeTime is updated to " + newLifeTime + ", " + updateAppData.getLifetime() @@ -296,17 +438,29 @@ private Response updateLifetime(String appName, Service updateAppData) { } } - private Response startService(String appName) { + private Response startService(String appName, + final UserGroupInformation ugi) { ServiceStatus status = new ServiceStatus(); try { - SERVICE_CLIENT.actionStart(appName); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws ApplicationNotFoundException, Exception { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + sc.actionStart(appName); + sc.close(); + return null; + } + }); LOG.info("Successfully started service " + appName); status.setDiagnostics("Service " + appName + " is successfully started."); status.setState(ServiceState.ACCEPTED); return Response.ok(status).build(); } catch (Exception e) { String message = "Failed to start service " + appName; - status.setDiagnostics(message + ": " + e.getMessage()); + status.setDiagnostics(message + ": " + + ExceptionUtils.getFullStackTrace(e)); LOG.info(message, e); return Response.status(Status.INTERNAL_SERVER_ERROR) .entity(status).build(); @@ -318,10 +472,16 @@ private Response startService(String appName) { * * @param mockServerClient - A mocked version of ServiceClient */ - public static void setServiceClient(ServiceClient mockServerClient) { - SERVICE_CLIENT = mockServerClient; - SERVICE_CLIENT.init(YARN_CONFIG); - SERVICE_CLIENT.start(); + public void setServiceClient(ServiceClient mockServerClient) { + serviceClientUnitTest = mockServerClient; + unitTest = true; } + private ServiceClient getServiceClient() { + if (unitTest) { + return serviceClientUnitTest; + } else { + return new ServiceClient(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java index 896b2f6..359cb5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java @@ -26,12 +26,15 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.webapp.ApiServer; + +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Path; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.List; @@ -44,15 +47,19 @@ */ public class TestApiServer { private ApiServer apiServer; + private HttpServletRequest request; @Before public void setup() throws Exception { + request = Mockito.mock(HttpServletRequest.class); + Mockito.when(request.getRemoteUser()) + .thenReturn(System.getProperty("user.name")); ServiceClient mockServerClient = new ServiceClientTest(); Configuration conf = new Configuration(); conf.set("yarn.api-service.service.client.class", ServiceClientTest.class.getName()); - ApiServer.setServiceClient(mockServerClient); - this.apiServer = new ApiServer(conf); + apiServer = new ApiServer(conf); + apiServer.setServiceClient(mockServerClient); } @Test @@ -77,7 +84,7 @@ public void testGetVersion() { public void testBadCreateService() { Service service = new Service(); // Test for invalid argument - final Response actual = apiServer.createService(service); + final Response actual = apiServer.createService(request, service); assertEquals("Create service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); } @@ -101,21 +108,21 @@ public void testGoodCreateService() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.createService(service); + final Response actual = apiServer.createService(request, service); assertEquals("Create service is ", actual.getStatus(), Response.status(Status.ACCEPTED).build().getStatus()); } @Test public void testBadGetService() { - final Response actual = apiServer.getService("no-jenkins"); + final Response actual = apiServer.getService(request, "no-jenkins"); assertEquals("Get service is ", actual.getStatus(), Response.status(Status.NOT_FOUND).build().getStatus()); } @Test public void testBadGetService2() { - final Response actual = apiServer.getService(null); + final Response actual = apiServer.getService(request, null); assertEquals("Get service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR) .build().getStatus()); @@ -123,21 +130,21 @@ public void testBadGetService2() { @Test public void testGoodGetService() { - final Response actual = apiServer.getService("jenkins"); + final Response actual = apiServer.getService(request, "jenkins"); assertEquals("Get service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); } @Test public void testBadDeleteService() { - final Response actual = apiServer.deleteService("no-jenkins"); + final Response actual = apiServer.deleteService(request, "no-jenkins"); assertEquals("Delete service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); } @Test public void testBadDeleteService2() { - final Response actual = apiServer.deleteService(null); + final Response actual = apiServer.deleteService(request, null); assertEquals("Delete service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR) .build().getStatus()); @@ -145,7 +152,7 @@ public void testBadDeleteService2() { @Test public void testGoodDeleteService() { - final Response actual = apiServer.deleteService("jenkins"); + final Response actual = apiServer.deleteService(request, "jenkins"); assertEquals("Delete service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); } @@ -170,7 +177,7 @@ public void testDecreaseContainerAndStop() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("update service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -197,7 +204,7 @@ public void testBadDecreaseContainerAndStop() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("flex service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); @@ -223,7 +230,7 @@ public void testIncreaseContainersAndStart() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("flex service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -249,7 +256,7 @@ public void testBadStartServices() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("start service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR).build() @@ -276,7 +283,7 @@ public void testGoodStartServices() { c.setResource(resource); components.add(c); service.setComponents(components); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("start service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -303,7 +310,7 @@ public void testBadStopServices() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("stop service is ", actual.getStatus(), Response.status(Status.BAD_REQUEST).build().getStatus()); @@ -330,7 +337,7 @@ public void testGoodStopServices() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("jenkins", + final Response actual = apiServer.updateService(request, "jenkins", service); assertEquals("stop service is ", actual.getStatus(), Response.status(Status.OK).build().getStatus()); @@ -357,7 +364,7 @@ public void testUpdateService() { components.add(c); service.setComponents(components); System.out.println("before stop"); - final Response actual = apiServer.updateService("no-jenkins", + final Response actual = apiServer.updateService(request, "no-jenkins", service); assertEquals("update service is ", actual.getStatus(), Response.status(Status.INTERNAL_SERVER_ERROR) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 81c56d2..2697499 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.client; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; @@ -186,7 +187,8 @@ public Service loadAppJsonFromLocalFS(String fileName, String serviceName, } public int actionSave(String fileName, String serviceName, Long lifetime, - String queue) throws IOException, YarnException { + String queue) + throws IOException, YarnException { return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName, lifetime, queue)); } @@ -216,7 +218,12 @@ public ApplicationId actionCreate(Service service) // Write the definition first and then submit - AM will read the definition createDirAndPersistApp(appDir, service); - ApplicationId appId = submitApp(service); + ApplicationId appId; + try { + appId = submitApp(service); + } catch (InterruptedException e) { + throw new YarnException(e); + } cachedAppInfo.put(serviceName, new AppInfo(appId, service .getKerberosPrincipal().getPrincipalName())); service.setId(appId.toString()); @@ -296,6 +303,7 @@ private long parseNumberOfContainers(Component component, String newNumber) { private Map flexComponents(String serviceName, Map componentCounts, Service persistedService) throws YarnException, IOException { + ServiceApiUtil.validateNameFormat(serviceName, getConfig()); Map original = new HashMap<>(componentCounts.size()); @@ -316,6 +324,7 @@ private long parseNumberOfContainers(Component component, String newNumber) { requestBuilder.addComponents(countBuilder.build()); } } + if (original.size() < componentCounts.size()) { componentCounts.keySet().removeAll(original.keySet()); throw new YarnException("Components " + componentCounts.keySet() @@ -334,17 +343,26 @@ private long parseNumberOfContainers(Component component, String newNumber) { LOG.error(message); throw new YarnException(message); } + if (StringUtils.isEmpty(appReport.getHost())) { throw new YarnException(serviceName + " AM hostname is empty"); } ClientAMProtocol proxy = createAMProxy(serviceName, appReport); - proxy.flexComponents(requestBuilder.build()); + try { + proxy.flexComponents(requestBuilder.build()); + } catch (YarnException | IOException e) { + LOG.warn("Exception caught during flex operation."); + LOG.warn(ExceptionUtils.getFullStackTrace(e)); + throw new YarnException("Permission denied to perform flex operation."); + } + for (Map.Entry entry : original.entrySet()) { LOG.info("[COMPONENT {}]: number of containers changed from {} to {}", entry.getKey(), entry.getValue(), componentCounts.get(entry.getKey())); } + return original; } @@ -530,7 +548,7 @@ private void verifyNoLiveAppInRM(String serviceName, String action) } private ApplicationId submitApp(Service app) - throws IOException, YarnException { + throws IOException, YarnException, InterruptedException { String serviceName = app.getName(); Configuration conf = getConfig(); Path appRootDir = fs.buildClusterDirPath(app.getName()); @@ -550,7 +568,7 @@ private ApplicationId submitApp(Service app) } submissionContext.setMaxAppAttempts(YarnServiceConf .getInt(YarnServiceConf.AM_RESTART_MAX, 20, app.getConfiguration(), - conf)); + conf)); setLogAggregationContext(app, conf, submissionContext); @@ -564,7 +582,7 @@ private ApplicationId submitApp(Service app) // add keytab if in secure env addKeytabResourceIfSecure(fs, localResources, app); if (LOG.isDebugEnabled()) { - printLocalResources(localResources); + printLocalResources(localResources); } Map env = addAMEnv(); @@ -572,11 +590,11 @@ private ApplicationId submitApp(Service app) String cmdStr = buildCommandLine(app, conf, appRootDir, hasAMLog4j); submissionContext.setResource(Resource.newInstance(YarnServiceConf .getLong(YarnServiceConf.AM_RESOURCE_MEM, - YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM, app.getConfiguration(), - conf), 1)); + YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM, app.getConfiguration(), + conf), 1)); String queue = app.getQueue(); if (StringUtils.isEmpty(queue)) { - queue = conf.get(YARN_QUEUE, "default"); + queue = conf.get(YARN_QUEUE, "default"); } submissionContext.setQueue(queue); submissionContext.setApplicationName(serviceName); @@ -584,7 +602,7 @@ private ApplicationId submitApp(Service app) Set appTags = AbstractClientProvider.createApplicationTags(serviceName, null, null); if (!appTags.isEmpty()) { - submissionContext.setApplicationTags(appTags); + submissionContext.setApplicationTags(appTags); } ContainerLaunchContext amLaunchContext = Records.newRecord(ContainerLaunchContext.class); @@ -593,7 +611,7 @@ private ApplicationId submitApp(Service app) amLaunchContext.setLocalResources(localResources); addHdfsDelegationTokenIfSecure(amLaunchContext); submissionContext.setAMContainerSpec(amLaunchContext); - yarnClient.submitApplication(submissionContext); + yarnClient.submitApplication(submissionContext); return submissionContext.getApplicationId(); } @@ -738,14 +756,20 @@ private boolean addAMLog4jResource(String serviceName, Configuration conf, return hasAMLog4j; } - public int actionStart(String serviceName) throws YarnException, IOException { + public int actionStart(String serviceName) + throws YarnException, IOException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); Path appDir = checkAppExistOnHdfs(serviceName); Service service = ServiceApiUtil.loadService(fs, serviceName); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); // see if it is actually running and bail out; verifyNoLiveAppInRM(serviceName, "thaw"); - ApplicationId appId = submitApp(service); + ApplicationId appId; + try { + appId = submitApp(service); + } catch (InterruptedException e) { + throw new YarnException(e); + } service.setId(appId.toString()); // write app definition on to hdfs Path appJson = persistAppDef(appDir, service); @@ -988,8 +1012,10 @@ protected ClientAMProtocol createAMProxy(String serviceName, if (UserGroupInformation.isSecurityEnabled()) { if (!cachedAppInfo.containsKey(serviceName)) { - Service persistedService = ServiceApiUtil.loadService(fs, serviceName); - cachedAppInfo.put(serviceName, new AppInfo(appReport.getApplicationId(), + Service persistedService = ServiceApiUtil + .loadService(fs, serviceName); + cachedAppInfo.put(serviceName, + new AppInfo(appReport.getApplicationId(), persistedService.getKerberosPrincipal().getPrincipalName())); } String principalName = cachedAppInfo.get(serviceName).principalName; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 1c517d9..7b77b2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -271,10 +271,6 @@ private void checkCompInstancesInOrder(ServiceClient client, } } - private void checkRegistryAndCompDirDeleted() { - - } - private void checkEachCompInstancesInOrder(Component component) { long expectedNumInstances = component.getNumberOfContainers(); Assert.assertEquals(expectedNumInstances, component.getContainers().size()); @@ -290,32 +286,6 @@ private void checkEachCompInstancesInOrder(Component component) { } } - private void waitForOneCompToBeReady(ServiceClient client, - Service exampleApp, String readyComp) - throws TimeoutException, InterruptedException { - long numExpectedContainers = - exampleApp.getComponent(readyComp).getNumberOfContainers(); - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - Component retrievedComp = retrievedApp.getComponent(readyComp); - - if (retrievedComp.getContainers() != null - && retrievedComp.getContainers().size() == numExpectedContainers) { - LOG.info(readyComp + " found " + numExpectedContainers - + " containers running"); - return true; - } else { - LOG.info(" Waiting for " + readyComp + "'s containers to be running"); - return false; - } - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, 200000); - } - /** * Wait until all the containers for all components become ready state. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java index d3ad53e..92b6fc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java @@ -82,6 +82,7 @@ public Class clazz; public String name; public String spec; + public Map params; } final String name; @@ -147,7 +148,19 @@ servlets.add(struct); return this; } - + + public Builder withServlet(String name, String pathSpec, + Class servlet, + Map params) { + ServletStruct struct = new ServletStruct(); + struct.clazz = servlet; + struct.name = name; + struct.spec = pathSpec; + struct.params = params; + servlets.add(struct); + return this; + } + public Builder with(Configuration conf) { this.conf = conf; return this; @@ -243,6 +256,11 @@ public void setup() { pathList.add("/" + wsName + "/*"); } } + for (ServletStruct s : servlets) { + if (!pathList.contains(s.spec)) { + pathList.add(s.spec); + } + } if (conf == null) { conf = new Configuration(); } @@ -315,7 +333,12 @@ public void setup() { HttpServer2 server = builder.build(); for(ServletStruct struct: servlets) { - server.addServlet(struct.name, struct.spec, struct.clazz); + if (struct.params != null) { + server.addInternalServlet(struct.name, struct.spec, + struct.clazz, struct.params); + } else { + server.addServlet(struct.name, struct.spec, struct.clazz); + } } for(Map.Entry entry : attributes.entrySet()) { server.setAttribute(entry.getKey(), entry.getValue()); @@ -405,11 +428,6 @@ public WebApp start(WebApp webapp, WebAppContext ui2Context, addFiltersForNewContext(ui2Context); httpServer.addHandlerAtFront(ui2Context); } - if (services!=null) { - String packageName = services.get("PackageName"); - String pathSpec = services.get("PathSpec"); - httpServer.addJerseyResourcePackage(packageName, pathSpec); - } try { httpServer.start(); LOG.info("Web app " + name + " started at " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a0317f6..0379220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.annotations.VisibleForTesting; +import com.sun.jersey.spi.container.servlet.ServletContainer; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.AuthInfo; @@ -1041,11 +1043,23 @@ protected void startWepApp() { RMWebAppUtil.setupSecurityAndFilters(conf, getClientRMService().rmDTSecretManager); + Map params = new HashMap(); + if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, + false)) { + String apiPackages = "org.apache.hadoop.yarn.service.webapp;" + + "org.apache.hadoop.yarn.webapp"; + params.put("com.sun.jersey.config.property.resourceConfigClass", + "com.sun.jersey.api.core.PackagesResourceConfig"); + params.put("com.sun.jersey.config.property.packages", apiPackages); + } + Builder builder = WebApps .$for("cluster", ApplicationMasterService.class, masterService, "ws") .with(conf) + .withServlet("API-Service", "/app/*", + ServletContainer.class, params) .withHttpSpnegoPrincipalKey( YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) .withHttpSpnegoKeytabKey( @@ -1101,15 +1115,7 @@ protected void startWepApp() { } } - if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, - false)) { - serviceConfig = new HashMap(); - String apiPackages = "org.apache.hadoop.yarn.service.webapp;" + - "org.apache.hadoop.yarn.webapp"; - serviceConfig.put("PackageName", apiPackages); - serviceConfig.put("PathSpec", "/app/*"); - } - webApp = builder.start(new RMWebApp(this), uiWebAppContext, serviceConfig); + webApp = builder.start(new RMWebApp(this), uiWebAppContext, null); } private String getWebAppsPath(String appName) {