diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 631f89e7161..ca1a40b0252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -108,7 +108,7 @@ new ConcurrentHashMap<>(); // id - > component - protected final Map componentsById = + protected final Map componentsById = new ConcurrentHashMap<>(); private final Map liveInstances = @@ -308,7 +308,8 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { ServiceRecord record = existingRecords.get(RegistryPathUtils .encodeYarnID(container.getId().toString())); if (record != null) { - Component comp = componentsById.get(container.getAllocationRequestId()); + Component comp = componentsById.get( + container.getPriority().getPriority()); ComponentEvent event = new ComponentEvent(comp.getName(), CONTAINER_RECOVERED) .setContainer(container) @@ -438,7 +439,7 @@ private void setUserProvidedServiceRecordAttributes( } private void createAllComponents() { - long allocateId = 0; + int allocateId = 0; // sort components by dependencies Collection sortedComponents = @@ -497,7 +498,8 @@ public void handle(ComponentInstanceEvent event) { public void onContainersAllocated(List containers) { LOG.info(containers.size() + " containers allocated. "); for (Container container : containers) { - Component comp = componentsById.get(container.getAllocationRequestId()); + Component comp = componentsById.get( + container.getPriority().getPriority()); ComponentEvent event = new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED) .setContainer(container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index a5dd39c0456..98cfd9224c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -72,7 +72,7 @@ private static final Logger LOG = LoggerFactory.getLogger(Component.class); private org.apache.hadoop.yarn.service.api.records.Component componentSpec; - private long allocateId; + private int allocateId; private Priority priority; private ServiceMetrics componentMetrics; private ServiceScheduler scheduler; @@ -134,9 +134,9 @@ public Component( org.apache.hadoop.yarn.service.api.records.Component component, - long allocateId, ServiceContext context) { + int allocateId, ServiceContext context) { this.allocateId = allocateId; - this.priority = Priority.newInstance((int) allocateId); + this.priority = Priority.newInstance(allocateId); this.componentSpec = component; componentMetrics = ServiceMetrics.register(component.getName(), "Metrics for component " + component.getName()); @@ -534,7 +534,7 @@ public Priority getPriority() { return priority; } - public long getAllocateId() { + public int getAllocateId() { return allocateId; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java index d343a03dfcd..096d5c66e7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -114,7 +114,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( while (itor.hasNext()) { Container c = itor.next(); org.apache.hadoop.yarn.service.component.Component component = - componentsById.get(c.getAllocationRequestId()); + componentsById.get(c.getPriority().getPriority()); if (component.getState() == ComponentState.FLEXING) { System.out.println("Allocated container " + c.getId()); allocatedContainers.add(c); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 19a5177b97b..6419e0c4bdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -131,12 +132,16 @@ protected FileSystem getFS() { return fs; } + protected MiniYARNCluster getYarnCluster() { + return yarnCluster; + } + protected void setupInternal(int numNodeManager) throws Exception { LOG.info("Starting up YARN cluster"); -// Logger rootLogger = LogManager.getRootLogger(); -// rootLogger.setLevel(Level.DEBUG); - setConf(new YarnConfiguration()); + if (conf == null) { + setConf(new YarnConfiguration()); + } conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); // reduce the teardown waiting time conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 64fcf57f921..754513bc45e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,15 +18,13 @@ package org.apache.hadoop.yarn.service; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.*; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.client.ServiceClient; @@ -41,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -51,7 +50,6 @@ import java.util.concurrent.TimeoutException; import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; /** * End to end tests to test deploying services with MiniYarnCluster and a in-JVM @@ -67,7 +65,8 @@ @Before public void setup() throws Exception { - setupInternal(NUM_NMS); + File tmpYarnDir = new File("target", "tmp"); + FileUtils.deleteQuietly(tmpYarnDir); } @After @@ -84,6 +83,7 @@ public void tearDown() throws IOException { // 6. Destroy the service @Test (timeout = 200000) public void testCreateFlexStopDestroyService() throws Exception { + setupInternal(NUM_NMS); ServiceClient client = createClient(); Service exampleApp = createExampleApplication(); client.actionCreate(exampleApp); @@ -135,6 +135,7 @@ public void testCreateFlexStopDestroyService() throws Exception { // Check containers for compa started before containers for compb @Test (timeout = 200000) public void testComponentStartOrder() throws Exception { + setupInternal(NUM_NMS); ServiceClient client = createClient(); Service exampleApp = new Service(); exampleApp.setName("teststartorder"); @@ -155,6 +156,46 @@ public void testComponentStartOrder() throws Exception { client.actionDestroy(exampleApp.getName()); } + // Test to verify recovery of SeviceMaster after RM is restarted. + // 1. Create an example service. + // 2. Restart RM. + // 3. Fail the application attempt. + // 4. Verify ServiceMaster recovers. + @Test(timeout = 200000) + public void testRecoverComponentsAfterRMRestart() throws Exception { + YarnConfiguration configuration = new YarnConfiguration(); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + configuration.setBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + setConf(configuration); + setupInternal(NUM_NMS); + + ServiceClient client = createClient(); + Service exampleApp = createExampleApplication(); + client.actionCreate(exampleApp); + waitForAllCompToBeReady(client, exampleApp); + + LOG.info("Restart the resource manager"); + getYarnCluster().restartResourceManager(0); + waitForAllCompToBeReady(client, exampleApp); + + LOG.info("Fail the application attempt"); + ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId()); + client.getYarnClient().failApplicationAttempt( + client.getYarnClient().getApplicationReport(exampleAppId) + .getCurrentApplicationAttemptId()); + GenericTestUtils.waitFor(() -> { + try { + return client.getYarnClient().getApplicationReport(exampleAppId) + .getYarnApplicationState() == YarnApplicationState.RUNNING; + } catch (YarnException | IOException e) { + throw new RuntimeException("while waiting", e); + } + }, 2000, 200000); + client.actionStop(exampleApp.getName(), true); + client.actionDestroy(exampleApp.getName()); + } + // Check containers launched are in dependency order // Get all containers into a list and sort based on container launch time e.g. // compa-c1, compa-c2, compb-c1, compb-c2;