diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 91dbc001c5d..2b4b990f167 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -983,6 +983,10 @@ public void onError(Throwable e) { done = true; amRMClient.stop(); } + + @Override + public void onContainersFromPreviousAttempts(List containers) { + } } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 6bc567328fe..592088b5331 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -99,6 +99,8 @@ */ public class ServiceScheduler extends CompositeService { + public static final String YARN_COMPONENT = "yarn:component"; + private static final Logger LOG = LoggerFactory.getLogger(ServiceScheduler.class); private Service app; @@ -135,6 +137,8 @@ private YarnRegistryViewForProviders yarnRegistryOperations; private ServiceContext context; private ContainerLaunchService containerLaunchService; + private Map unRecoveredInstances = + new ConcurrentHashMap<>(); public ServiceScheduler(ServiceContext context) { super(context.service.getName()); @@ -300,7 +304,7 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { } for (Container container : containersFromPrevAttempt) { LOG.info("Handling {} from previous attempt", container.getId()); - ServiceRecord record = existingRecords.get(RegistryPathUtils + ServiceRecord record = existingRecords.remove(RegistryPathUtils .encodeYarnID(container.getId().toString())); if (record != null) { Component comp = componentsById.get(container.getAllocationRequestId()); @@ -317,6 +321,23 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { amRMClient.releaseAssignedContainer(container.getId()); } } + + existingRecords.forEach((encodedContainerId, record) -> { + String componentName = record.get(YARN_COMPONENT); + if (componentName != null) { + Component component = componentsByName.get(componentName); + ComponentInstance compInstance = component.getComponentInstance( + record.description); + ContainerId containerId = ContainerId.fromString(record.get( + YarnRegistryAttributes.YARN_ID)); + ComponentEvent event = new ComponentEvent(componentName, + CONTAINER_RECOVERED) + .setInstance(compInstance) + .setContainerId(containerId); + unRecoveredInstances.put(containerId, compInstance); + component.handle(event); + } + }); } private void initGlobalTokensForSubstitute(ServiceContext context) { @@ -501,6 +522,31 @@ public void onContainersAllocated(List containers) { } } + @Override + public void onContainersFromPreviousAttempts(List containers) { + if (containers == null || containers.isEmpty()) { + return; + } + for (Container container : containers) { + ComponentInstance compInstance = + unRecoveredInstances.remove(container.getId()); + if (compInstance != null) { + Component component = componentsById.get( + container.getAllocationRequestId()); + ComponentEvent event = new ComponentEvent(component.getName(), + CONTAINER_RECOVERED) + .setInstance(compInstance) + .setContainerId(container.getId()) + .setContainer(container); + component.handle(event); + } else { + LOG.info("Not waiting to recover container {}, releasing", + container.getId()); + amRMClient.releaseAssignedContainer(container.getId()); + } + } + } + @Override public void onContainersCompleted(List statuses) { for (ContainerStatus status : statuses) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/ServiceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/ServiceConfiguration.java new file mode 100644 index 00000000000..ae045d26f8d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/ServiceConfiguration.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.yarn.service.api; + +import org.apache.hadoop.conf.Configuration; + + +public class ServiceConfiguration extends Configuration { + + public static final String YARN_SERVICE_PREFIX = "yarn.service."; + + /** + * How long to wait until a container is considered dead. + */ + public static final String CONTAINER_EXPIRY_INTERVAL_MS = + YARN_SERVICE_PREFIX + "container.expiry-interval-ms"; + + public static final int DEFAULT_CONTAINER_EXPIRY_INTERVAL_MS = 200000; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 88f47635e27..38f27e6a890 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.component; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.api.ServiceConfiguration; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.ContainerFailureTracker; @@ -52,10 +54,12 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -96,6 +100,14 @@ private StateMachine stateMachine; private AsyncDispatcher dispatcher; + + // component instances that are waiting for the corresponding container to + // recover. + private Map recoveringInstances = + new ConcurrentHashMap<>(); + + private long containerWaitMs; + private static final StateMachineFactory stateMachineFactory = new StateMachineFactory( @@ -106,7 +118,9 @@ // container recovered on AM restart .addTransition(INIT, INIT, CONTAINER_RECOVERED, new ContainerRecoveredTransition()) - + // container recovered in AM heartbeat + .addTransition(FLEXING, FLEXING, CONTAINER_RECOVERED, + new ContainerRecoveredTransition()) // container allocated by RM .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED, new ContainerAllocatedTransition()) @@ -157,6 +171,27 @@ public Component( maxContainerFailurePerComp = componentSpec.getConfiguration() .getPropertyInt(CONTAINER_FAILURE_THRESHOLD, 10); createNumCompInstances(component.getNumberOfContainers()); + this.containerWaitMs = + context.scheduler.getConfig().getInt( + ServiceConfiguration.CONTAINER_EXPIRY_INTERVAL_MS, + ServiceConfiguration.DEFAULT_CONTAINER_EXPIRY_INTERVAL_MS); + + scheduler.executorService.scheduleAtFixedRate(() -> { + Iterator> iterator = + recoveringInstances.entrySet().iterator(); + while(iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (System.currentTimeMillis() - entry.getValue().getWaitStartTime() + > containerWaitMs) { + iterator.remove(); + LOG.info("Wait on container {} expired, adding to pending", + entry.getKey()); + pendingInstances.add(entry.getValue()); + scheduler.getAmRMClient().releaseAssignedContainer( + entry.getKey()); + } + } + }, containerWaitMs, containerWaitMs, TimeUnit.MILLISECONDS); } private void createNumCompInstances(long count) { @@ -257,8 +292,16 @@ public void transition(Component component, ComponentEvent event) { component.releaseContainer(container); return; } - component.pendingInstances.remove(instance); + if (container == null) { + LOG.info("[COMPONENT {}]: container for {} is not yet recovered by RM", + component.getName(), instance); + instance.setWaitStartTime(System.currentTimeMillis()); + component.recoveringInstances.put(event.getContainerId(), + instance); + return; + } + component.recoveringInstances.remove(container.getId()); instance.setContainer(container); ProviderUtils.initCompInstanceDir(component.getContext().fs, instance); component.getScheduler().addLiveCompInstance(container.getId(), instance); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 509f6675006..d32f09f21eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -62,7 +62,6 @@ import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; -import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*; @@ -91,6 +90,7 @@ private long containerStartedTime = 0; // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + private volatile long waitStartTime; private static final StateMachineFactory @@ -371,6 +371,14 @@ public NodeId getNodeId() { return component.getComponentSpec(); } + public long getWaitStartTime() { + return waitStartTime; + } + + public void setWaitStartTime(long waitStartTime) { + this.waitStartTime = waitStartTime; + } + private static class BaseTransition implements SingleArcTransition { @@ -398,6 +406,7 @@ private void updateServiceRecord( record.set(YARN_PERSISTENCE, PersistencePolicies.CONTAINER); record.set(YARN_IP, status.getIPs().get(0)); record.set(YARN_HOSTNAME, status.getHost()); + record.set(ServiceScheduler.YARN_COMPONENT, component.getName()); try { yarnRegistry .putComponent(RegistryPathUtils.encodeYarnID(containerId), record); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java index 1a880ba4426..bbe398f17d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.component.instance; public enum ComponentInstanceEventType { + RECOVER, START, STOP, BECOME_READY, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java index 429816137c5..b2640b5daf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -19,8 +19,13 @@ package org.apache.hadoop.yarn.service; import com.google.common.base.Supplier; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -42,15 +47,24 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MockServiceAM extends ServiceMaster { + private static final Logger LOG = + LoggerFactory.getLogger(MockServiceAM.class); + Service service; // The list of containers fed by tests to be returned on // AMRMClientCallBackHandler#onContainersAllocated @@ -59,6 +73,16 @@ final List failedContainers = Collections.synchronizedList(new LinkedList<>()); + + final List recoveredContainers = + Collections.synchronizedList(new LinkedList<>()); + + final Map registryComponents = + new ConcurrentHashMap<>(); + + private Map containerStatus = + new ConcurrentHashMap<>(); + public MockServiceAM(Service service) { super(service.getName()); this.service = service; @@ -75,7 +99,7 @@ protected ContainerId getAMContainerId() @Override protected Path getAppDir() { Path path = new Path(new Path("target", "apps"), service.getName()); - System.out.println("Service path: " + path); + LOG.info("Service path: {}", path); return path; } @@ -84,10 +108,24 @@ protected ServiceScheduler createServiceScheduler(ServiceContext context) throws IOException, YarnException { return new ServiceScheduler(context) { + @SuppressWarnings("SuspiciousMethodCalls") @Override protected YarnRegistryViewForProviders createYarnRegistryOperations( ServiceContext context, RegistryOperations registryClient) { - return mock(YarnRegistryViewForProviders.class); + YarnRegistryViewForProviders yarnRegistryView = mock( + YarnRegistryViewForProviders.class); + if (!registryComponents.isEmpty()) { + try { + when(yarnRegistryView.listComponents()) + .thenReturn(new LinkedList<>(registryComponents.keySet())); + when(yarnRegistryView.getComponent(anyString())).thenAnswer( + invocation -> + registryComponents.get(invocation.getArguments()[0])); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return yarnRegistryView; } @Override @@ -101,7 +139,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( // add new containers if any synchronized (feedContainers) { if (feedContainers.isEmpty()) { - System.out.println("Allocating........ no containers"); + LOG.info("Allocating........ no containers"); } else { // The AMRMClient will return containers for compoenent that are // at FLEXING state @@ -112,7 +150,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( org.apache.hadoop.yarn.service.component.Component component = componentsById.get(c.getAllocationRequestId()); if (component.getState() == ComponentState.FLEXING) { - System.out.println("Allocated container " + c.getId()); + LOG.info("Allocated container {} ", c.getId()); allocatedContainers.add(c); itor.remove(); } @@ -121,6 +159,17 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( } } + // add recovered containers if any + synchronized (recoveredContainers) { + if (!recoveredContainers.isEmpty()) { + List containersFromPrevAttempt = new LinkedList<>(); + containersFromPrevAttempt.addAll(recoveredContainers); + recoveredContainers.clear(); + builder.containersFromPreviousAttempt( + containersFromPrevAttempt); + } + } + // add failed containers if any synchronized (failedContainers) { if (!failedContainers.isEmpty()) { @@ -146,15 +195,23 @@ public RegisterApplicationMasterResponse registerApplicationMaster( } }; - return AMRMClientAsync - .createAMRMClientAsync(client1, 1000, + return AMRMClientAsync.createAMRMClientAsync(client1, 1000, this.new AMRMClientCallback()); } + @SuppressWarnings("SuspiciousMethodCalls") @Override public NMClientAsync createNMClient() { NMClientAsync nmClientAsync = super.createNMClient(); - nmClientAsync.setClient(mock(NMClient.class)); + NMClient nmClient = mock(NMClient.class); + try { + when(nmClient.getContainerStatus(anyObject(), anyObject())) + .thenAnswer(invocation -> + containerStatus.get(invocation.getArguments()[0])); + } catch (YarnException | IOException e) { + throw new RuntimeException(e); + } + nmClientAsync.setClient(nmClient); return nmClientAsync; } }; @@ -165,6 +222,37 @@ public NMClientAsync createNMClient() { context.service = service; } + public void feedRegistryComponent(ContainerId containerId, String compName, + String compInstName) { + ServiceRecord record = new ServiceRecord(); + record.set(YarnRegistryAttributes.YARN_ID, containerId.toString()); + record.description = compInstName; + record.set(YarnRegistryAttributes.YARN_PERSISTENCE, + PersistencePolicies.CONTAINER); + record.set(YarnRegistryAttributes.YARN_IP, "localhost"); + record.set(YarnRegistryAttributes.YARN_HOSTNAME, "localhost"); + record.set(ServiceScheduler.YARN_COMPONENT, compName); + registryComponents.put(RegistryPathUtils.encodeYarnID( + containerId.toString()), record); + } + + /** + * Simulates a recovered container that is sent to the AM in the heartbeat + * response. + * + * @param containerId The ID for the container + * @param compName The component to which the recovered container is fed. + */ + public void feedRecoveredContainer(ContainerId containerId, String compName) { + Container container = createContainer(containerId, compName); + recoveredContainers.add(container); + ContainerStatus status = ContainerStatus.newInstance(containerId, + ContainerState.RUNNING, "", 0); + status.setHost(container.getNodeId().getHost()); + status.setIPs(Lists.newArrayList(container.getNodeId().getHost())); + containerStatus.put(containerId, status); + } + /** * * @param service The service for the component @@ -174,17 +262,8 @@ public NMClientAsync createNMClient() { */ public Container feedContainerToComp(Service service, int id, String compName) { - ApplicationId applicationId = ApplicationId.fromString(service.getId()); - ContainerId containerId = ContainerId - .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id); - NodeId nodeId = NodeId.newInstance("localhost", 1234); - Container container = Container - .newInstance(containerId, nodeId, "localhost", - Resource.newInstance(100, 1), Priority.newInstance(0), null); - - long allocateId = - context.scheduler.getAllComponents().get(compName).getAllocateId(); - container.setAllocationRequestId(allocateId); + ContainerId containerId = createContainerId(id); + Container container = createContainer(containerId, compName); synchronized (feedContainers) { feedContainers.add(container); } @@ -203,6 +282,23 @@ public void feedFailedContainerToComp(Service service, int id, String } } + public ContainerId createContainerId(int id) { + ApplicationId applicationId = ApplicationId.fromString(service.getId()); + return ContainerId.newContainerId( + ApplicationAttemptId.newInstance(applicationId, 1), id); + } + + private Container createContainer(ContainerId containerId, String compName) { + NodeId nodeId = NodeId.newInstance("localhost", 1234); + Container container = Container.newInstance( + containerId, nodeId, "localhost", + Resource.newInstance(100, 1), + Priority.newInstance(0), null); + long allocateId = + context.scheduler.getAllComponents().get(compName).getAllocateId(); + container.setAllocationRequestId(allocateId); + return container; + } public void flexComponent(String compName, long numberOfContainers) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index fb4de0d5714..1eb3dc39a4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -20,7 +20,9 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; @@ -29,6 +31,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -39,6 +43,9 @@ public class TestServiceAM extends ServiceTestUtils{ + private static final Logger LOG = + LoggerFactory.getLogger(TestServiceAM.class); + private File basedir; YarnConfiguration conf = new YarnConfiguration(); TestingCluster zkCluster; @@ -54,7 +61,7 @@ public void setup() throws Exception { zkCluster = new TestingCluster(1); zkCluster.start(); conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString()); - System.out.println("ZK cluster: " + zkCluster.getConnectString()); + LOG.info("ZK cluster: {}", zkCluster.getConnectString()); } @After @@ -91,7 +98,7 @@ public void testContainerCompleted() throws TimeoutException, am.feedContainerToComp(exampleApp, 1, "compa"); am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED); - System.out.println("Fail the container 1"); + LOG.info("Fail the container 1"); // fail the container am.feedFailedContainerToComp(exampleApp, 1, "compa"); @@ -106,4 +113,46 @@ public void testContainerCompleted() throws TimeoutException, am.getComponent("compa").getPendingInstances().size()); am.stop(); } + + // Test to verify that the containers of previous attempt are not prematurely + // released. These containers are sent by the RM to the AM in the + // heartbeat response. + @Test(timeout = 200000) + public void testContainersFromPreviousAttemptsWithRMRestart() + throws Exception { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + Service exampleApp = new Service(); + exampleApp.setId(applicationId.toString()); + exampleApp.setName("testContainersRecovers"); + String comp1Name = "comp1"; + String comp1InstName = "comp1-0"; + + org.apache.hadoop.yarn.service.api.records.Component compA = + createComponent(comp1Name, 1, "sleep"); + exampleApp.addComponent(compA); + + MockServiceAM am = new MockServiceAM(exampleApp); + ContainerId containerId = am.createContainerId(1); + am.feedRegistryComponent(containerId, comp1Name, comp1InstName); + am.init(conf); + am.start(); + + ComponentInstance comp10 = am.getCompInstance(comp1Name, comp1InstName); + am.feedRecoveredContainer(containerId, comp1Name); + am.waitForCompInstanceState(comp10, ComponentInstanceState.STARTED); + + // 0 pending instance + Assert.assertEquals(0, + am.getComponent(comp1Name).getPendingInstances().size()); + + GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName) + .getContainerStatus() + != null, 2000,200000); + Assert.assertEquals("container state", + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + am.getCompInstance(comp1Name, comp1InstName).getContainerStatus() + .getState()); + am.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 793ad79e08d..dfba1e9794b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -486,6 +486,13 @@ public void waitFor(Supplier check, int checkEveryMillis, * stop() is the recommended action. */ public abstract void onError(Throwable e); + + /** + * Called when the ResourceManager responds to a heartbeat with containers + * from previous attempt. + */ + public abstract void onContainersFromPreviousAttempts( + List containers); } /** @@ -531,5 +538,7 @@ public void waitFor(Supplier check, int checkEveryMillis, * @param e */ void onError(Throwable e); + + void onContainersFromPreviousAttempts(List containers); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 031cdecc915..5ec225046e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -358,6 +358,11 @@ public void run() { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } + + if (!response.getContainersFromPreviousAttempts().isEmpty()) { + handler.onContainersFromPreviousAttempts( + response.getContainersFromPreviousAttempts()); + } progress = handler.getProgress(); } catch (Throwable ex) { handler.onError(ex); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 7306b3ac3c3..ace7051d2b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -557,6 +557,10 @@ public void onError(Throwable e) { notifier.notifyAll(); } } + + @Override + public void onContainersFromPreviousAttempts(List containers) { + } } private class TestCallbackHandler2 @@ -600,6 +604,10 @@ public void onError(Throwable e) { callStopAndNotify(); } + @Override + public void onContainersFromPreviousAttempts(List containers) { + } + void callStopAndNotify() { if(stop) { asynClient.stop();