diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index e4a245d3dc3..6826776aa5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -26,6 +26,7 @@ import javax.ws.rs.core.MediaType; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -40,11 +41,15 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; +import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.util.RMHAUtils; +import org.codehaus.jackson.map.PropertyNamingStrategy; import org.eclipse.jetty.util.UrlEncoded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,7 +136,7 @@ private String getRMWebAddress() { * @return URI to API Service * @throws IOException */ - private String getApiUrl(String appName) throws IOException { + private String getServicePath(String appName) throws IOException { String url = getRMWebAddress(); StringBuilder api = new StringBuilder(); api.append(url); @@ -148,23 +153,42 @@ private String getApiUrl(String appName) throws IOException { return api.toString(); } + private String getContainerPath(String appName, String compName, String + compInstanceName) throws IOException { + Preconditions.checkNotNull(appName); + Preconditions.checkNotNull(compName); + Preconditions.checkNotNull(compInstanceName); + String url = getRMWebAddress(); + StringBuilder api = new StringBuilder(); + api.append(url); + api.append("/app/v1/services/").append(appName).append("/") + .append(compName).append("/").append(compInstanceName); + Configuration conf = getConfig(); + if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase("simple")) { + api.append("?user.name=" + UrlEncoded + .encodeString(System.getProperty("user.name"))); + } + return api.toString(); + } + private Builder getApiClient() throws IOException { - return getApiClient(null); + return getApiClient(getServicePath(null)); } /** * Setup API service web request. * - * @param appName + * @param requestPath * @return * @throws IOException */ - private Builder getApiClient(String appName) throws IOException { + private Builder getApiClient(String requestPath) + throws IOException { Client client = Client.create(getClientConfig()); Configuration conf = getConfig(); client.setChunkedEncodingSize(null); Builder builder = client - .resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON); + .resource(requestPath).type(MediaType.APPLICATION_JSON); if (conf.get("hadoop.http.authentication.type").equals("kerberos")) { AuthenticatedURL.Token token = new AuthenticatedURL.Token(); builder.header("WWW-Authenticate", token); @@ -312,7 +336,7 @@ public int actionStop(String appName) throws IOException, YarnException { service.setName(appName); service.setState(ServiceState.STOPPED); String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -335,7 +359,7 @@ public int actionStart(String appName) throws IOException, YarnException { service.setName(appName); service.setState(ServiceState.STARTED); String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -381,7 +405,7 @@ public int actionSave(String fileName, String appName, Long lifetime, public int actionDestroy(String appName) throws IOException, YarnException { int result = EXIT_SUCCESS; try { - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .delete(ClientResponse.class); result = processResponse(response); } catch (Exception e) { @@ -413,7 +437,7 @@ public int actionFlex(String appName, Map componentCounts) service.addComponent(component); } String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -454,7 +478,8 @@ public String getStatusString(String appIdOrName) throws IOException, ServiceApiUtil.validateNameFormat(appName, getConfig()); } try { - ClientResponse response = getApiClient(appName).get(ClientResponse.class); + ClientResponse response = getApiClient(getServicePath(appName)) + .get(ClientResponse.class); if (response.getStatus() != 200) { StringBuilder sb = new StringBuilder(); sb.append(appName); @@ -487,4 +512,28 @@ public int actionUpgrade(String appName, } return result; } + + @Override + public int actionUpgrade(String appName, String compName, + String compInstanceName) throws IOException, YarnException { + int result; + try { + Container container = new Container(); + container.setComponentInstanceName(compInstanceName); + container.setState(ContainerState.UPGRADING); + + String buffer = containerJsonSerde.toJson(container); + ClientResponse response = getApiClient(getContainerPath(appName, compName, + compInstanceName)).post(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade component instance: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } + + private static JsonSerDeser containerJsonSerde = + new JsonSerDeser<>(Container.class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 59ee05d7ab2..f70d11a1cb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -28,10 +28,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.RestApiConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +57,10 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; @@ -177,17 +183,7 @@ public Response getService(@Context HttpServletRequest request, } UserGroupInformation ugi = getProxyUser(request); LOG.info("GET: getService for appName = {} user = {}", appName, ugi); - Service app = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Service run() throws IOException, YarnException { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Service app = sc.getStatus(appName); - sc.close(); - return app; - } - }); + Service app = getServiceFromClient(ugi, appName); return Response.ok(app).build(); } catch (AccessControlException e) { return formatResponse(Status.FORBIDDEN, e.getMessage()); @@ -377,8 +373,10 @@ public Response updateService(@Context HttpServletRequest request, } // If an UPGRADE is requested - if (updateServiceData.getState() != null && - updateServiceData.getState() == ServiceState.UPGRADING) { + if (updateServiceData.getState() != null && ( + updateServiceData.getState() == ServiceState.UPGRADING || + updateServiceData.getState() == + ServiceState.UPGRADING_AUTO_FINALIZE)) { return upgradeService(updateServiceData, ugi); } } catch (UndeclaredThrowableException e) { @@ -404,6 +402,40 @@ public Response updateService(@Context HttpServletRequest request, return Response.status(Status.NO_CONTENT).build(); } + @PUT + @Path(COMP_INSTANCE_LONG_PATH) + @Consumes({MediaType.APPLICATION_JSON}) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) + public Response updateComponentInstance(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + @PathParam(COMPONENT_NAME) String componentName, + @PathParam(COMP_INSTANCE_NAME) String componentInstanceName, + Container container) { + + try { + UserGroupInformation ugi = getProxyUser(request); + LOG.info("PUT: update component instance {} for component = {}" + + " service = {} user = {}", componentInstanceName, componentName, + serviceName, ugi); + if (container.getState() != null + && container.getState() == ContainerState.UPGRADING) { + return processContainerUpgrade(ugi, serviceName, componentName, + componentInstanceName, container); + } + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (YarnException e) { + return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + return Response.status(Status.NO_CONTENT).build(); + } + private Response flexService(Service service, UserGroupInformation ugi) throws IOException, InterruptedException { String appName = service.getName(); @@ -492,13 +524,89 @@ private Response upgradeService(Service service, sc.close(); return null; }); - LOG.info("Service {} version {} upgrade initialized"); + LOG.info("Service {} version {} upgrade initialized", service.getName(), + service.getVersion()); status.setDiagnostics("Service " + service.getName() + " version " + service.getVersion() + " saved."); status.setState(ServiceState.ACCEPTED); return formatResponse(Status.ACCEPTED, status); } + private Response processContainerUpgrade(UserGroupInformation ugi, + String serviceName, String componentName, String compInstanceName, + Container reqContainer) throws YarnException, IOException, + InterruptedException { + + if (reqContainer == null) { + throw new YarnException("No container data provided."); + } + if (reqContainer.getComponentInstanceName() != null && + !reqContainer.getComponentInstanceName().equals(compInstanceName)) { + throw new YarnException(String.format("Component instance name in the " + + "request object (%s) does not match that in the URI path (%s).", + reqContainer.getComponentInstanceName(), compInstanceName + )); + } + Service service = getServiceFromClient(ugi, serviceName); + if (service.getState() != ServiceState.UPGRADING) { + throw new YarnException( + String.format("The upgrade of service %s has not been initiated.", + serviceName)); + } + + Component component = service.getComponent(componentName); + if (component == null) { + throw new YarnException(String.format( + "The component name in the URI path (%s) is invalid.", + componentName)); + } + Container container = component.getComponentInstance(compInstanceName); + if (container == null) { + throw new YarnException(String.format( + "The component (%s) does not have a component instance (%s).", + componentName, compInstanceName)); + } + if (component.getState() != ComponentState.NEEDS_UPGRADE) { + // Nothing to upgrade + throw new YarnException(String.format( + "The component instance (%s) does not need an upgrade.", + compInstanceName)); + } + List containersToUpgrade = new ArrayList<>(); + containersToUpgrade.add(container); + Integer result = ugi.doAs((PrivilegedExceptionAction) () -> { + int result1; + ServiceClient sc = new ServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result1 = sc.actionUpgrade(service, containersToUpgrade); + sc.close(); + return result1; + }); + + if (result == EXIT_SUCCESS) { + ServiceStatus status = new ServiceStatus(); + status.setDiagnostics( + "Upgrading component instance (" + compInstanceName + ")"); + return formatResponse(Response.Status.ACCEPTED, status); + } + // If result is not a success, consider it a no-op + return Response.status(Response.Status.NO_CONTENT).build(); + } + + private Service getServiceFromClient(UserGroupInformation ugi, + String serviceName) throws IOException, InterruptedException { + + return ugi.doAs((PrivilegedExceptionAction) () -> { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + Service app1 = sc.getStatus(serviceName); + sc.close(); + return app1; + }); + } + /** * Used by negative test case. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 4422451c5c2..45ff98ac57d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; @@ -49,4 +51,8 @@ UpgradeServiceResponseProto upgrade(UpgradeServiceRequestProto request) RestartServiceResponseProto restart(RestartServiceRequestProto request) throws IOException, YarnException; + + CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) throws IOException, + YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 08c36f443ed..3384af8c054 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -26,8 +26,11 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; @@ -40,6 +43,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,8 +155,11 @@ public InetSocketAddress getBindAddress() { @Override public UpgradeServiceResponseProto upgrade( UpgradeServiceRequestProto request) throws IOException { - ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE); - event.setVersion(request.getVersion()); + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) + .setVersion(request.getVersion()); + if (request.getAutoFinalize()) { + event.setAutoFinalize(true); + } context.scheduler.getDispatcher().getEventHandler().handle(event); LOG.info("Upgrading service to version {} by {}", request.getVersion(), UserGroupInformation.getCurrentUser()); @@ -166,4 +174,21 @@ public RestartServiceResponseProto restart(RestartServiceRequestProto request) LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser()); return RestartServiceResponseProto.newBuilder().build(); } + + @Override + public CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) + throws IOException, YarnException { + if (!request.getContainerIdsList().isEmpty()) { + + for (String containerId : request.getContainerIdsList()) { + ComponentInstanceEvent event = + new ComponentInstanceEvent(ContainerId.fromString(containerId), + ComponentInstanceEventType.UPGRADE); + LOG.info("Upgrade container {}", containerId); + context.scheduler.getDispatcher().getEventHandler().handle(event); + } + } + return CompInstancesUpgradeResponseProto.newBuilder().build(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 9e7d442b9e8..0196be2a989 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -28,6 +28,7 @@ private final ServiceEventType type; private String version; + private boolean autoFinalize; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -46,4 +47,13 @@ public ServiceEvent setVersion(String version) { this.version = version; return this; } + + public boolean isAutoFinalize() { + return autoFinalize; + } + + public ServiceEvent setAutoFinalize(boolean autoFinalize) { + this.autoFinalize = autoFinalize; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java index 2162eb5e93d..4fc420ba6dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java @@ -24,5 +24,5 @@ public enum ServiceEventType { START, UPGRADE, - STOP_UPGRADE + CHECK_STABLE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index a3fbe899bb0..56940721af1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -22,6 +22,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.ComponentEvent; @@ -41,9 +43,6 @@ import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; -/** - * Manages the state of the service. - */ public class ServiceManager implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger( ServiceManager.class); @@ -72,9 +71,16 @@ State.UPGRADING), ServiceEventType.UPGRADE, new StartUpgradeTransition()) + .addTransition(State.STABLE, EnumSet.of(State.STABLE), + ServiceEventType.CHECK_STABLE, new CheckStableTransition()) + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, State.UPGRADING), ServiceEventType.START, - new StopUpgradeTransition()) + new CheckStableTransition()) + + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, + State.UPGRADING), ServiceEventType.CHECK_STABLE, + new CheckStableTransition()) .installTopology(); public ServiceManager(ServiceContext context) { @@ -102,7 +108,7 @@ public void handle(ServiceEvent event) { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitionException e) { LOG.error(MessageFormat.format( - "[SERVICE]: Invalid event {0} at {1}.", event.getType(), + "[SERVICE]: Invalid event {1} at {2}.", event.getType(), oldState), e); } if (oldState != getState()) { @@ -133,7 +139,12 @@ public State transition(ServiceManager serviceManager, Service targetSpec = ServiceApiUtil.loadServiceUpgrade( serviceManager.fs, serviceManager.getName(), event.getVersion()); - serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + if (!event.isAutoFinalize()) { + serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + } else { + serviceManager.serviceSpec.setState( + ServiceState.UPGRADING_AUTO_FINALIZE); + } List compsThatNeedUpgrade = serviceManager.componentsFinder. findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec); @@ -141,8 +152,9 @@ public State transition(ServiceManager serviceManager, if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { compsThatNeedUpgrade.forEach(component -> { ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE). - setTargetSpec(component); + component.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(component) + .setUpgradeVersion(event.getVersion()); serviceManager.dispatcher.getEventHandler().handle( needUpgradeEvent); }); @@ -157,22 +169,29 @@ public State transition(ServiceManager serviceManager, } } - private static class StopUpgradeTransition implements + private static class CheckStableTransition implements MultipleArcTransition { @Override public State transition(ServiceManager serviceManager, ServiceEvent event) { - //abort is not supported currently - //trigger re-check of service state - ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler, - true); - if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) { - return serviceManager.finalizeUpgrade() ? State.STABLE : - State.UPGRADING; - } else { - return State.UPGRADING; + //trigger check of service state + ServiceState currState = serviceManager.serviceSpec.getState(); + if (currState.equals(ServiceState.STABLE)) { + return State.STABLE; } + if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || + event.getType().equals(ServiceEventType.START)) { + ServiceState targetState = checkIfStable(serviceManager.serviceSpec); + if (targetState.equals(ServiceState.STABLE)) { + if (serviceManager.finalizeUpgrade()) { + LOG.info("Service def state changed from {} -> {}", currState, + serviceManager.serviceSpec.getState()); + return State.STABLE; + } + } + } + return State.UPGRADING; } } @@ -181,10 +200,7 @@ public State transition(ServiceManager serviceManager, */ private boolean finalizeUpgrade() { try { - Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade( - fs, getName(), upgradeVersion); - ServiceApiUtil.writeAppDefinition(fs, - ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec); + ServiceApiUtil.overwriteUpgradeDefinition(fs, getName(), upgradeVersion); } catch (IOException e) { LOG.error("Upgrade did not complete because unable to overwrite the" + " service definition", e); @@ -195,13 +211,26 @@ private boolean finalizeUpgrade() { fs.deleteClusterUpgradeDir(getName(), upgradeVersion); } catch (IOException e) { LOG.warn("Unable to delete upgrade definition for service {} " + - "version {}", getName(), upgradeVersion); + "version {}", getName(), upgradeVersion); } + serviceSpec.setState(ServiceState.STABLE); serviceSpec.setVersion(upgradeVersion); upgradeVersion = null; return true; } + private static ServiceState checkIfStable(Service service) { + // if desired == running + for (org.apache.hadoop.yarn.service.api.records.Component comp : + service.getComponents()) { + if (!comp.getState().equals( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE)) { + return service.getState(); + } + } + return ServiceState.STABLE; + } + /** * Returns the name of the service. */ @@ -216,7 +245,6 @@ public String getName() { STABLE, UPGRADING } - @VisibleForTesting Service getServiceSpec() { return serviceSpec; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 79eef49e0c6..6dc9f6e8c0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -721,6 +721,18 @@ public void onStartContainerError(ContainerId containerId, Throwable t) { // automatically which will trigger stopping COMPONENT INSTANCE } + @Override + public void onContainerReInitialize(ContainerId containerId) { + ComponentInstance instance = liveInstances.get(containerId); + if (instance == null) { + LOG.error("No component instance exists for " + containerId); + return; + } + ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent( + containerId, ComponentInstanceEventType.BECOME_READY); + dispatcher.getEventHandler().handle(becomeReadyEvent); + } + @Override public void onContainerResourceIncreased(ContainerId containerId, Resource resource) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index ce0e0cfde8c..5d489b0cb81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -250,6 +250,15 @@ public Container getContainer(String id) { return null; } + public Container getComponentInstance(String compInstanceName) { + for (Container container : containers) { + if (compInstanceName.equals(container.getComponentInstanceName())) { + return container; + } + } + return null; + } + /** * Run all containers of this component in privileged mode (YARN-4262). **/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index bf09ff2442f..6e390737e70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -26,5 +26,5 @@ @InterfaceAudience.Public @InterfaceStability.Unstable public enum ContainerState { - RUNNING_BUT_UNREADY, READY, STOPPED + RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 286eaa24972..b6ae38bdeee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -29,5 +29,6 @@ @ApiModel(description = "The current state of an service.") @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { - ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING; + ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, + UPGRADING_AUTO_FINALIZE; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 04ca9437426..6dd9c7a513c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.client; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; @@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; @@ -230,6 +233,15 @@ public int actionUpgrade(Service service) throws YarnException, IOException { throw new YarnException(message); } + Service liveService = getStatus(service.getName()); + if (liveService.getState().equals(ServiceState.FLEX)) { + String message = service.getName() + " is at " + + ServiceState.FLEX + + " state, upgrade can not be invoked when service is flexing."; + LOG.error(message); + throw new YarnException(message); + } + Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); @@ -244,11 +256,52 @@ public int actionUpgrade(Service service) throws YarnException, IOException { UpgradeServiceRequestProto.Builder requestBuilder = UpgradeServiceRequestProto.newBuilder(); requestBuilder.setVersion(service.getVersion()); + if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + requestBuilder.setAutoFinalize(true); + } proxy.upgrade(requestBuilder.build()); return EXIT_SUCCESS; } + @Override + public int actionUpgrade(String appName, String compName, + String compInstanceName) throws IOException, YarnException { + checkAppExistOnHdfs(appName); + Service persistedService = ServiceApiUtil.loadService(fs, appName); + Container container = persistedService.getComponent(compName) + .getComponentInstance(compInstanceName); + return actionUpgrade(persistedService, Lists.newArrayList(container)); + } + + public int actionUpgrade(Service service, List compInstances) + throws IOException, YarnException { + ApplicationReport appReport = + yarnClient.getApplicationReport(getAppId(service.getName())); + + if (appReport.getYarnApplicationState() != RUNNING) { + String message = service.getName() + " is at " + + appReport.getYarnApplicationState() + + " state, upgrade can only be invoked when service is running."; + LOG.error(message); + throw new YarnException(message); + } + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(service.getName() + " AM hostname is empty."); + } + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + + List containerIdsToUpgrade = new ArrayList<>(); + compInstances.forEach(compInst -> + containerIdsToUpgrade.add(compInst.getId())); + LOG.info("instances to upgrade {}", containerIdsToUpgrade); + CompInstancesUpgradeRequestProto.Builder upgradeRequestBuilder = + CompInstancesUpgradeRequestProto.newBuilder(); + upgradeRequestBuilder.addAllContainerIds(containerIdsToUpgrade); + proxy.upgrade(upgradeRequestBuilder.build()); + return EXIT_SUCCESS; + } + public int actionLaunch(String fileName, String serviceName, Long lifetime, String queue) throws IOException, YarnException { actionCreate(loadAppJsonFromLocalFS(fileName, serviceName, lifetime, @@ -384,6 +437,17 @@ private long parseNumberOfContainers(Component component, String newNumber) { LOG.error(message); throw new YarnException(message); } + + Service liveService = getStatus(serviceName); + if (liveService.getState().equals(ServiceState.UPGRADING) || + liveService.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + String message = serviceName + " is at " + + liveService.getState() + + " state, flex can not be invoked when service is upgrading. "; + LOG.error(message); + throw new YarnException(message); + } + if (StringUtils.isEmpty(appReport.getHost())) { throw new YarnException(serviceName + " AM hostname is empty"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 0cd7e2c2084..50301781308 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.ServiceEvent; +import org.apache.hadoop.yarn.service.ServiceEventType; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; @@ -36,6 +39,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.ServiceMetrics; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -96,6 +100,9 @@ // disk_failed containers etc. This will be reset to 0 periodically. public AtomicInteger currentContainerFailure = new AtomicInteger(0); + private ComponentEvent upgradeEvent; + private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0); + private StateMachine stateMachine; private AsyncDispatcher dispatcher; @@ -140,10 +147,15 @@ FLEX, new FlexComponentTransition()) .addTransition(STABLE, UPGRADING, UPGRADE, new ComponentNeedsUpgradeTransition()) - .addTransition(FLEXING, UPGRADING, UPGRADE, - new ComponentNeedsUpgradeTransition()) + //Upgrade while previous upgrade is still in progress .addTransition(UPGRADING, UPGRADING, UPGRADE, new ComponentNeedsUpgradeTransition()) + .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE), + CHECK_STABLE, new CheckStableTransition()) + .addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), + CHECK_STABLE, new CheckStableTransition()) + .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE, + new CheckStableTransition()) .installTopology(); public Component( @@ -271,7 +283,10 @@ public void transition(Component component, ComponentEvent event) { component.pendingInstances.remove(instance); instance.setContainer(container); - ProviderUtils.initCompInstanceDir(component.getContext().fs, instance); + + ProviderUtils.initCompInstanceDir(component.getContext().fs, + component.createLaunchContext(component.componentSpec, + component.scheduler.getApp().getVersion()), instance); component.getScheduler().addLiveCompInstance(container.getId(), instance); LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " + "host {}, num pending component instances reduced to {} ", @@ -301,6 +316,9 @@ private static ComponentState checkIfStable(Component component) { component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; + } else if (component.componentSpec.getState().equals(org.apache.hadoop. + yarn.service.api.records.ComponentState.NEEDS_UPGRADE)) { + return UPGRADING; } else { component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); @@ -346,6 +364,9 @@ public static synchronized void checkAndUpdateComponentState( isIncrement); } } + // the state of component needs to transition to be stable + component.dispatcher.getEventHandler().handle(new ComponentEvent( + component.getName(), ComponentEventType.CHECK_STABLE)); } private static class ContainerCompletedTransition extends BaseTransition { @@ -366,6 +387,40 @@ public void transition(Component component, ComponentEvent event) { public void transition(Component component, ComponentEvent event) { component.componentSpec.setState(org.apache.hadoop.yarn.service.api. records.ComponentState.NEEDS_UPGRADE); + component.numContainersThatNeedUpgrade.set(component.componentSpec.getNumberOfContainers()); + component.componentSpec.getContainers().forEach(container -> + container.setState(ContainerState.NEEDS_UPGRADE)); + component.upgradeEvent = event; + } + } + + private static class CheckStableTransition implements MultipleArcTransition + { + + @Override + public ComponentState transition(Component component, + ComponentEvent componentEvent) { + org.apache.hadoop.yarn.service.api.records.ComponentState currState = + component.componentSpec.getState(); + if (currState.equals(org.apache.hadoop.yarn.service.api.records + .ComponentState.STABLE)) { + return ComponentState.STABLE; + } + if (component.numContainersThatNeedUpgrade.get() != 0) { + return ComponentState.UPGRADING; + } + // checkIfStable also updates the state in definition when STABLE + ComponentState targetState = checkIfStable(component); + if (targetState.equals(STABLE) && currState.equals( + org.apache.hadoop.yarn.service.api.records + .ComponentState.NEEDS_UPGRADE)) { + component.componentSpec = component.upgradeEvent.getTargetSpec(); + component.upgradeEvent = null; + ServiceEvent checkStable = new ServiceEvent(ServiceEventType. + CHECK_STABLE); + component.dispatcher.getEventHandler().handle(checkStable); + } + return targetState; } } @@ -401,8 +456,28 @@ private void assignContainerToCompInstance(Container container) { "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", getName(), container.getId(), instance.getCompInstanceName(), container.getNodeId()); - scheduler.getContainerLaunchService() - .launchCompInstance(scheduler.getApp(), instance, container); + if (getState().equals(ComponentState.UPGRADING)) { + scheduler.getContainerLaunchService() + .launchCompInstance(scheduler.getApp(), instance, container, + createLaunchContext(upgradeEvent.getTargetSpec(), + upgradeEvent.getUpgradeVersion())); + } else { + scheduler.getContainerLaunchService().launchCompInstance( + scheduler.getApp(), instance, container, + createLaunchContext(componentSpec, scheduler.getApp().getVersion())); + } + } + + public ContainerLaunchService.ComponentLaunchContext createLaunchContext( + org.apache.hadoop.yarn.service.api.records.Component compSpec, + String version) { + ContainerLaunchService.ComponentLaunchContext launchContext = + new ContainerLaunchService.ComponentLaunchContext(compSpec.getName(), + version); + launchContext.setArtifact(compSpec.getArtifact()) + .setConfiguration(compSpec.getConfiguration()) + .setLaunchCommand(compSpec.getLaunchCommand()); + return launchContext; } @SuppressWarnings({ "unchecked" }) @@ -553,16 +628,24 @@ public void decRunningContainers() { scheduler.getServiceMetrics().containersRunning.decr(); } - public void incContainersReady() { + public void incContainersReady(boolean updateDefinition) { componentMetrics.containersReady.incr(); scheduler.getServiceMetrics().containersReady.incr(); - checkAndUpdateComponentState(this, true); + if (updateDefinition) { + checkAndUpdateComponentState(this, true); + } } - public void decContainersReady() { + public void decContainersReady(boolean updateDefinition) { componentMetrics.containersReady.decr(); scheduler.getServiceMetrics().containersReady.decr(); - checkAndUpdateComponentState(this, false); + if (updateDefinition) { + checkAndUpdateComponentState(this, false); + } + } + + public void decContainersThatNeedUpgrade() { + numContainersThatNeedUpgrade.decrementAndGet(); } public int getNumReadyInstances() { @@ -621,6 +704,16 @@ public ComponentState getState() { this.readLock.unlock(); } } + + public ComponentEvent getUpgradeEvent() { + this.readLock.lock(); + try { + return upgradeEvent; + } finally { + this.readLock.unlock(); + } + } + public ServiceScheduler getScheduler() { return scheduler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 7bd5cb9399f..84caa77b205 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -34,6 +34,7 @@ private ContainerStatus status; private ContainerId containerId; private org.apache.hadoop.yarn.service.api.records.Component targetSpec; + private String upgradeVersion; public ContainerId getContainerId() { return containerId; @@ -103,4 +104,13 @@ public ComponentEvent setTargetSpec( this.targetSpec = Preconditions.checkNotNull(targetSpec); return this; } + + public String getUpgradeVersion() { + return upgradeVersion; + } + + public ComponentEvent setUpgradeVersion(String upgradeVersion) { + this.upgradeVersion = upgradeVersion; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 970788aadcf..44d781f2257 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -25,5 +25,5 @@ CONTAINER_STARTED, CONTAINER_COMPLETED, UPGRADE, - STOP_UPGRADE + CHECK_STABLE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 0e3e11bc72e..683bdff8416 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; @@ -114,6 +116,12 @@ .addTransition(READY, STARTED, BECOME_NOT_READY, new ContainerBecomeNotReadyTransition()) .addTransition(READY, INIT, STOP, new ContainerStoppedTransition()) + .addTransition(READY, UPGRADING, UPGRADE, + new ContainerUpgradeTransition()) + .addTransition(UPGRADING, UPGRADING, UPGRADE, + new ContainerUpgradeTransition()) + .addTransition(UPGRADING, READY, BECOME_READY, + new ContainerBecomeReadyTransition()) .installTopology(); @@ -184,7 +192,17 @@ public ComponentInstance(Component component, public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { compInstance.containerSpec.setState(ContainerState.READY); - compInstance.component.incContainersReady(); + if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { + compInstance.component.incContainersReady(false); + compInstance.component.decContainersThatNeedUpgrade(); + ComponentEvent checkState = new ComponentEvent( + compInstance.component.getName(), ComponentEventType.CHECK_STABLE); + compInstance.scheduler.getDispatcher().getEventHandler().handle( + checkState); + + } else { + compInstance.component.incContainersReady(true); + } if (compInstance.timelineServiceEnabled) { compInstance.serviceTimelinePublisher .componentInstanceBecomeReady(compInstance.containerSpec); @@ -197,7 +215,7 @@ public void transition(ComponentInstance compInstance, public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY); - compInstance.component.decContainersReady(); + compInstance.component.decContainersReady(true); } } @@ -225,7 +243,7 @@ public void transition(ComponentInstance compInstance, compInstance.cancelContainerStatusRetriever(); if (compInstance.getState().equals(READY)) { - compInstance.component.decContainersReady(); + compInstance.component.decContainersReady(true); } compInstance.component.decRunningContainers(); boolean shouldExit = false; @@ -285,6 +303,23 @@ public void transition(ComponentInstance compInstance, } } + private static class ContainerUpgradeTransition extends BaseTransition { + + @Override + public void transition(ComponentInstance compInstance, + ComponentInstanceEvent event) { + compInstance.containerSpec.setState(ContainerState.UPGRADING); + compInstance.component.decContainersReady(false); + ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent(); + compInstance.scheduler.getContainerLaunchService() + .reInitCompInstance(compInstance.scheduler.getApp(), compInstance, + compInstance.container, + compInstance.component.createLaunchContext( + upgradeEvent.getTargetSpec(), + upgradeEvent.getUpgradeVersion())); + } + } + public ComponentInstanceState getState() { this.readLock.lock(); @@ -420,7 +455,7 @@ public void destroy() { component.decRunningContainers(); } if (getState() == READY) { - component.decContainersReady(); + component.decContainersReady(true); component.decRunningContainers(); } getCompSpec().removeContainer(containerSpec); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java index 1a880ba4426..665b8faf554 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java @@ -22,5 +22,6 @@ START, STOP, BECOME_READY, - BECOME_NOT_READY + BECOME_NOT_READY, + UPGRADE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java index 243fc5269bc..f40bfb9c55d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.service.conf; +import javax.ws.rs.core.MediaType; + public interface RestApiConstants { // Rest endpoints @@ -26,9 +28,17 @@ String SERVICE_PATH = "/services/{service_name}"; String COMPONENT_PATH = "/services/{service_name}/components/{component_name}"; + String COMP_INSTANCE_PATH = SERVICE_PATH + + "/component-instances/{component_instance_name}"; + String COMP_INSTANCE_LONG_PATH = COMPONENT_PATH + + "/component-instances/{component_instance_name}"; + // Query param String SERVICE_NAME = "service_name"; String COMPONENT_NAME = "component_name"; + String COMP_INSTANCE_NAME = "component_instance_name"; + + String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8"; Long DEFAULT_UNLIMITED_LIFETIME = -1l; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java index e07661bec69..31e961ad8cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java @@ -18,11 +18,12 @@ package org.apache.hadoop.yarn.service.containerlaunch; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.ServiceContext; -import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.provider.ProviderService; import org.apache.hadoop.yarn.service.provider.ProviderFactory; @@ -63,36 +64,57 @@ protected void serviceStop() throws Exception { } public void launchCompInstance(Service service, - ComponentInstance instance, Container container) { + ComponentInstance instance, Container container, + ComponentLaunchContext componentLaunchContext) { ContainerLauncher launcher = - new ContainerLauncher(service, instance, container); + new ContainerLauncher(service, instance, container, + componentLaunchContext, false); executorService.execute(launcher); } + public void reInitCompInstance(Service service, + ComponentInstance instance, Container container, + ComponentLaunchContext componentLaunchContext) { + ContainerLauncher reInitializer = new ContainerLauncher(service, instance, + container, componentLaunchContext, true); + executorService.execute(reInitializer); + } + private class ContainerLauncher implements Runnable { public final Container container; public final Service service; public ComponentInstance instance; + private final ComponentLaunchContext componentLaunchContext; + private final boolean reInit; - public ContainerLauncher( - Service service, - ComponentInstance instance, Container container) { + public ContainerLauncher(Service service, ComponentInstance instance, + Container container, ComponentLaunchContext componentLaunchContext, + boolean reInit) { this.container = container; this.service = service; this.instance = instance; + this.componentLaunchContext = componentLaunchContext; + this.reInit = reInit; } @Override public void run() { - Component compSpec = instance.getCompSpec(); ProviderService provider = ProviderFactory.getProviderService( - compSpec.getArtifact()); + componentLaunchContext.getArtifact()); AbstractLauncher launcher = new AbstractLauncher(context); try { provider.buildContainerLaunchContext(launcher, service, - instance, fs, getConfig(), container); - instance.getComponent().getScheduler().getNmClient() - .startContainerAsync(container, - launcher.completeContainerLaunch()); + instance, fs, getConfig(), container, componentLaunchContext); + if (!reInit) { + LOG.info("launching container {}", container.getId()); + instance.getComponent().getScheduler().getNmClient() + .startContainerAsync(container, + launcher.completeContainerLaunch()); + } else { + LOG.info("reInitializing container {}", container.getId()); + instance.getComponent().getScheduler().getNmClient() + .reInitializeContainerAsync(container.getId(), + launcher.completeContainerLaunch(), true); + } } catch (Exception e) { LOG.error(instance.getCompInstanceId() + ": Failed to launch container. ", e); @@ -100,4 +122,55 @@ public ContainerLauncher( } } } + + public static class ComponentLaunchContext { + private final String name; + private final String serviceVersion; + private Artifact artifact; + private org.apache.hadoop.yarn.service.api.records.Configuration + configuration; + private String launchCommand; + + public ComponentLaunchContext(String name, String serviceVersion) { + this.name = Preconditions.checkNotNull(name); + this.serviceVersion = Preconditions.checkNotNull(serviceVersion); + } + + public String getName() { + return name; + } + + public String getServiceVersion() { + return serviceVersion; + } + + public Artifact getArtifact() { + return artifact; + } + + public org.apache.hadoop.yarn.service.api.records.Configuration + getConfiguration() { + return configuration; + } + + public String getLaunchCommand() { + return launchCommand; + } + + public ComponentLaunchContext setArtifact(Artifact artifact) { + this.artifact = artifact; + return this; + } + + public ComponentLaunchContext setConfiguration( + org.apache.hadoop.yarn.service.api.records.Configuration configuration) { + this.configuration = configuration; + return this; + } + + public ComponentLaunchContext setLaunchCommand(String launchCommand) { + this.launchCommand = launchCommand; + return this; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java index 8152225e8a1..e82181eb703 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.net.InetSocketAddress; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; @@ -114,4 +116,16 @@ public RestartServiceResponseProto restart(RestartServiceRequestProto request) } return null; } + + @Override + public CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) + throws IOException, YarnException { + try { + return proxy.upgrade(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java index 1a1a1ef01ee..71ad9fa82e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java @@ -21,6 +21,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; @@ -91,4 +93,15 @@ public RestartServiceResponseProto restartService(RpcController controller, throw new ServiceException(e); } } + + @Override + public CompInstancesUpgradeResponseProto upgrade(RpcController controller, + CompInstancesUpgradeRequestProto request) + throws ServiceException { + try { + return real.upgrade(request); + } catch (IOException | YarnException e ) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java index 2f840b1678c..0a3bb5836ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java @@ -23,8 +23,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; -import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.service.exceptions.SliderException; @@ -57,9 +57,9 @@ public abstract void processArtifact(AbstractLauncher launcher, public void buildContainerLaunchContext(AbstractLauncher launcher, Service service, ComponentInstance instance, - SliderFileSystem fileSystem, Configuration yarnConf, Container container) + SliderFileSystem fileSystem, Configuration yarnConf, Container container, + ContainerLaunchService.ComponentLaunchContext compLaunchContext) throws IOException, SliderException { - Component component = instance.getComponent().getComponentSpec();; processArtifact(launcher, instance, fileSystem, service); ServiceContext context = @@ -69,11 +69,12 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, Map globalTokens = instance.getComponent().getScheduler().globalTokens; Map tokensForSubstitution = ProviderUtils - .initCompTokensForSubstitute(instance, container); + .initCompTokensForSubstitute(instance, container, + compLaunchContext); tokensForSubstitution.putAll(globalTokens); // Set the environment variables in launcher - launcher.putEnv(ServiceUtils - .buildEnvMap(component.getConfiguration(), tokensForSubstitution)); + launcher.putEnv(ServiceUtils.buildEnvMap( + compLaunchContext.getConfiguration(), tokensForSubstitution)); launcher.setEnv("WORK_DIR", ApplicationConstants.Environment.PWD.$()); launcher.setEnv("LOG_DIR", ApplicationConstants.LOG_DIR_EXPANSION_VAR); if (System.getenv(HADOOP_USER_NAME) != null) { @@ -91,10 +92,10 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, // create config file on hdfs and add local resource ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem, - component, tokensForSubstitution, instance, context); + compLaunchContext, tokensForSubstitution, instance, context); // substitute launch command - String launchCommand = component.getLaunchCommand(); + String launchCommand = compLaunchContext.getLaunchCommand(); // docker container may have empty commands if (!StringUtils.isEmpty(launchCommand)) { launchCommand = ProviderUtils diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java index 11015ea1750..ce21122b68b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; @@ -36,5 +37,6 @@ void buildContainerLaunchContext(AbstractLauncher containerLauncher, Service service, ComponentInstance instance, SliderFileSystem sliderFileSystem, Configuration yarnConf, Container - container) throws IOException, SliderException; + container, ContainerLaunchService.ComponentLaunchContext componentLaunchContext) + throws IOException, SliderException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java index d65a1969a13..1792c91e318 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException; import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.utils.PublishedConfiguration; @@ -160,9 +161,11 @@ public static void substituteMapWithTokens(Map configs, } public static Path initCompInstanceDir(SliderFileSystem fs, + ContainerLaunchService.ComponentLaunchContext compLaunchContext, ComponentInstance instance) { Path compDir = new Path(new Path(fs.getAppDir(), "components"), - instance.getCompName()); + compLaunchContext.getServiceVersion() + "/" + + compLaunchContext.getName()); Path compInstanceDir = new Path(compDir, instance.getCompInstanceName()); instance.setCompInstanceDir(compInstanceDir); return compInstanceDir; @@ -171,10 +174,11 @@ public static Path initCompInstanceDir(SliderFileSystem fs, // 1. Create all config files for a component on hdfs for localization // 2. Add the config file to localResource public static synchronized void createConfigFileAndAddLocalResource( - AbstractLauncher launcher, SliderFileSystem fs, Component component, + AbstractLauncher launcher, SliderFileSystem fs, + ContainerLaunchService.ComponentLaunchContext compLaunchContext, Map tokensForSubstitution, ComponentInstance instance, ServiceContext context) throws IOException { - Path compInstanceDir = initCompInstanceDir(fs, instance); + Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance); if (!fs.getFileSystem().exists(compInstanceDir)) { log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir); fs.getFileSystem().mkdirs(compInstanceDir, @@ -189,7 +193,8 @@ public static synchronized void createConfigFileAndAddLocalResource( + tokensForSubstitution); } - for (ConfigFile originalFile : component.getConfiguration().getFiles()) { + for (ConfigFile originalFile : compLaunchContext.getConfiguration() + .getFiles()) { ConfigFile configFile = originalFile.copy(); String fileName = new Path(configFile.getDestFile()).getName(); @@ -343,11 +348,12 @@ private static void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs, * @return tokens to replace */ public static Map initCompTokensForSubstitute( - ComponentInstance instance, Container container) { + ComponentInstance instance, Container container, + ContainerLaunchService.ComponentLaunchContext componentLaunchContext) { Map tokens = new HashMap<>(); - tokens.put(COMPONENT_NAME, instance.getCompSpec().getName()); + tokens.put(COMPONENT_NAME, componentLaunchContext.getName()); tokens - .put(COMPONENT_NAME_LC, instance.getCompSpec().getName().toLowerCase()); + .put(COMPONENT_NAME_LC, componentLaunchContext.getName().toLowerCase()); tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName()); tokens.put(CONTAINER_ID, container.getId().toString()); tokens.put(COMPONENT_ID, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 13d9a37ccbc..0743f43d69f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -27,6 +27,7 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; @@ -65,6 +66,8 @@ private static final PatternValidator userNamePattern = new PatternValidator("[a-z][a-z0-9-.]*"); + + @VisibleForTesting public static void setJsonSerDeser(JsonSerDeser jsd) { jsonSerDeser = jsd; @@ -455,6 +458,15 @@ public static Path writeAppDefinition(SliderFileSystem fs, Path appDir, return appJson; } + public static void overwriteUpgradeDefinition(SliderFileSystem fs, + String serviceName, String serviceVersion) throws IOException { + Path appJson = new Path(fs.buildClusterDirPath(serviceName), + serviceName + ".json"); + fs.getFileSystem().rename(new Path( + fs.buildClusterUpgradeDirPath(serviceName, serviceVersion), + serviceName + ".json"), appJson); + } + public static String $(String s) { return "${" + s +"}"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 3677593971a..c96f1de2275 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -30,6 +30,8 @@ service ClientAMProtocolService { returns (UpgradeServiceResponseProto); rpc restartService(RestartServiceRequestProto) returns (RestartServiceResponseProto); + rpc upgrade(CompInstancesUpgradeRequestProto) returns + (CompInstancesUpgradeResponseProto); } message FlexComponentsRequestProto { @@ -61,6 +63,7 @@ message StopResponseProto { message UpgradeServiceRequestProto { optional string version = 1; + optional bool autoFinalize = 2; } message UpgradeServiceResponseProto { @@ -70,4 +73,11 @@ message RestartServiceRequestProto { } message RestartServiceResponseProto { +} + +message CompInstancesUpgradeRequestProto { + repeated string containerIds = 1; +} + +message CompInstancesUpgradeResponseProto { } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index 8db98bd0275..bcc657c433e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -98,6 +98,7 @@ public void testContainerCompleted() throws TimeoutException, ApplicationId applicationId = ApplicationId.newInstance(123456, 1); Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); + exampleApp.setVersion("v1"); exampleApp.setName("testContainerCompleted"); exampleApp.addComponent(createComponent("compa", 1, "pwd")); @@ -136,6 +137,7 @@ public void testContainersFromPreviousAttemptsWithRMRestart() System.currentTimeMillis(), 1); Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); + exampleApp.setVersion("v1"); exampleApp.setName("testContainersRecovers"); String comp1Name = "comp1"; String comp1InstName = "comp1-0"; @@ -179,6 +181,7 @@ public void testContainersReleasedWhenExpired() Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); exampleApp.setName("testContainersRecovers"); + exampleApp.setVersion("v1"); String comp1Name = "comp1"; String comp1InstName = "comp1-0"; @@ -220,6 +223,7 @@ public void testContainersFromDifferentApp() Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); exampleApp.setName("testContainersFromDifferentApp"); + exampleApp.setVersion("v1"); String comp1Name = "comp1"; String comp1InstName = "comp1-0"; @@ -260,6 +264,7 @@ public void testScheduleWithMultipleResourceTypes() Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); exampleApp.setName("testScheduleWithMultipleResourceTypes"); + exampleApp.setVersion("v1"); List resourceTypeInfos = new ArrayList<>( ResourceUtils.getResourcesTypeInfo()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index c65a5d4efe6..025a1c82b79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -49,7 +49,7 @@ @Test public void testUpgrade() throws IOException, SliderException { ServiceManager serviceManager = createTestServiceManager("testUpgrade"); - upgrade(serviceManager, "v2", false); + upgrade(serviceManager, "v2", false, false); Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); } @@ -58,7 +58,7 @@ public void testUpgrade() throws IOException, SliderException { public void testRestartNothingToUpgrade() throws IOException, SliderException { ServiceManager serviceManager = createTestServiceManager("testRestart"); - upgrade(serviceManager, "v2", false); + upgrade(serviceManager, "v2", false, false); //make components stable serviceManager.getServiceSpec().getComponents().forEach(comp -> { @@ -73,15 +73,68 @@ public void testRestartNothingToUpgrade() public void testRestartWithPendingUpgrade() throws IOException, SliderException { ServiceManager serviceManager = createTestServiceManager("testRestart"); - upgrade(serviceManager, "v2", true); + upgrade(serviceManager, "v2", true, false); serviceManager.handle(new ServiceEvent(ServiceEventType.START)); Assert.assertEquals("service should still be upgrading", ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); } + @Test + public void testCheckState() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager( + "testCheckState"); + upgrade(serviceManager, "v2", true, false); + Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, + serviceManager.getServiceSpec().getState()); + + // make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> { + comp.setState(ComponentState.STABLE); + }); + ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); + serviceManager.handle(checkStable); + Assert.assertEquals("service should still be upgrading", + ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + + // finalize service + ServiceEvent restart = new ServiceEvent(ServiceEventType.START); + serviceManager.handle(restart); + Assert.assertEquals("service not stable", + ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + + Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), + serviceManager.getName()); + Assert.assertEquals("service def not overwritten", "v2", + savedSpec.getVersion()); + } + + @Test + public void testCheckStateAutoFinalize() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager( + "testCheckState"); + serviceManager.getServiceSpec().setState( + ServiceState.UPGRADING_AUTO_FINALIZE); + upgrade(serviceManager, "v2", true, true); + Assert.assertEquals("service not upgrading", + ServiceState.UPGRADING_AUTO_FINALIZE, + serviceManager.getServiceSpec().getState()); + + // make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> + comp.setState(ComponentState.STABLE)); + ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); + serviceManager.handle(checkStable); + Assert.assertEquals("service not stable", + ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + + Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), + serviceManager.getName()); + Assert.assertEquals("service def not overwritten", "v2", + savedSpec.getVersion()); + } private void upgrade(ServiceManager service, String version, - boolean upgradeArtifact) + boolean upgradeArtifact, boolean autoFinalize) throws IOException, SliderException { Service upgradedDef = ServiceTestUtils.createExampleApplication(); upgradedDef.setName(service.getName()); @@ -95,6 +148,9 @@ private void upgrade(ServiceManager service, String version, writeUpgradedDef(upgradedDef); ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); upgradeEvent.setVersion("v2"); + if (autoFinalize) { + upgradeEvent.setAutoFinalize(true); + } service.handle(upgradeEvent); } @@ -124,7 +180,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( return new ServiceManager(context); } - static Service createBaseDef(String name) { + public static Service createBaseDef(String name) { ApplicationId applicationId = ApplicationId.newInstance( System.currentTimeMillis(), 1); Service serviceDef = ServiceTestUtils.createExampleApplication(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index df6d64224ae..ce2b932216b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.Component; @@ -334,25 +335,45 @@ public void testRecoverComponentsAfterRMRestart() throws Exception { } @Test(timeout = 200000) - public void testUpgradeService() throws Exception { + public void testUpgrade() throws Exception { setupInternal(NUM_NMS); ServiceClient client = createClient(getConf()); Service service = createExampleApplication(); client.actionCreate(service); - waitForServiceToBeStarted(client, service); + waitForServiceToBeStable(client, service); - //upgrade the service + // upgrade the service + Component component = service.getComponents().iterator().next(); + service.setState(ServiceState.UPGRADING); service.setVersion("v2"); + component.getConfiguration().getEnv().put("env1", "val1"); client.actionUpgrade(service); - //wait for service to be in upgrade state + // wait for service to be in upgrade state waitForServiceToBeInState(client, service, ServiceState.UPGRADING); SliderFileSystem fs = new SliderFileSystem(getConf()); Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs, service.getName(), service.getVersion()); Assert.assertEquals(service.getName(), fromFs.getName()); Assert.assertEquals(service.getVersion(), fromFs.getVersion()); + + // upgrade containers + Service liveService = client.getStatus(service.getName()); + client.actionUpgrade(service, + liveService.getComponent(component.getName()).getContainers()); + waitForAllCompToBeReady(client, service); + + // finalize the upgrade + client.actionStart(service.getName()); + waitForServiceToBeStable(client, service); + Service active = client.getStatus(service.getName()); + Assert.assertEquals("component not stable", ComponentState.STABLE, + active.getComponent(component.getName()).getState()); + + LOG.info("Stop/destroy service {}", service); + client.actionStop(service.getName(), true); + client.actionDestroy(service.getName()); } // Check containers launched are in dependency order diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java index cc5b6ec7fe4..9f17541a831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java @@ -24,17 +24,27 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.ClientAMProtocol; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.mockito.Matchers; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -47,79 +57,144 @@ */ public class TestServiceClient { + private static final Logger LOG = LoggerFactory.getLogger( + TestServiceClient.class); + @Rule public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); @Test - public void testActionUpgrade() throws Exception { - ApplicationId applicationId = ApplicationId.newInstance( - System.currentTimeMillis(), 1); - ServiceClient client = createServiceClient(applicationId); - - Service service = ServiceTestUtils.createExampleApplication(); - service.setVersion("v1"); - client.actionCreate(service); + public void testActionServiceUpgrade() throws Exception { + Service service = createService(); + ServiceClient client = MockServiceClient.create(rule, service); //upgrade the service service.setVersion("v2"); client.actionUpgrade(service); - //wait for service to be in upgrade state Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(), service.getName(), service.getVersion()); Assert.assertEquals(service.getName(), fromFs.getName()); Assert.assertEquals(service.getVersion(), fromFs.getVersion()); + client.stop(); } + @Test + public void testActionCompInstanceUpgrade() throws Exception { + Service service = createService(); + MockServiceClient client = MockServiceClient.create(rule, service); - private ServiceClient createServiceClient(ApplicationId applicationId) - throws Exception { - ClientAMProtocol amProxy = mock(ClientAMProtocol.class); - YarnClient yarnClient = createMockYarnClient(); - ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( - applicationId, 1); - ApplicationAttemptReport attemptReport = - ApplicationAttemptReport.newInstance(attemptId, "localhost", 0, - null, null, null, - YarnApplicationAttemptState.RUNNING, null); - - ApplicationReport appReport = mock(ApplicationReport.class); - when(appReport.getHost()).thenReturn("localhost"); + //upgrade the service + service.setVersion("v2"); + client.actionUpgrade(service); - when(yarnClient.getApplicationAttemptReport(Matchers.any())) - .thenReturn(attemptReport); - when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport); + //add containers to the component that needs to be upgraded. + Component comp = service.getComponents().iterator().next(); + ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L); + comp.addContainer(new Container().id(containerId.toString())); - ServiceClient client = new ServiceClient() { - @Override - protected void serviceInit(Configuration configuration) throws Exception { - } + client.actionUpgrade(service, comp.getContainers()); + CompInstancesUpgradeResponseProto response = client.getLastProxyResponse( + CompInstancesUpgradeResponseProto.class); + Assert.assertNotNull("upgrade did not complete", response); + client.stop(); + } - @Override - protected ClientAMProtocol createAMProxy(String serviceName, - ApplicationReport appReport) throws IOException, YarnException { - return amProxy; - } + private Service createService() throws IOException, + YarnException { + Service service = ServiceTestUtils.createExampleApplication(); + service.setVersion("v1"); + service.setState(ServiceState.UPGRADING); + return service; + } - @Override - ApplicationId submitApp(Service app) throws IOException, YarnException { - return applicationId; + private static class MockServiceClient extends ServiceClient { + + private final ApplicationId appId; + private final ApplicationAttemptId attemptId; + private final ClientAMProtocol amProxy; + private Object proxyResponse; + private Service service; + + private MockServiceClient() { + amProxy = mock(ClientAMProtocol.class); + appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + LOG.debug("mocking service client for {}", appId); + attemptId = ApplicationAttemptId.newInstance(appId, 1); + } + + static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule, + Service service) + throws IOException, YarnException { + MockServiceClient client = new MockServiceClient(); + + YarnClient yarnClient = createMockYarnClient(); + ApplicationReport appReport = mock(ApplicationReport.class); + when(appReport.getHost()).thenReturn("localhost"); + when(appReport.getYarnApplicationState()).thenReturn( + YarnApplicationState.RUNNING); + + ApplicationAttemptReport attemptReport = + ApplicationAttemptReport.newInstance(client.attemptId, "localhost", 0, + null, null, null, + YarnApplicationAttemptState.RUNNING, null); + when(yarnClient.getApplicationAttemptReport(Matchers.any())) + .thenReturn(attemptReport); + when(yarnClient.getApplicationReport(client.appId)).thenReturn(appReport); + + when(client.amProxy.upgrade(Matchers.any( + CompInstancesUpgradeRequestProto.class))).thenAnswer( + (Answer) invocation -> { + CompInstancesUpgradeResponseProto response = + CompInstancesUpgradeResponseProto.newBuilder().build(); + client.proxyResponse = response; + return response; + }); + client.setFileSystem(rule.getFs()); + client.setYarnClient(yarnClient); + client.service = service; + + client.init(rule.getConf()); + client.start(); + client.actionCreate(service); + return client; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + } + + @Override + protected ClientAMProtocol createAMProxy(String serviceName, + ApplicationReport appReport) throws IOException, YarnException { + return amProxy; + } + + @Override + ApplicationId submitApp(Service app) throws IOException, YarnException { + return appId; + } + + @Override + public Service getStatus(String serviceName) throws IOException, + YarnException { + return service; + } + + private T getLastProxyResponse(Class clazz) { + if (clazz.isInstance(proxyResponse)) { + return clazz.cast(proxyResponse); } - }; - - client.setFileSystem(rule.getFs()); - client.setYarnClient(yarnClient); - - client.init(rule.getConf()); - client.start(); - return client; + return null; + } } - private YarnClient createMockYarnClient() throws IOException, YarnException { + private static YarnClient createMockYarnClient() throws IOException, + YarnException { YarnClient yarnClient = mock(YarnClient.class); - when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class))) - .thenReturn(new ArrayList<>()); + when(yarnClient.getApplications(Matchers.any( + GetApplicationsRequest.class))).thenReturn(new ArrayList<>()); return yarnClient; } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java new file mode 100644 index 00000000000..88d2b8c2056 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.TestServiceManager; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class TestComponent { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testComponentUpgrade() throws Exception { + ServiceContext context = createTestContext("testComponentUpgrade"); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + //bring component to stable state + makeCompStable(comp, 2); + + ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(), + ComponentEventType.UPGRADE); + comp.handle(upgradeEvent); + Assert.assertEquals("component not in need upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + } + + @Test + public void testCheckState() throws Exception { + ServiceContext context = createTestContext("testCheckState"); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + // bring component to stable state + makeCompStable(comp, 2); + org.apache.hadoop.yarn.service.api.records.Component spec = + context.service.getComponent(comp.getName()); + spec.getConfiguration().getEnv().put("env1", "val1"); + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(spec)); + + // one instance finished upgrading + comp.decContainersThatNeedUpgrade(); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + Assert.assertEquals("component not in need upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + + // second instance finished upgrading + comp.decContainersThatNeedUpgrade(); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in stable state", + ComponentState.STABLE, comp.getComponentSpec().getState()); + } + + private ServiceContext createTestContext(String serviceName) + throws Exception { + ServiceContext context = new ServiceContext(); + context.service = TestServiceManager.createBaseDef(serviceName); + context.fs = rule.getFs(); + + context.scheduler = new ServiceScheduler(context) { + @Override + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + + }; + context.scheduler.init(rule.getConf()); + return context; + } + + private static void makeCompStable(Component component, int numContainers) { + //bring component to stable state + for(int i = 0; i < numContainers; i++) { + component.incContainersReady(true); + } + ComponentEvent flexEvent = new ComponentEvent(component.getName(), + ComponentEventType.FLEX); + component.handle(flexEvent); + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java new file mode 100644 index 00000000000..e8761f6406b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.component.instance; + +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.TestServiceManager; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.TestComponent; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link ComponentInstance} + */ +public class TestComponentInstance { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testContainerUpgrade() throws Exception { + ServiceContext context = createTestContext("testContainerUpgrade"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances() + .iterator().next(); + ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(instanceEvent); + Assert.assertEquals("instance not upgrading", + ContainerState.UPGRADING, instance.getCompSpec().getContainer( + instance.getContainer().getId().toString()).getState()); + } + + @Test + public void testContainerReadyAfterUpgrade() throws Exception { + ServiceContext context = createTestContext("testContainerStarted"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances() + .iterator().next(); + + ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(instanceEvent); + + instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); + + Assert.assertEquals("instance not ready", + ContainerState.READY, instance.getCompSpec().getContainer( + instance.getContainer().getId().toString()).getState()); + } + + private void upgradeComponent(Component component) { + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.UPGRADE) + .setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2")); + } + + private ServiceContext createTestContext(String serviceName) + throws Exception { + ServiceContext context = new ServiceContext(); + context.service = TestServiceManager.createBaseDef(serviceName); + context.fs = rule.getFs(); + + ContainerLaunchService mockLaunchService = mock( + ContainerLaunchService.class); + + context.scheduler = new ServiceScheduler(context) { + @Override + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + + @Override + public NMClientAsync createNMClient() { + NMClientAsync nmClientAsync = super.createNMClient(); + NMClient nmClient = mock(NMClient.class); + try { + when(nmClient.getContainerStatus(anyObject(), anyObject())) + .thenAnswer((Answer) invocation -> + ContainerStatus.newInstance( + (ContainerId) invocation.getArguments()[0], + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + "", 0)); + } catch (YarnException | IOException e) { + throw new RuntimeException(e); + } + nmClientAsync.setClient(nmClient); + return nmClientAsync; + } + + @Override + public ContainerLaunchService getContainerLaunchService() { + return mockLaunchService; + } + }; + context.scheduler.init(rule.getConf()); + + doNothing().when(mockLaunchService). + reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); + + + NodeId nodeId = NodeId.fromString("localhost:0"); + ApplicationId appId = ApplicationId.fromString(context.service.getId()); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId,1); + + Map + componentState = context.scheduler.getAllComponents(); + + context.service.getComponents().forEach(componentSpec -> { + Component component = new org.apache.hadoop.yarn.service.component. + Component(componentSpec, 1L, context); + componentState.put(component.getName(), component); + long compNum = 1; + for (ComponentInstance instance : component.getAllComponentInstances()) { + Container container = org.apache.hadoop.yarn.api.records.Container + .newInstance(ContainerId.newContainerId(attemptId, compNum++), + nodeId, "localhost", + null, null, null); + instance.setContainer(container); + ComponentInstanceEvent startEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.START); + instance.handle(startEvent); + + ComponentInstanceEvent readyEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.BECOME_READY); + instance.handle(readyEvent); + context.scheduler.addLiveCompInstance(container.getId(), instance); + } + // need to bring it to stable state by triggering flex + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.FLEX)); + }); + + return context; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java index e25d38dd491..7cef91e25db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java @@ -81,6 +81,7 @@ public void tearDown() throws IOException { public void testComponentDependency() throws Exception{ ApplicationId applicationId = ApplicationId.newInstance(123456, 1); Service exampleApp = new Service(); + exampleApp.setVersion("v1"); exampleApp.setId(applicationId.toString()); exampleApp.setName("testComponentDependency"); exampleApp.addComponent(createComponent("compa", 1, "sleep 1000")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index d5eb787c706..5763890b559 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import java.io.IOException; +import java.util.List; import java.util.Map; /** @@ -245,4 +246,16 @@ public abstract String getStatusString(String appIdOrName) throws public abstract int actionUpgrade(String appName, String fileName) throws IOException, YarnException; + /** + * Upgrade containers of a long running service. + * + * @param appName the name of the application + * @param compName the name of the component + * @param compInstanceName name of the instance to upgrade + */ + @Public + @Unstable + public abstract int actionUpgrade(String appName, String compName, + String compInstanceName) throws IOException, YarnException; + }