diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 92294468105..6e5ff1538cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -600,6 +600,26 @@ public String getStatusString(String appIdOrName) throws IOException, return output; } + @Override + public int actionUpgradeExpress(String appName, File path) + throws IOException, YarnException { + int result = 0; + try { + Service service = + loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null); + service.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + String buffer = jsonSerDeser.toJson(service); + LOG.info("Upgrade in progress. Please wait.."); + ClientResponse response = getApiClient(getServicePath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade application: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } + @Override public int initiateUpgrade(String appName, String fileName, boolean autoFinalize) throws IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 4db0ac8f409..1fda46a2190 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.conf.RestApiConstants; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.eclipse.jetty.server.ConnectionFactory.Upgrading; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -690,7 +691,11 @@ private Response upgradeService(Service service, ServiceClient sc = getServiceClient(); sc.init(YARN_CONFIG); sc.start(); - sc.initiateUpgrade(service); + if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { + sc.actionUpgradeExpress(service); + } else { + sc.initiateUpgrade(service); + } sc.close(); return null; }); @@ -706,7 +711,8 @@ private Response processComponentsUpgrade(UserGroupInformation ugi, String serviceName, Set compNames) throws YarnException, IOException, InterruptedException { Service service = getServiceFromClient(ugi, serviceName); - if (service.getState() != ServiceState.UPGRADING) { + if (!service.getState().equals(ServiceState.UPGRADING) && + !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { throw new YarnException( String.format("The upgrade of service %s has not been initiated.", service.getName())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 5bf183319fd..2ef8f7ee7b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -166,7 +166,7 @@ public UpgradeServiceResponseProto upgrade( LOG.info("Upgrading service to version {} by {}", request.getVersion(), UserGroupInformation.getCurrentUser()); context.getServiceManager().processUpgradeRequest(request.getVersion(), - request.getAutoFinalize()); + request.getAutoFinalize(), request.getExpressUpgrade()); return UpgradeServiceResponseProto.newBuilder().build(); } catch (Exception ex) { return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 0196be2a989..0cc6c3ff99c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -29,6 +29,7 @@ private final ServiceEventType type; private String version; private boolean autoFinalize; + private boolean expressUpgrade; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -56,4 +57,13 @@ public ServiceEvent setAutoFinalize(boolean autoFinalize) { this.autoFinalize = autoFinalize; return this; } + + public boolean isExpressUpgrade() { + return expressUpgrade; + } + + public ServiceEvent setExpressUpgrade(boolean expressUpgrade) { + this.expressUpgrade = expressUpgrade; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index 05ecb3fc9be..2c09069eb3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -142,11 +142,14 @@ private State getState() { public State transition(ServiceManager serviceManager, ServiceEvent event) { try { - if (!event.isAutoFinalize()) { - serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + if (event.isExpressUpgrade()) { + serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING); + } else if (event.isAutoFinalize()) { + serviceManager.serviceSpec.setState(ServiceState + .UPGRADING_AUTO_FINALIZE); } else { serviceManager.serviceSpec.setState( - ServiceState.UPGRADING_AUTO_FINALIZE); + ServiceState.UPGRADING); } serviceManager.upgradeVersion = event.getVersion(); return State.UPGRADING; @@ -250,7 +253,7 @@ public void checkAndUpdateServiceState() { } void processUpgradeRequest(String upgradeVersion, - boolean autoFinalize) throws IOException { + boolean autoFinalize, boolean expressUpgrade) throws IOException { Service targetSpec = ServiceApiUtil.loadServiceUpgrade( context.fs, context.service.getName(), upgradeVersion); @@ -277,22 +280,21 @@ void processUpgradeRequest(String upgradeVersion, if (restartPolicyHandler.allowUpgrades()) { ComponentEvent needUpgradeEvent = new ComponentEvent( component.getName(), ComponentEventType.UPGRADE).setTargetSpec( - component).setUpgradeVersion(event.getVersion()); + component).setUpgradeVersion(event.getVersion()) + .setExpressUpgrade(expressUpgrade); context.scheduler.getDispatcher().getEventHandler().handle( needUpgradeEvent); } else { - LOG.info("The component {} has a restart " - + "policy that doesnt allow upgrades {} ", component.getName(), + LOG.info("The component {} has a restart policy that doesnt allow " + + "upgrades {} ", component.getName(), component.getRestartPolicy().toString()); } }); - } else { + } else if (autoFinalize) { // nothing to upgrade if upgrade auto finalize is requested, trigger a // state check. - if (autoFinalize) { - context.scheduler.getDispatcher().getEventHandler().handle( - new ServiceEvent(ServiceEventType.CHECK_STABLE)); - } + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CHECK_STABLE)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 0801ad052db..2a231ead3b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -219,7 +219,7 @@ public void buildInstance(ServiceContext context, Configuration configuration) nmClient.getClient().cleanupRunningContainersOnStop(false); addIfService(nmClient); - dispatcher = new AsyncDispatcher("Component dispatcher"); + dispatcher = createAsyncDispatcher(); dispatcher.register(ServiceEventType.class, new ServiceEventHandler()); dispatcher.register(ComponentEventType.class, new ComponentEventHandler()); @@ -253,6 +253,9 @@ public void buildInstance(ServiceContext context, Configuration configuration) YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS, app.getConfiguration(), getConfig()); + + serviceManager = createServiceManager(); + context.setServiceManager(serviceManager); } protected YarnRegistryViewForProviders createYarnRegistryOperations( @@ -262,6 +265,15 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( context.attemptId); } + protected ServiceManager createServiceManager() + { + return new ServiceManager(context); + } + + protected AsyncDispatcher createAsyncDispatcher() { + return new AsyncDispatcher("Component dispatcher"); + } + protected NMClientAsync createNMClient() { return NMClientAsync.createNMClientAsync(new NMClientCallback()); } @@ -344,8 +356,6 @@ public void serviceStart() throws Exception { // Since AM has been started and registered, the service is in STARTED state app.setState(ServiceState.STARTED); - serviceManager = new ServiceManager(context); - context.setServiceManager(serviceManager); // recover components based on containers sent from RM recoverComponents(response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index b6ae38bdeee..0b3c0377fab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -30,5 +30,5 @@ @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, - UPGRADING_AUTO_FINALIZE; + UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 5668d9fa3ae..a27ed87aa63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.client; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -215,48 +216,31 @@ public int actionBuild(Service service) return EXIT_SUCCESS; } - @Override - public int initiateUpgrade(String appName, String fileName, - boolean autoFinalize) - throws IOException, YarnException { - Service upgradeService = loadAppJsonFromLocalFS(fileName, appName, - null, null); - if (autoFinalize) { - upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE); - } else { - upgradeService.setState(ServiceState.UPGRADING); - } - return initiateUpgrade(upgradeService); - } - - public int initiateUpgrade(Service service) throws YarnException, - IOException { + private ApplicationReport upgradePrecheck(Service service) + throws YarnException, IOException { boolean upgradeEnabled = getConfig().getBoolean( - YARN_SERVICE_UPGRADE_ENABLED, - YARN_SERVICE_UPGRADE_ENABLED_DEFAULT); + YARN_SERVICE_UPGRADE_ENABLED, YARN_SERVICE_UPGRADE_ENABLED_DEFAULT); if (!upgradeEnabled) { throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED); } - Service persistedService = - ServiceApiUtil.loadService(fs, service.getName()); + Service persistedService = ServiceApiUtil.loadService(fs, + service.getName()); if (!StringUtils.isEmpty(persistedService.getId())) { - cachedAppInfo.put(persistedService.getName(), new AppInfo( - ApplicationId.fromString(persistedService.getId()), - persistedService.getKerberosPrincipal().getPrincipalName())); + cachedAppInfo.put(persistedService.getName(), + new AppInfo(ApplicationId.fromString(persistedService.getId()), + persistedService.getKerberosPrincipal().getPrincipalName())); } if (persistedService.getVersion().equals(service.getVersion())) { - String message = - service.getName() + " is already at version " + service.getVersion() - + ". There is nothing to upgrade."; + String message = service.getName() + " is already at version " + + service.getVersion() + ". There is nothing to upgrade."; LOG.error(message); throw new YarnException(message); } Service liveService = getStatus(service.getName()); if (!liveService.getState().equals(ServiceState.STABLE)) { - String message = service.getName() + " is at " + - liveService.getState() + String message = service.getName() + " is at " + liveService.getState() + " state and upgrade can only be initiated when service is STABLE."; LOG.error(message); throw new YarnException(message); @@ -266,11 +250,67 @@ public int initiateUpgrade(Service service) throws YarnException, ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); - ApplicationReport appReport = - yarnClient.getApplicationReport(getAppId(service.getName())); + ApplicationReport appReport = yarnClient + .getApplicationReport(getAppId(service.getName())); if (StringUtils.isEmpty(appReport.getHost())) { throw new YarnException(service.getName() + " AM hostname is empty"); } + return appReport; + } + + @Override + public int actionUpgradeExpress(String appName, File path) + throws IOException, YarnException { + Service service = + loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null); + service.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + actionUpgradeExpress(service); + return EXIT_SUCCESS; + } + + public int actionUpgradeExpress(Service service) throws YarnException, + IOException { + ApplicationReport appReport = upgradePrecheck(service); + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + UpgradeServiceRequestProto.Builder requestBuilder = + UpgradeServiceRequestProto.newBuilder(); + requestBuilder.setVersion(service.getVersion()); + if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + requestBuilder.setAutoFinalize(true); + } + if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { + requestBuilder.setExpressUpgrade(true); + requestBuilder.setAutoFinalize(true); + } + UpgradeServiceResponseProto responseProto = proxy.upgrade( + requestBuilder.build()); + if (responseProto.hasError()) { + LOG.error("Service {} express upgrade to version {} failed because {}", + service.getName(), service.getVersion(), responseProto.getError()); + throw new YarnException("Failed to express upgrade service " + + service.getName() + " to version " + service.getVersion() + + " because " + responseProto.getError()); + } + return EXIT_SUCCESS; + } + + @Override + public int initiateUpgrade(String appName, String fileName, + boolean autoFinalize) + throws IOException, YarnException { + Service upgradeService = loadAppJsonFromLocalFS(fileName, appName, + null, null); + if (autoFinalize) { + upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + } else { + upgradeService.setState(ServiceState.UPGRADING); + } + return initiateUpgrade(upgradeService); + } + + public int initiateUpgrade(Service service) throws YarnException, + IOException { + ApplicationReport appReport = upgradePrecheck(service); ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); UpgradeServiceRequestProto.Builder requestBuilder = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 41a2fcd104b..acf3404fe93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import static org.apache.hadoop.yarn.service.api.records.Component @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ServiceContext; @@ -546,13 +548,21 @@ public void transition(Component component, ComponentEvent event) { @Override public void transition(Component component, ComponentEvent event) { component.upgradeInProgress.set(true); + component.upgradeEvent = event; component.componentSpec.setState(org.apache.hadoop.yarn.service.api. records.ComponentState.NEEDS_UPGRADE); component.numContainersThatNeedUpgrade.set( component.componentSpec.getNumberOfContainers()); - component.componentSpec.getContainers().forEach(container -> - container.setState(ContainerState.NEEDS_UPGRADE)); - component.upgradeEvent = event; + component.componentSpec.getContainers().forEach(container -> { + container.setState(ContainerState.NEEDS_UPGRADE); + if (event.isExpressUpgrade()) { + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + ContainerId.fromString(container.getId()), + ComponentInstanceEventType.UPGRADE); + LOG.info("Upgrade container {}", container.getId()); + component.dispatcher.getEventHandler().handle(upgradeEvent); + } + }); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 84caa77b205..643961d505a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -35,6 +35,7 @@ private ContainerId containerId; private org.apache.hadoop.yarn.service.api.records.Component targetSpec; private String upgradeVersion; + private boolean expressUpgrade; public ContainerId getContainerId() { return containerId; @@ -113,4 +114,13 @@ public ComponentEvent setUpgradeVersion(String upgradeVersion) { this.upgradeVersion = upgradeVersion; return this; } + + public boolean isExpressUpgrade() { + return expressUpgrade; + } + + public ComponentEvent setExpressUpgrade(boolean expressUpgrade) { + this.expressUpgrade = expressUpgrade; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 11a6caa901d..ed5e68e98f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -380,6 +380,11 @@ public void transition(ComponentInstance compInstance, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { + if (!compInstance.containerSpec.getState().equals( + ContainerState.NEEDS_UPGRADE)) { + //nothing to upgrade. this may happen with express upgrade. + return; + } compInstance.containerSpec.setState(ContainerState.UPGRADING); compInstance.component.decContainersReady(false); ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 92195693274..b588e88ae7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -638,6 +638,32 @@ public static void validateInstancesUpgrade(List return containerNeedUpgrade; } + /** + * Validates the components that are requested are stable for upgrade. + * It returns the instances of the components which are in ready state. + */ + public static List validateAndResolveCompsStable( + Service liveService, Collection compNames) throws YarnException { + Preconditions.checkNotNull(compNames); + HashSet requestedComps = Sets.newHashSet(compNames); + List containerNeedUpgrade = new ArrayList<>(); + for (Component liveComp : liveService.getComponents()) { + if (requestedComps.contains(liveComp.getName())) { + if (!liveComp.getState().equals(ComponentState.STABLE)) { + // Nothing to upgrade + throw new YarnException(String.format( + ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName())); + } + liveComp.getContainers().forEach(liveContainer -> { + if (liveContainer.getState().equals(ContainerState.READY)) { + containerNeedUpgrade.add(liveContainer); + } + }); + } + } + return containerNeedUpgrade; + } + private static String parseComponentName(String componentInstanceName) throws YarnException { int idx = componentInstanceName.lastIndexOf('-'); @@ -651,4 +677,22 @@ private static String parseComponentName(String componentInstanceName) public static String $(String s) { return "${" + s +"}"; } + + public static List resolveCompsDependency(Service service) { + List components = new ArrayList(); + for (Component component : service.getComponents()) { + int depSize = component.getDependencies().size(); + if (!components.contains(component.getName())) { + components.add(component.getName()); + } + if (depSize != 0) { + for (String depComp : component.getDependencies()) { + if (!components.contains(depComp)) { + components.add(0, depComp); + } + } + } + } + return components; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 6166dedd1de..169f765b8a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -66,6 +66,7 @@ message StopResponseProto { message UpgradeServiceRequestProto { optional string version = 1; optional bool autoFinalize = 2; + optional bool expressUpgrade = 3; } message UpgradeServiceResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index fc509f19420..453a27895a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -19,23 +19,24 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.exceptions.SliderException; -import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import java.io.IOException; -import java.util.Map; - -import static org.mockito.Mockito.mock; +import java.util.concurrent.TimeoutException; /** * Tests for {@link ServiceManager}. @@ -46,117 +47,120 @@ public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); - @Test - public void testUpgrade() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager("testUpgrade"); - upgrade(serviceManager, "v2", false, false); + @Test (timeout = TIMEOUT) + public void testUpgrade() throws Exception { + ServiceContext context = createServiceContext("testUpgrade"); + initUpgrade(context, "v2", false, false, false); Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, - serviceManager.getServiceSpec().getState()); + context.getServiceManager().getServiceSpec().getState()); } - @Test + @Test (timeout = TIMEOUT) public void testRestartNothingToUpgrade() - throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( + throws Exception { + ServiceContext context = createServiceContext( "testRestartNothingToUpgrade"); - upgrade(serviceManager, "v2", false, false); - - //make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> { - comp.setState(ComponentState.STABLE); - }); - serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + initUpgrade(context, "v2", false, false, false); + ServiceManager manager = context.getServiceManager(); + //make components stable by upgrading all instances + upgradeInstances(context); + + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service not re-started", ServiceState.STABLE, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); } - @Test - public void testAutoFinalizeNothingToUpgrade() throws IOException, - SliderException { - ServiceManager serviceManager = createTestServiceManager( + @Test(timeout = TIMEOUT) + public void testAutoFinalizeNothingToUpgrade() throws Exception { + ServiceContext context = createServiceContext( "testAutoFinalizeNothingToUpgrade"); - upgrade(serviceManager, "v2", false, true); - - //make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> - comp.setState(ComponentState.STABLE)); - serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE)); + initUpgrade(context, "v2", false, true, false); + ServiceManager manager = context.getServiceManager(); + //make components stable by upgrading all instances + upgradeInstances(context); + + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service stable", ServiceState.STABLE, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); } - @Test + @Test(timeout = TIMEOUT) public void testRestartWithPendingUpgrade() - throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager("testRestart"); - upgrade(serviceManager, "v2", true, false); - serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + throws Exception { + ServiceContext context = createServiceContext("testRestart"); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); + + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + context.scheduler.getDispatcher().stop(); Assert.assertEquals("service should still be upgrading", - ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + ServiceState.UPGRADING, manager.getServiceSpec().getState()); } - @Test - public void testCheckState() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testCheckState"); - upgrade(serviceManager, "v2", true, false); + @Test(timeout = TIMEOUT) + public void testFinalize() throws Exception { + ServiceContext context = createServiceContext("testCheckState"); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); - // make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> { - comp.setState(ComponentState.STABLE); - }); - ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); - serviceManager.handle(checkStable); - Assert.assertEquals("service should still be upgrading", - ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + //make components stable by upgrading all instances + upgradeInstances(context); // finalize service - ServiceEvent restart = new ServiceEvent(ServiceEventType.START); - serviceManager.handle(restart); - Assert.assertEquals("service not stable", - ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service not re-started", ServiceState.STABLE, + manager.getServiceSpec().getState()); - validateUpgradeFinalization(serviceManager.getName(), "v2"); + validateUpgradeFinalization(manager.getName(), "v2"); } - @Test - public void testCheckStateAutoFinalize() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testCheckState"); - serviceManager.getServiceSpec().setState( + @Test(timeout = TIMEOUT) + public void testAutoFinalize() throws Exception { + ServiceContext context = createServiceContext("testCheckStateAutoFinalize"); + ServiceManager manager = context.getServiceManager(); + manager.getServiceSpec().setState( ServiceState.UPGRADING_AUTO_FINALIZE); - upgrade(serviceManager, "v2", true, true); - Assert.assertEquals("service not upgrading", - ServiceState.UPGRADING_AUTO_FINALIZE, - serviceManager.getServiceSpec().getState()); + initUpgrade(context, "v2", true, true, false); // make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> - comp.setState(ComponentState.STABLE)); - ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); - serviceManager.handle(checkStable); + upgradeInstances(context); + + GenericTestUtils.waitFor(() -> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service not stable", - ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + ServiceState.STABLE, manager.getServiceSpec().getState()); - validateUpgradeFinalization(serviceManager.getName(), "v2"); + validateUpgradeFinalization(manager.getName(), "v2"); } @Test - public void testInvalidUpgrade() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testInvalidUpgrade"); - serviceManager.getServiceSpec().setState( + public void testInvalidUpgrade() throws Exception { + ServiceContext serviceContext = createServiceContext("testInvalidUpgrade"); + ServiceManager manager = serviceContext.getServiceManager(); + manager.getServiceSpec().setState( ServiceState.UPGRADING_AUTO_FINALIZE); Service upgradedDef = ServiceTestUtils.createExampleApplication(); - upgradedDef.setName(serviceManager.getName()); + upgradedDef.setName(manager.getName()); upgradedDef.setVersion("v2"); upgradedDef.setLifetime(2L); writeUpgradedDef(upgradedDef); try { - serviceManager.processUpgradeRequest("v2", true); + manager.processUpgradeRequest("v2", true, false); } catch (Exception ex) { Assert.assertTrue(ex instanceof UnsupportedOperationException); return; @@ -164,6 +168,44 @@ public void testInvalidUpgrade() throws IOException, SliderException { Assert.fail(); } + @Test(timeout = TIMEOUT) + public void testExpressUpgrade() throws Exception { + ServiceContext context = createServiceContext("testExpressUpgrade"); + ServiceManager manager = context.getServiceManager(); + manager.getServiceSpec().setState( + ServiceState.EXPRESS_UPGRADING); + initUpgrade(context, "v2", true, true, true); + + // wait till instances are in upgrade + GenericTestUtils.waitFor(() -> { + + for (ComponentInstance instance : + context.scheduler.getLiveInstances().values()) { + if (!instance.getContainerState().equals(ContainerState.UPGRADING)) { + return false; + } + } + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); + + // instances get upgraded and become ready event is triggered + // become ready + context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { + ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, + ComponentInstanceEventType.BECOME_READY); + + context.scheduler.getDispatcher().getEventHandler().handle(event); + })); + + GenericTestUtils.waitFor(() -> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + + Assert.assertEquals("service not stable", + ServiceState.STABLE, manager.getServiceSpec().getState()); + validateUpgradeFinalization(manager.getName(), "v2"); + } + private void validateUpgradeFinalization(String serviceName, String expectedVersion) throws IOException { Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName); @@ -172,15 +214,16 @@ private void validateUpgradeFinalization(String serviceName, Assert.assertNotNull("app id not present", savedSpec.getId()); Assert.assertEquals("state not stable", ServiceState.STABLE, savedSpec.getState()); - savedSpec.getComponents().forEach(compSpec -> { - Assert.assertEquals("comp not stable", ComponentState.STABLE, - compSpec.getState()); - }); + savedSpec.getComponents().forEach(compSpec -> + Assert.assertEquals("comp not stable", ComponentState.STABLE, + compSpec.getState())); } - private void upgrade(ServiceManager serviceManager, String version, - boolean upgradeArtifact, boolean autoFinalize) - throws IOException, SliderException { + private void initUpgrade(ServiceContext context, String version, + boolean upgradeArtifact, boolean autoFinalize, boolean expressUpgrade) + throws IOException, SliderException, TimeoutException, + InterruptedException { + ServiceManager serviceManager = context.getServiceManager(); Service upgradedDef = ServiceTestUtils.createExampleApplication(); upgradedDef.setName(serviceManager.getName()); upgradedDef.setVersion(version); @@ -191,39 +234,57 @@ private void upgrade(ServiceManager serviceManager, String version, }); } writeUpgradedDef(upgradedDef); - serviceManager.processUpgradeRequest(version, autoFinalize); + serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade); ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); - upgradeEvent.setVersion(version); - if (autoFinalize) { - upgradeEvent.setAutoFinalize(true); - } - serviceManager.handle(upgradeEvent); + upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade) + .setAutoFinalize(autoFinalize); + + GenericTestUtils.waitFor(()-> { + ServiceState serviceState = context.service.getState(); + if (serviceState.equals(ServiceState.UPGRADING) || + serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || + serviceState.equals(ServiceState.EXPRESS_UPGRADING)) { + return true; + } + return false; + }, CHECK_EVERY_MILLIS, TIMEOUT); } - private ServiceManager createTestServiceManager(String name) - throws IOException { - ServiceContext context = new ServiceContext(); - context.service = createBaseDef(name); - context.fs = rule.getFs(); - - context.scheduler = new ServiceScheduler(context) { - @Override - protected YarnRegistryViewForProviders createYarnRegistryOperations( - ServiceContext context, RegistryOperations registryClient) { - return mock(YarnRegistryViewForProviders.class); + private void upgradeInstances(ServiceContext context) throws + TimeoutException, InterruptedException { + // upgrade the instances + context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { + ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, + ComponentInstanceEventType.UPGRADE); + context.scheduler.getDispatcher().getEventHandler().handle(event); + })); + + // become ready + context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { + ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, + ComponentInstanceEventType.BECOME_READY); + + context.scheduler.getDispatcher().getEventHandler().handle(event); + })); + GenericTestUtils.waitFor(()-> { + for (ComponentInstance instance: + context.scheduler.getLiveInstances().values()) { + if (!instance.getContainerState().equals(ContainerState.READY)) { + return false; + } } - }; - - context.scheduler.init(rule.getConf()); - - Map - componentState = context.scheduler.getAllComponents(); - context.service.getComponents().forEach(component -> { - componentState.put(component.getName(), - new org.apache.hadoop.yarn.service.component.Component(component, - 1L, context)); - }); - return new ServiceManager(context); + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); + } + + private ServiceContext createServiceContext(String name) + throws Exception { + Service service = createBaseDef(name); + ServiceContext context = new MockRunningServiceContext(rule, + service); + context.scheduler.getDispatcher().setDrainEventsOnStop(); + context.scheduler.getDispatcher().start(); + return context; } public static Service createBaseDef(String name) { @@ -257,4 +318,6 @@ private void writeUpgradedDef(Service upgradedDef) upgradedDef); } + private static final int TIMEOUT = 200000; + private static final int CHECK_EVERY_MILLIS = 100; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 8b13b2495b8..216d88fc4c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -415,6 +415,41 @@ public void testUpgrade() throws Exception { client.actionDestroy(service.getName()); } + @Test(timeout = 200000) + public void testExpressUpgrade() throws Exception { + setupInternal(NUM_NMS); + getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true); + ServiceClient client = createClient(getConf()); + + Service service = createExampleApplication(); + client.actionCreate(service); + waitForServiceToBeStable(client, service); + + // upgrade the service + Component component = service.getComponents().iterator().next(); + service.setState(ServiceState.EXPRESS_UPGRADING); + service.setVersion("v2"); + component.getConfiguration().getEnv().put("key1", "val1"); + Component component2 = service.getComponent("compb"); + component2.getConfiguration().getEnv().put("key2", "val2"); + client.actionUpgradeExpress(service); + + // wait for upgrade to complete + waitForServiceToBeStable(client, service); + Service active = client.getStatus(service.getName()); + Assert.assertEquals("component not stable", ComponentState.STABLE, + active.getComponent(component.getName()).getState()); + Assert.assertEquals("compa does not have new env", "val1", + active.getComponent(component.getName()).getConfiguration() + .getEnv("key1")); + Assert.assertEquals("compb does not have new env", "val2", + active.getComponent(component2.getName()).getConfiguration() + .getEnv("key2")); + LOG.info("Stop/destroy service {}", service); + client.actionStop(service.getName(), true); + client.actionDestroy(service.getName()); + } + // Test to verify ANTI_AFFINITY placement policy // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler // 2. Create an example service with 3 containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java new file mode 100644 index 00000000000..5ff2ad2809f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.utils; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.junit.Test; +import org.junit.Assert; + +public class TestServiceApiUtil extends ServiceTestUtils { + + public static Service createExampleApplication() { + + Service exampleApp = new Service(); + exampleApp.setName("example-app"); + exampleApp.setVersion("v1"); + return exampleApp; + } + + @Test + public void testResolveCompsDependency() { + Service service = createExampleApplication(); + List dependencies = new ArrayList(); + dependencies.add("compb"); + Component compa = createComponent("compa"); + compa.setDependencies(dependencies); + Component compb = createComponent("compb"); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compb"); + expected.add("compa"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), order.get(i)); + } + } + + @Test + public void testResolveCompsDependencyReversed() { + Service service = createExampleApplication(); + List dependencies = new ArrayList(); + dependencies.add("compa"); + Component compa = createComponent("compa"); + Component compb = createComponent("compb"); + compb.setDependencies(dependencies); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compa"); + expected.add("compb"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), order.get(i)); + } + } + + @Test + public void testResolveCompsCircularDependency() { + Service service = createExampleApplication(); + List dependencies = new ArrayList(); + List dependencies2 = new ArrayList(); + dependencies.add("compb"); + dependencies2.add("compa"); + Component compa = createComponent("compa"); + compa.setDependencies(dependencies); + Component compb = createComponent("compb"); + compa.setDependencies(dependencies2); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compa"); + expected.add("compb"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), order.get(i)); + } + } + + @Test + public void testResolveNoCompsDependency() { + Service service = createExampleApplication(); + Component compa = createComponent("compa"); + Component compb = createComponent("compb"); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compa"); + expected.add("compb"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), order.get(i)); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 807938c7f0c..894aae8095e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.cli; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -100,6 +101,7 @@ public static final String COMPONENT = "component"; public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch"; public static final String UPGRADE_CMD = "upgrade"; + public static final String UPGRADE_EXPRESS = "express"; public static final String UPGRADE_INITIATE = "initiate"; public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize"; public static final String UPGRADE_FINALIZE = "finalize"; @@ -107,7 +109,7 @@ public static final String COMPONENTS = "components"; public static final String VERSION = "version"; public static final String STATES = "states"; - + private static String firstArg = null; private boolean allAppStates; @@ -247,6 +249,9 @@ public int run(String[] args) throws Exception { opts.addOption(UPGRADE_CMD, true, "Upgrades an application/long-" + "running service. It requires either -initiate, -instances, or " + "-finalize options."); + opts.addOption(UPGRADE_EXPRESS, true, "Works with -upgrade option to " + + "perform express upgrade. It requires the upgraded application " + + "specification file."); opts.addOption(UPGRADE_INITIATE, true, "Works with -upgrade option to " + "initiate the application upgrade. It requires the upgraded " + "application specification file."); @@ -639,9 +644,9 @@ public int run(String[] args) throws Exception { moveApplicationAcrossQueues(cliParser.getOptionValue(APP_ID), cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE)); } else if (cliParser.hasOption(UPGRADE_CMD)) { - if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE, - UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, COMPONENTS, - APP_TYPE_CMD)) { + if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_EXPRESS, + UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, + COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) { printUsage(title, opts); return exitCode; } @@ -649,7 +654,14 @@ public int run(String[] args) throws Exception { AppAdminClient client = AppAdminClient.createAppAdminClient(appType, getConf()); String appName = cliParser.getOptionValue(UPGRADE_CMD); - if (cliParser.hasOption(UPGRADE_INITIATE)) { + if (cliParser.hasOption(UPGRADE_EXPRESS)) { + File file = new File(cliParser.getOptionValue(UPGRADE_EXPRESS)); + if (!file.exists()) { + System.err.println(file.getAbsolutePath() + " does not exist."); + return exitCode; + } + return client.actionUpgradeExpress(appName, file); + } else if (cliParser.hasOption(UPGRADE_INITIATE)) { if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, APP_TYPE_CMD)) { printUsage(title, opts); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index 3fb4778327e..40b4f6952f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; @@ -288,4 +289,12 @@ public abstract String getInstances(String appName, List components, String version, List containerStates) throws IOException, YarnException; + /** + * Upgrade a long running service. + * + * @param appName + * @return + */ + public abstract int actionUpgradeExpress(String appName, File fileName) + throws IOException, YarnException; }