diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index a7b7e22d42c..de0db4fb82f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -39,7 +39,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -75,13 +77,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -91,6 +97,8 @@ import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.registry.client.api.RegistryConstants.*; +import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.YARN_HOSTNAME; +import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.YARN_IP; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; @@ -302,26 +310,74 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { } } } + List pendingContainers = new LinkedList<>(); + for (Container container : recoveredContainers) { LOG.info("Handling container {} from previous attempt", container.getId()); + Component comp = componentsById.get(container.getAllocationRequestId()); ServiceRecord record = existingRecords.get(RegistryPathUtils .encodeYarnID(container.getId().toString())); if (record != null) { - Component comp = componentsById.get(container.getAllocationRequestId()); + LOG.info("Record found in registry for container {} from " + + "previous attempt", container.getId()); ComponentEvent event = new ComponentEvent(comp.getName(), CONTAINER_RECOVERED) .setContainer(container) - .setInstance(comp.getComponentInstance(record.description)); + .setInstance(comp.getComponentInstance(record.description)) + .setStatus(createStatusFrom(container, record)); comp.handle(event); // do not remove requests in this case because we do not know if they // have already been removed } else { - LOG.info("Record not found in registry for container {} from previous" + - " attempt, releasing", container.getId()); - amRMClient.releaseAssignedContainer(container.getId()); + pendingContainers.add(container); + } + } + + //For pending containers instances, find a pending component instance and + //retrieve its status from the NM + Iterator pendingContainersIter = pendingContainers.iterator(); + while (pendingContainersIter.hasNext()) { + Container container = pendingContainersIter.next(); + Component component = componentsById.get( + container.getAllocationRequestId()); + + ComponentInstance componentInstance = component.getPendingInstances() + .stream().findFirst().orElse(null); + if (componentInstance != null) { + LOG.info("Record not found in registry for container {} from " + + "previous attempt, request status from NM", + container.getId()); + + ComponentEvent event = + new ComponentEvent(component.getName(), CONTAINER_RECOVERED) + .setContainer(container) + .setInstance(componentInstance); + component.handle(event); + pendingContainersIter.remove(); } } + + //Release all the pending containers + pendingContainers.forEach(container -> { + LOG.info("Record not found in registry for container {} from previous" + + " attempt, releasing", container.getId()); + amRMClient.releaseAssignedContainer(container.getId()); + }); + } + + private ContainerStatus createStatusFrom(Container container, + ServiceRecord serviceRecord) { + + ContainerStatus status = ContainerStatus.newInstance(container.getId(), + container.getExecutionType(), ContainerState.RUNNING, + serviceRecord.description, ContainerExitStatus.INVALID); + + status.setHost(serviceRecord.get(YARN_HOSTNAME)); + List ips = new ArrayList<>(); + ips.add(serviceRecord.get(YARN_IP)); + status.setIPs(new ArrayList<>(ips)); + return status; } private void initGlobalTokensForSubstitute(ServiceContext context) { @@ -688,4 +744,16 @@ public AsyncDispatcher getDispatcher() { public BoundedAppender getDiagnostics() { return diagnostics; } + + public boolean isContainerRecordInRegistry(@Nonnull ContainerId containerId) { + ServiceRecord record = null; + try { + record = yarnRegistryOperations.getComponent( + RegistryPathUtils.encodeYarnID(containerId.toString())); + } catch (IOException e) { + LOG.info("Could not get container {} from registry {}", containerId, + e.getMessage()); + } + return record != null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 7208f391a3a..b33486fc7c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -279,7 +279,7 @@ public void transition(Component component, ComponentEvent event) { instance.getCompInstanceId()); component.compInstanceDispatcher.getEventHandler().handle( new ComponentInstanceEvent(instance.getContainerId(), - START)); + START).setStatus(event.getStatus())); } } @@ -581,4 +581,8 @@ public void handle(ComponentEvent event) { public ServiceContext getContext() { return context; } + + public List getPendingInstances() { + return pendingInstances; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 9e5f98cf342..062401651f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -62,7 +62,6 @@ import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; -import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*; @@ -142,12 +141,20 @@ public ComponentInstance(Component component, private static class ContainerStartedTransition extends BaseTransition { @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - // Query container status for ip and host - compInstance.containerStatusFuture = - compInstance.scheduler.executorService.scheduleAtFixedRate( - new ContainerStatusRetriever(compInstance.scheduler, - compInstance.getContainerId(), compInstance), 0, 1, - TimeUnit.SECONDS); + // Query container status for ip and host only when its not present in the + //registry + if (!compInstance.scheduler.isContainerRecordInRegistry( + compInstance.getContainerId())) { + + compInstance.containerStatusFuture = + compInstance.scheduler.executorService.scheduleAtFixedRate( + new ContainerStatusRetriever(compInstance.scheduler, + compInstance.getContainerId(), compInstance), 0, 1, + TimeUnit.SECONDS); + } else { + compInstance.updateContainerStatus(event.getStatus(), false); + } + compInstance.component.incRunningContainers(); long containerStartTime = System.currentTimeMillis(); try { @@ -334,7 +341,8 @@ public ContainerStatus getContainerStatus() { return status; } - public void updateContainerStatus(ContainerStatus status) { + public void updateContainerStatus(ContainerStatus status, + boolean updateServiceRecord) { this.status = status; org.apache.hadoop.yarn.service.api.records.Container container = getCompSpec().getContainer(getContainerId().toString()); @@ -345,7 +353,9 @@ public void updateContainerStatus(ContainerStatus status) { serviceTimelinePublisher.componentInstanceIPHostUpdated(container); } } - updateServiceRecord(yarnRegistryOperations, status); + if (updateServiceRecord) { + updateServiceRecord(yarnRegistryOperations, status); + } } public ContainerId getContainerId() { @@ -490,7 +500,7 @@ public void cleanupRegistryAndCompHdfsDir() { this.instance = instance; } @Override public void run() { - ContainerStatus status = null; + ContainerStatus status; try { status = nmClient.getContainerStatus(containerId, nodeId); } catch (Exception e) { @@ -507,7 +517,7 @@ public void cleanupRegistryAndCompHdfsDir() { .isUnset(status.getHost())) { return; } - instance.updateContainerStatus(status); + instance.updateContainerStatus(status, true); LOG.info( instance.compInstanceId + " IP = " + status.getIPs() + ", host = " + status.getHost() + ", cancel container status retriever"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index f98d90a6f34..bc414f4bdd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -21,7 +21,12 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -31,6 +36,7 @@ import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.junit.After; @@ -42,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.util.*; @@ -177,6 +184,8 @@ public void testRecoverComponentsAfterRMRestart() throws Exception { Service exampleApp = createExampleApplication(); client.actionCreate(exampleApp); waitForAllCompToBeReady(client, exampleApp); + Multimap containersBeforeFailure = getContainersForAllComp( + client, exampleApp); LOG.info("Restart the resource manager"); getYarnCluster().restartResourceManager( @@ -191,9 +200,59 @@ public void testRecoverComponentsAfterRMRestart() throws Exception { ApplicationAttemptId applicationAttemptId = client.getYarnClient() .getApplicationReport(exampleAppId).getCurrentApplicationAttemptId(); + LOG.info("Fail the application attempt {}", applicationAttemptId); + client.getYarnClient().failApplicationAttempt(applicationAttemptId); + //wait until attempt 2 is running + GenericTestUtils.waitFor(() -> { + try { + ApplicationReport ar = client.getYarnClient() + .getApplicationReport(exampleAppId); + return ar.getCurrentApplicationAttemptId().getAttemptId() == 2 && + ar.getYarnApplicationState() == YarnApplicationState.RUNNING; + } catch (YarnException | IOException e) { + throw new RuntimeException("while waiting", e); + } + }, 2000, 200000); + + Multimap containersAfterFailure = getContainersForAllComp( + client, exampleApp); + Assert.assertEquals("component container affected by restart", + containersBeforeFailure, containersAfterFailure); + + LOG.info("Stop/destroy service {}", exampleApp); + client.actionStop(exampleApp.getName(), true); + client.actionDestroy(exampleApp.getName()); + } + + // Test to verify recovery when a AM fails to save container status in + // registry. + // 1. Create an example service. + // 2. Fail the application attempt. + // 3. Explicitly remove a container entry from registry + // 4. Verify no new containers for components are created when AM recovers. + @Test(timeout = 200000) + public void testRecoverComponentsWhenContainerStatusNotInRegistry() + throws Exception { + setupInternal(NUM_NMS); + + ServiceClient client = createClient(); + Service exampleApp = createExampleApplication(); + client.actionCreate(exampleApp); + waitForAllCompToBeReady(client, exampleApp); + + ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId()); + ApplicationAttemptId applicationAttemptId = client.getYarnClient() + .getApplicationReport(exampleAppId).getCurrentApplicationAttemptId(); + Multimap containersBeforeFailure = getContainersForAllComp( client, exampleApp); + String toDeleteContainerId = containersBeforeFailure.entries().iterator() + .next().getValue(); + LOG.info("Delete the registry entry for container {}", toDeleteContainerId); + deleteContainerFromRegistry(getYarnCluster().getConfig(), exampleApp, + toDeleteContainerId); + LOG.info("Fail the application attempt {}", applicationAttemptId); client.getYarnClient().failApplicationAttempt(applicationAttemptId); //wait until attempt 2 is running @@ -273,8 +332,18 @@ private void checkCompInstancesInOrder(ServiceClient client, } } - private void checkRegistryAndCompDirDeleted() { - + private void deleteContainerFromRegistry(@Nonnull Configuration configuration, + Service app, String containerId) throws IOException { + RegistryOperations registryClient = RegistryOperationsFactory + .createInstance("ServiceSchedulerTest", configuration); + registryClient.start(); + String containerPath = RegistryPathUtils.encodeYarnID(containerId); + + String path = RegistryUtils.componentPath(RegistryUtils.currentUser(), + YarnServiceConstants.APP_TYPE, app.getName(), containerPath); + LOG.info("delete {} from registry", path); + registryClient.delete(path, true); + registryClient.stop(); } private void checkEachCompInstancesInOrder(Component component) { @@ -370,9 +439,8 @@ private void waitForAllCompToBeReady(ServiceClient client, Service retrievedApp = client.getStatus(example.getName()); retrievedApp.getComponents().forEach(component -> { if (component.getContainers() != null) { - component.getContainers().forEach(container -> { - allContainers.put(component.getName(), container.getId()); - }); + component.getContainers().forEach(container -> + allContainers.put(component.getName(), container.getId())); } }); return allContainers;