diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index bea31cf65c8..26970508de9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.provider.ProviderUtils; @@ -137,6 +138,9 @@ private YarnRegistryViewForProviders yarnRegistryOperations; private ServiceContext context; private ContainerLaunchService containerLaunchService; + private final Map unRecoveredInstances = + new ConcurrentHashMap<>(); + private long containerRecoveryTimeout; public ServiceScheduler(ServiceContext context) { super(context.service.getName()); @@ -212,6 +216,9 @@ public void buildInstance(ServiceContext context, Configuration configuration) createConfigFileCache(context.fs.getFileSystem()); createAllComponents(); + containerRecoveryTimeout = getConfig().getInt( + YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, + YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS); } protected YarnRegistryViewForProviders createYarnRegistryOperations( @@ -320,7 +327,7 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { } for (Container container : containersFromPrevAttempt) { LOG.info("Handling {} from previous attempt", container.getId()); - ServiceRecord record = existingRecords.get(RegistryPathUtils + ServiceRecord record = existingRecords.remove(RegistryPathUtils .encodeYarnID(container.getId().toString())); if (record != null) { Component comp = componentsById.get(container.getAllocationRequestId()); @@ -337,6 +344,40 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { amRMClient.releaseAssignedContainer(container.getId()); } } + + existingRecords.forEach((encodedContainerId, record) -> { + String componentName = record.get(YarnRegistryAttributes.YARN_COMPONENT); + if (componentName != null) { + Component component = componentsByName.get(componentName); + ComponentInstance compInstance = component.getComponentInstance( + record.description); + ContainerId containerId = ContainerId.fromString(record.get( + YarnRegistryAttributes.YARN_ID)); + unRecoveredInstances.put(containerId, compInstance); + component.removePendingInstance(compInstance); + } + }); + + if (unRecoveredInstances.size() > 0) { + executorService.schedule(() -> { + synchronized (unRecoveredInstances) { + // after containerRecoveryTimeout, all the containers that haven't be + // recovered by the RM will released. The corresponding Component + // Instances are added to the pending queues of their respective + // component. + unRecoveredInstances.forEach((containerId, instance) -> { + LOG.info("{}, wait on container {} expired", + instance.getCompInstanceId(), containerId); + instance.cleanupRegistryAndCompHdfsDir(containerId); + Component component = componentsByName.get(instance.getCompName()); + component.requestContainers(1); + component.reInsertPendingInstance(instance); + amRMClient.releaseAssignedContainer(containerId); + }); + unRecoveredInstances.clear(); + } + }, containerRecoveryTimeout, TimeUnit.MILLISECONDS); + } } private void initGlobalTokensForSubstitute(ServiceContext context) { @@ -521,6 +562,35 @@ public void onContainersAllocated(List containers) { } } + + @Override + public void onContainersReceivedFromPreviousAttempts( + List containers) { + if (containers == null || containers.isEmpty()) { + return; + } + for (Container container : containers) { + ComponentInstance compInstance; + synchronized (unRecoveredInstances) { + compInstance = unRecoveredInstances.remove(container.getId()); + } + if (compInstance != null) { + Component component = componentsById.get( + container.getAllocationRequestId()); + ComponentEvent event = new ComponentEvent(component.getName(), + CONTAINER_RECOVERED) + .setInstance(compInstance) + .setContainerId(container.getId()) + .setContainer(container); + component.handle(event); + } else { + LOG.info("Not waiting to recover container {}, releasing", + container.getId()); + amRMClient.releaseAssignedContainer(container.getId()); + } + } + } + @Override public void onContainersCompleted(List statuses) { for (ContainerStatus status : statuses) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 4e05e5f7a27..5189ab1ac04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -107,6 +107,10 @@ .addTransition(INIT, INIT, CONTAINER_RECOVERED, new ContainerRecoveredTransition()) + // container recovered in AM heartbeat + .addTransition(FLEXING, FLEXING, CONTAINER_RECOVERED, + new ContainerRecoveredTransition()) + // container allocated by RM .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED, new ContainerAllocatedTransition()) @@ -309,6 +313,10 @@ public void transition(Component component, ComponentEvent event) { } } + public void removePendingInstance(ComponentInstance instance) { + pendingInstances.remove(instance); + } + public void reInsertPendingInstance(ComponentInstance instance) { pendingInstances.add(instance); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 509f6675006..31fa5c719c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -39,15 +39,15 @@ import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.BoundedAppender; -import org.apache.hadoop.yarn.service.utils.ServiceUtils; -import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; -import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; -import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +62,6 @@ import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; -import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*; @@ -398,6 +397,7 @@ private void updateServiceRecord( record.set(YARN_PERSISTENCE, PersistencePolicies.CONTAINER); record.set(YARN_IP, status.getIPs().get(0)); record.set(YARN_HOSTNAME, status.getHost()); + record.set(YARN_COMPONENT, component.getName()); try { yarnRegistry .putComponent(RegistryPathUtils.encodeYarnID(containerId), record); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java index ea8904ab681..22926e23eb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -22,6 +22,8 @@ public class YarnServiceConf { + private static final String YARN_SERVICE_PREFIX = "yarn.service."; + // Retry settings for the ServiceClient to talk to Service AppMaster public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms"; public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms"; @@ -83,6 +85,14 @@ */ public static final String JVM_OPTS = "yarn.service.am.java.opts"; + /** + * How long to wait until a container is considered dead. + */ + public static final String CONTAINER_RECOVERY_TIMEOUT_MS = + YARN_SERVICE_PREFIX + "container-recovery.timeout.ms"; + + public static final int DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS = 120000; + /** * Get long value for the property. First get from the userConf, if not * present, get from systemConf. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java index 429816137c5..37b18faef70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -19,8 +19,13 @@ package org.apache.hadoop.yarn.service; import com.google.common.base.Supplier; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -42,15 +47,24 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MockServiceAM extends ServiceMaster { + private static final Logger LOG = + LoggerFactory.getLogger(MockServiceAM.class); + Service service; // The list of containers fed by tests to be returned on // AMRMClientCallBackHandler#onContainersAllocated @@ -59,6 +73,16 @@ final List failedContainers = Collections.synchronizedList(new LinkedList<>()); + + private final List recoveredContainers = + Collections.synchronizedList(new LinkedList<>()); + + private final Map registryComponents = + new ConcurrentHashMap<>(); + + private Map containerStatuses = + new ConcurrentHashMap<>(); + public MockServiceAM(Service service) { super(service.getName()); this.service = service; @@ -75,7 +99,7 @@ protected ContainerId getAMContainerId() @Override protected Path getAppDir() { Path path = new Path(new Path("target", "apps"), service.getName()); - System.out.println("Service path: " + path); + LOG.info("Service path: {}", path); return path; } @@ -84,10 +108,24 @@ protected ServiceScheduler createServiceScheduler(ServiceContext context) throws IOException, YarnException { return new ServiceScheduler(context) { + @SuppressWarnings("SuspiciousMethodCalls") @Override protected YarnRegistryViewForProviders createYarnRegistryOperations( ServiceContext context, RegistryOperations registryClient) { - return mock(YarnRegistryViewForProviders.class); + YarnRegistryViewForProviders yarnRegistryView = mock( + YarnRegistryViewForProviders.class); + if (!registryComponents.isEmpty()) { + try { + when(yarnRegistryView.listComponents()) + .thenReturn(new LinkedList<>(registryComponents.keySet())); + when(yarnRegistryView.getComponent(anyString())).thenAnswer( + invocation -> + registryComponents.get(invocation.getArguments()[0])); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return yarnRegistryView; } @Override @@ -101,7 +139,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( // add new containers if any synchronized (feedContainers) { if (feedContainers.isEmpty()) { - System.out.println("Allocating........ no containers"); + LOG.info("Allocating........ no containers"); } else { // The AMRMClient will return containers for compoenent that are // at FLEXING state @@ -112,7 +150,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( org.apache.hadoop.yarn.service.component.Component component = componentsById.get(c.getAllocationRequestId()); if (component.getState() == ComponentState.FLEXING) { - System.out.println("Allocated container " + c.getId()); + LOG.info("Allocated container {} ", c.getId()); allocatedContainers.add(c); itor.remove(); } @@ -121,6 +159,17 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( } } + // add recovered containers if any + synchronized (recoveredContainers) { + if (!recoveredContainers.isEmpty()) { + List containersFromPrevAttempt = new LinkedList<>(); + containersFromPrevAttempt.addAll(recoveredContainers); + recoveredContainers.clear(); + builder.containersFromPreviousAttempt( + containersFromPrevAttempt); + } + } + // add failed containers if any synchronized (failedContainers) { if (!failedContainers.isEmpty()) { @@ -146,15 +195,23 @@ public RegisterApplicationMasterResponse registerApplicationMaster( } }; - return AMRMClientAsync - .createAMRMClientAsync(client1, 1000, + return AMRMClientAsync.createAMRMClientAsync(client1, 1000, this.new AMRMClientCallback()); } + @SuppressWarnings("SuspiciousMethodCalls") @Override public NMClientAsync createNMClient() { NMClientAsync nmClientAsync = super.createNMClient(); - nmClientAsync.setClient(mock(NMClient.class)); + NMClient nmClient = mock(NMClient.class); + try { + when(nmClient.getContainerStatus(anyObject(), anyObject())) + .thenAnswer(invocation -> + containerStatuses.get(invocation.getArguments()[0])); + } catch (YarnException | IOException e) { + throw new RuntimeException(e); + } + nmClientAsync.setClient(nmClient); return nmClientAsync; } }; @@ -165,6 +222,33 @@ public NMClientAsync createNMClient() { context.service = service; } + public void feedRegistryComponent(ContainerId containerId, String compName, + String compInstName) { + ServiceRecord record = new ServiceRecord(); + record.set(YarnRegistryAttributes.YARN_ID, containerId.toString()); + record.description = compInstName; + record.set(YarnRegistryAttributes.YARN_PERSISTENCE, + PersistencePolicies.CONTAINER); + record.set(YarnRegistryAttributes.YARN_IP, "localhost"); + record.set(YarnRegistryAttributes.YARN_HOSTNAME, "localhost"); + record.set(YarnRegistryAttributes.YARN_COMPONENT, compName); + registryComponents.put(RegistryPathUtils.encodeYarnID( + containerId.toString()), record); + } + + /** + * Simulates a recovered container that is sent to the AM in the heartbeat + * response. + * + * @param containerId The ID for the container + * @param compName The component to which the recovered container is fed. + */ + public void feedRecoveredContainer(ContainerId containerId, String compName) { + Container container = createContainer(containerId, compName); + recoveredContainers.add(container); + addContainerStatus(container, ContainerState.RUNNING); + } + /** * * @param service The service for the component @@ -174,20 +258,12 @@ public NMClientAsync createNMClient() { */ public Container feedContainerToComp(Service service, int id, String compName) { - ApplicationId applicationId = ApplicationId.fromString(service.getId()); - ContainerId containerId = ContainerId - .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id); - NodeId nodeId = NodeId.newInstance("localhost", 1234); - Container container = Container - .newInstance(containerId, nodeId, "localhost", - Resource.newInstance(100, 1), Priority.newInstance(0), null); - - long allocateId = - context.scheduler.getAllComponents().get(compName).getAllocateId(); - container.setAllocationRequestId(allocateId); + ContainerId containerId = createContainerId(id); + Container container = createContainer(containerId, compName); synchronized (feedContainers) { feedContainers.add(container); } + addContainerStatus(container, ContainerState.RUNNING); return container; } @@ -196,13 +272,30 @@ public void feedFailedContainerToComp(Service service, int id, String ApplicationId applicationId = ApplicationId.fromString(service.getId()); ContainerId containerId = ContainerId .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id); - ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); - containerStatus.setContainerId(containerId); + ContainerStatus status = Records.newRecord(ContainerStatus.class); + status.setContainerId(containerId); synchronized (failedContainers) { - failedContainers.add(containerStatus); + failedContainers.add(status); } } + public ContainerId createContainerId(int id) { + ApplicationId applicationId = ApplicationId.fromString(service.getId()); + return ContainerId.newContainerId( + ApplicationAttemptId.newInstance(applicationId, 1), id); + } + + private Container createContainer(ContainerId containerId, String compName) { + NodeId nodeId = NodeId.newInstance("localhost", 1234); + Container container = Container.newInstance( + containerId, nodeId, "localhost", + Resource.newInstance(100, 1), + Priority.newInstance(0), null); + long allocateId = + context.scheduler.getAllComponents().get(compName).getAllocateId(); + container.setAllocationRequestId(allocateId); + return container; + } public void flexComponent(String compName, long numberOfContainers) throws IOException { @@ -256,4 +349,13 @@ public Boolean get() { } }, 1000, 20000); } + + private void addContainerStatus(Container container, ContainerState state) { + ContainerStatus status = ContainerStatus.newInstance(container.getId(), + state, "", 0); + status.setHost(container.getNodeId().getHost()); + status.setIPs(Lists.newArrayList(container.getNodeId().getHost())); + containerStatuses.put(container.getId(), status); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index fb4de0d5714..2a3303e11e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -20,15 +20,21 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.component.ComponentState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -39,6 +45,9 @@ public class TestServiceAM extends ServiceTestUtils{ + private static final Logger LOG = + LoggerFactory.getLogger(TestServiceAM.class); + private File basedir; YarnConfiguration conf = new YarnConfiguration(); TestingCluster zkCluster; @@ -54,7 +63,7 @@ public void setup() throws Exception { zkCluster = new TestingCluster(1); zkCluster.start(); conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString()); - System.out.println("ZK cluster: " + zkCluster.getConnectString()); + LOG.info("ZK cluster: {}", zkCluster.getConnectString()); } @After @@ -91,7 +100,7 @@ public void testContainerCompleted() throws TimeoutException, am.feedContainerToComp(exampleApp, 1, "compa"); am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED); - System.out.println("Fail the container 1"); + LOG.info("Fail the container 1"); // fail the container am.feedFailedContainerToComp(exampleApp, 1, "compa"); @@ -106,4 +115,89 @@ public void testContainerCompleted() throws TimeoutException, am.getComponent("compa").getPendingInstances().size()); am.stop(); } + + // Test to verify that the containers of previous attempt are not prematurely + // released. These containers are sent by the RM to the AM in the + // heartbeat response. + @Test(timeout = 200000) + public void testContainersFromPreviousAttemptsWithRMRestart() + throws Exception { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + Service exampleApp = new Service(); + exampleApp.setId(applicationId.toString()); + exampleApp.setName("testContainersRecovers"); + String comp1Name = "comp1"; + String comp1InstName = "comp1-0"; + + org.apache.hadoop.yarn.service.api.records.Component compA = + createComponent(comp1Name, 1, "sleep"); + exampleApp.addComponent(compA); + + MockServiceAM am = new MockServiceAM(exampleApp); + ContainerId containerId = am.createContainerId(1); + am.feedRegistryComponent(containerId, comp1Name, comp1InstName); + am.init(conf); + am.start(); + + ComponentInstance comp10 = am.getCompInstance(comp1Name, comp1InstName); + am.feedRecoveredContainer(containerId, comp1Name); + am.waitForCompInstanceState(comp10, ComponentInstanceState.STARTED); + + // 0 pending instance + Assert.assertEquals(0, + am.getComponent(comp1Name).getPendingInstances().size()); + + GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName) + .getContainerStatus() != null, 2000, 200000); + + Assert.assertEquals("container state", + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + am.getCompInstance(comp1Name, comp1InstName).getContainerStatus() + .getState()); + am.stop(); + } + + // Test to verify that the containers of previous attempt are released and the + // component instance is added to the pending queue when the recovery wait + // time interval elapses. + @Test(timeout = 200000) + public void testContainersReleasedWhenExpired() + throws Exception { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + Service exampleApp = new Service(); + exampleApp.setId(applicationId.toString()); + exampleApp.setName("testContainersRecovers"); + String comp1Name = "comp1"; + String comp1InstName = "comp1-0"; + + org.apache.hadoop.yarn.service.api.records.Component compA = + createComponent(comp1Name, 1, "sleep"); + exampleApp.addComponent(compA); + + MockServiceAM am = new MockServiceAM(exampleApp); + ContainerId containerId = am.createContainerId(1); + am.feedRegistryComponent(containerId, comp1Name, comp1InstName); + conf.setLong(YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, 10); + am.init(conf); + am.start(); + Thread.sleep(100); + GenericTestUtils.waitFor(() -> am.getComponent(comp1Name).getState().equals( + ComponentState.FLEXING), 100, 2000); + + // 1 pending instance + Assert.assertEquals(1, + am.getComponent(comp1Name).getPendingInstances().size()); + + am.feedContainerToComp(exampleApp, 2, comp1Name); + + GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName) + .getContainerStatus() != null, 2000, 200000); + Assert.assertEquals("container state", + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + am.getCompInstance(comp1Name, comp1InstName).getContainerStatus() + .getState()); + am.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 793ad79e08d..a3d79598240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -486,6 +486,14 @@ public void waitFor(Supplier check, int checkEveryMillis, * stop() is the recommended action. */ public abstract void onError(Throwable e); + + /** + * Called when the ResourceManager responds to a heartbeat with containers + * from previous attempt. + */ + public void onContainersReceivedFromPreviousAttempts( + List containers) { + } } /** @@ -531,5 +539,7 @@ public void waitFor(Supplier check, int checkEveryMillis, * @param e */ void onError(Throwable e); + + void onContainersReceivedFromPreviousAttempts(List containers); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 031cdecc915..cafb153d803 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -358,6 +358,11 @@ public void run() { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } + + if (!response.getContainersFromPreviousAttempts().isEmpty()) { + handler.onContainersReceivedFromPreviousAttempts( + response.getContainersFromPreviousAttempts()); + } progress = handler.getProgress(); } catch (Throwable ex) { handler.onError(ex); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java index 5eaa9c05200..b6e7a20f48c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java @@ -38,4 +38,5 @@ private YarnRegistryAttributes() { public static final String YARN_PATH = "yarn:path"; public static final String YARN_HOSTNAME = "yarn:hostname"; public static final String YARN_IP = "yarn:ip"; + public static final String YARN_COMPONENT = "yarn:component"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md index 561df7f696d..a6fd998b8cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md @@ -125,6 +125,7 @@ Above config make the service AM to be retried at max 10 times. |yarn.service.log.exclude-pattern| The regex expression for excluding log files whose file name matches it when aggregating the logs after the application completes. If the log file name matches both include and exclude pattern, this file will be excluded. |yarn.service.rolling-log.include-pattern| The regex expression for including log files whose file name matches it when aggregating the logs while app is running. |yarn.service.rolling-log.exclude-pattern| The regex expression for excluding log files whose file name matches it when aggregating the logs while app is running. If the log file name matches both include and exclude pattern, this file will be excluded. +|yarn.service.container-recovery.timeout.ms| The timeout in milliseconds after which the service AM releases all the containers of previous attempt which are not yet recovered by the RM. By default, it is set to 120000, i.e. 2 minutes. ## Constant variables for custom service The service framework provides some constant variables for user to configure their services. These variables are either dynamically generated by the system or are static ones such as service name defined by the user.