diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 81c56d2429f..083c0f40990 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.conf.SliderExitCodes; @@ -878,20 +880,73 @@ public String updateLifetime(String serviceName, long lifetime) return newTimeout; } - public ServiceState convertState(FinalApplicationStatus status) { - switch (status) { - case UNDEFINED: + public ServiceState convertState(YarnApplicationState state) { + switch (state) { + case NEW: + case NEW_SAVING: + case SUBMITTED: + case ACCEPTED: return ServiceState.ACCEPTED; + case RUNNING: + return ServiceState.STARTED; + case FINISHED: + return ServiceState.STOPPED; case FAILED: case KILLED: return ServiceState.FAILED; - case ENDED: - case SUCCEEDED: - return ServiceState.STOPPED; } return ServiceState.ACCEPTED; } + private void enrichService(Service service) { + if (service == null || 
service.getComponents() == null) { + return; + } + // Count the requested and running containers across all components. + long totalRequestedContainers = 0; + long totalRunningContainers = 0; + // Treat the service as stable if and only if every requested container + // is reported by its component and is in READY state. + boolean allContainersReady = true; + for (Component comp : service.getComponents()) { + long noOfRequestedContainers = comp.getNumberOfContainers(); + if (noOfRequestedContainers > 0) { + totalRequestedContainers += noOfRequestedContainers; + if (comp.getContainers() != null) { + long noOfRunningContainers = comp.getContainers().size(); + totalRunningContainers += noOfRunningContainers; + // Once allContainersReady is false there is nothing more to check. + if (allContainersReady) { + if (noOfRunningContainers < noOfRequestedContainers) { + allContainersReady = false; + } else { + // Enough containers exist; each one must also be in READY state. + for (Container container : comp.getContainers()) { + if (container.getState() != ContainerState.READY) { + allContainersReady = false; + break; + } + } + } + } + } else { + // No container list; presumably nothing has been allocated yet. + allContainersReady = false; + } + } + } + // Currently, a service whose components all request 0 containers (hence a + // total of 0 requested containers for the entire service) is considered + // stable: if totalRequestedContainers is still 0 at this point, nothing + // above has cleared allContainersReady, so it is still true. Note that a + // component with fewer running than requested containers is already caught + // by the checks above, so no explicit total comparison is needed here. 
+ if (allContainersReady) { + service.setState(ServiceState.STABLE); + } + service.setNumberOfRunningContainers(totalRunningContainers); + } + public String getStatusString(String appId) throws IOException, YarnException { ApplicationReport appReport = @@ -917,7 +972,7 @@ public Service getStatus(String serviceName) ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); Service appSpec = new Service(); appSpec.setName(serviceName); - appSpec.setState(convertState(appReport.getFinalApplicationStatus())); + appSpec.setState(convertState(appReport.getYarnApplicationState())); ApplicationTimeout lifetime = appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); if (lifetime != null) { @@ -938,6 +993,7 @@ public Service getStatus(String serviceName) GetStatusResponseProto response = amProxy.getStatus(GetStatusRequestProto.newBuilder().build()); appSpec = jsonSerDeser.fromJson(response.getStatus()); + enrichService(appSpec); return appSpec; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 1c517d941b0..fa1cf2cfffa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Service; +import 
org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; @@ -90,25 +91,25 @@ public void testCreateFlexStopDestroyService() throws Exception { // check app.json is persisted. Assert.assertTrue( getFS().exists(new Path(appDir, exampleApp.getName() + ".json"))); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); // Flex two components, each from 2 container to 3 containers. flexComponents(client, exampleApp, 3L); // wait for flex to be completed, increase from 2 to 3 containers. - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); // check all instances name for each component are in sequential order. checkCompInstancesInOrder(client, exampleApp); // flex down to 1 flexComponents(client, exampleApp, 1L); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); checkCompInstancesInOrder(client, exampleApp); // check component dir and registry are cleaned up. // flex up again to 2 flexComponents(client, exampleApp, 2L); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); checkCompInstancesInOrder(client, exampleApp); // stop the service @@ -145,7 +146,7 @@ public void testComponentStartOrder() throws Exception { exampleApp.addComponent(compb); client.actionCreate(exampleApp); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); // check that containers for compa are launched before containers for compb checkContainerLaunchDependencies(client, exampleApp, "compa", "compb"); @@ -372,6 +373,28 @@ private void waitForOneCompToBeReady(ServiceClient client, return allContainers; } + /** + * Wait until service state becomes stable. 
A service is stable when all + * requested containers of all components are running and in ready state. + * + * @param client service client used to poll the service status + * @param exampleApp service whose name is used to look up the status + * @throws TimeoutException propagated from GenericTestUtils.waitFor + * @throws InterruptedException propagated from GenericTestUtils.waitFor + */ + private void waitForServiceToBeStable(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + return retrievedApp.getState() == ServiceState.STABLE; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, 2000, 200000); + } + private ServiceClient createClient() throws Exception { ServiceClient client = new ServiceClient() { @Override protected Path addJarResource(String appName,