diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 631f89e7161..8f5c63957c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -108,7 +108,7 @@ new ConcurrentHashMap<>(); // id - > component - protected final Map componentsById = + protected final Map componentsById = new ConcurrentHashMap<>(); private final Map liveInstances = @@ -308,7 +308,7 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { ServiceRecord record = existingRecords.get(RegistryPathUtils .encodeYarnID(container.getId().toString())); if (record != null) { - Component comp = componentsById.get(container.getAllocationRequestId()); + Component comp = componentsById.get(container.getPriority().getPriority()); ComponentEvent event = new ComponentEvent(comp.getName(), CONTAINER_RECOVERED) .setContainer(container) @@ -438,7 +438,7 @@ private void setUserProvidedServiceRecordAttributes( } private void createAllComponents() { - long allocateId = 0; + int allocateId = 0; // sort components by dependencies Collection sortedComponents = @@ -497,7 +497,7 @@ public void handle(ComponentInstanceEvent event) { public void onContainersAllocated(List containers) { LOG.info(containers.size() + " containers allocated. "); for (Container container : containers) { - Component comp = componentsById.get(container.getAllocationRequestId()); + Component comp = componentsById.get(container.getPriority().getPriority()); ComponentEvent event = new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED) .setContainer(container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java index d343a03dfcd..096d5c66e7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -114,7 +114,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( while (itor.hasNext()) { Container c = itor.next(); org.apache.hadoop.yarn.service.component.Component component = - componentsById.get(c.getAllocationRequestId()); + componentsById.get(c.getPriority().getPriority()); if (component.getState() == ComponentState.FLEXING) { System.out.println("Allocated container " + c.getId()); allocatedContainers.add(c); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 19a5177b97b..bc1fe8ca032 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -63,7 +64,7 @@ private static final Logger LOG = LoggerFactory.getLogger(ServiceTestUtils.class); - private MiniYARNCluster yarnCluster = null; + protected MiniYARNCluster yarnCluster = null; private MiniDFSCluster hdfsCluster = null; private FileSystem fs = null; private Configuration conf = null; @@ -131,12 +132,18 @@ protected FileSystem getFS() { return fs; } - protected void setupInternal(int numNodeManager) + protected void setupInternal(int numNodeManager) throws Exception + { + setupInternal(numNodeManager, null); + } + + protected void setupInternal(int numNodeManager, @Nullable YarnConfiguration conf) throws Exception { LOG.info("Starting up YARN cluster"); -// Logger rootLogger = LogManager.getRootLogger(); -// rootLogger.setLevel(Level.DEBUG); - setConf(new YarnConfiguration()); + if (conf == null) { + conf = new YarnConfiguration(); + } + setConf(conf); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); // reduce the teardown waiting time conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 64fcf57f921..bfbf7ae8448 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,15 +18,13 @@ package org.apache.hadoop.yarn.service; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.*; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.client.ServiceClient; @@ -41,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -51,7 +50,6 @@ import java.util.concurrent.TimeoutException; import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; /** * End to end tests to test deploying services with MiniYarnCluster and a in-JVM @@ -67,7 +65,8 @@ @Before public void setup() throws Exception { - setupInternal(NUM_NMS); + File tmpYarnDir = new File("target", "tmp"); + FileUtils.deleteQuietly(tmpYarnDir); } @After @@ -84,6 +83,7 @@ public void tearDown() throws IOException { // 6. Destroy the service @Test (timeout = 200000) public void testCreateFlexStopDestroyService() throws Exception { + setupInternal(NUM_NMS); ServiceClient client = createClient(); Service exampleApp = createExampleApplication(); client.actionCreate(exampleApp); @@ -135,6 +135,7 @@ public void testCreateFlexStopDestroyService() throws Exception { // Check containers for compa started before containers for compb @Test (timeout = 200000) public void testComponentStartOrder() throws Exception { + setupInternal(NUM_NMS); ServiceClient client = createClient(); Service exampleApp = new Service(); exampleApp.setName("teststartorder"); @@ -155,6 +156,43 @@ public void testComponentStartOrder() throws Exception { client.actionDestroy(exampleApp.getName()); } + // Test to verify recovery of SeviceMaster after RM is restarted. + // 1. Create an example service. + // 2. Restart RM. + // 3. Fail the application attempt. + // 4. Verify ServiceMaster recovers. + @Test(timeout = 200000) + public void testRecoverComponentsAfterRMRestart() throws Exception { + YarnConfiguration configuration = new YarnConfiguration(); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + configuration.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + setupInternal(NUM_NMS, configuration); + + ServiceClient client = createClient(); + Service exampleApp = createExampleApplication(); + client.actionCreate(exampleApp); + waitForAllCompToBeReady(client, exampleApp); + + LOG.info("Restart the resource manager"); + yarnCluster.restartResourceManager(0); + waitForAllCompToBeReady(client, exampleApp); + + LOG.info("Fail the application attempt"); + ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId()); + client.getYarnClient().failApplicationAttempt( + client.getYarnClient().getApplicationReport(exampleAppId).getCurrentApplicationAttemptId()); + GenericTestUtils.waitFor(() -> { + try { + return client.getYarnClient().getApplicationReport(exampleAppId) + .getYarnApplicationState() == YarnApplicationState.RUNNING; + } catch (YarnException | IOException e) { + throw new RuntimeException("while waiting", e); + } + }, 2000, 200000); + client.actionStop(exampleApp.getName(), true); + client.actionDestroy(exampleApp.getName()); + } + // Check containers launched are in dependency order // Get all containers into a list and sort based on container launch time e.g. // compa-c1, compa-c2, compb-c1, compb-c2;