diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
index 5b3c72cae4a..5704285f58c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java
@@ -101,14 +101,13 @@
       + "for constraint %sin placement policy of component %s.";
   String ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL = "Tag(s) not specified "
       + "for constraint %sin placement policy of component %s.";
-  String ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME = "Invalid target tag %s "
-      + "specified in placement policy of component %s. For now, target tags "
-      + "support self reference only. Specifying anything other than its "
-      + "component name is not supported. Set target tag of component %s to "
-      + "%s.";
   String ERROR_PLACEMENT_POLICY_TAG_NAME_INVALID = "Invalid target tag %s "
       + "specified in placement policy of component %s. Target tags should be "
       + "a valid component name in the service.";
+  String ERROR_PLACEMENT_POLICY_TAG_INVALID = "Invalid target tag %s "
+      + "specified in placement policy of component %s. Component %s must "
+      + "also appear in placement policy of component %s with the same "
+      + "constraint type.";
   String ERROR_PLACEMENT_POLICY_EXPRESSION_ELEMENT_NAME_INVALID = "Invalid "
       + "expression element name %s specified in placement policy of component "
       + "%s. Expression element names should be a valid constraint name or an "
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index 447250f9cda..2c1b155c720 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -230,7 +230,7 @@ public static void validateAndResolveService(Service service,
       }
       validateComponent(comp, fs.getFileSystem(), conf);
     }
-    validatePlacementPolicy(service.getComponents(), componentNames);
+    validatePlacementPolicy(service, componentNames);
 
     // validate dependency tree
     sortByDependencies(service.getComponents());
@@ -326,9 +326,9 @@ public static void validateNameFormat(String name,
     namePattern.validate(name);
   }
 
-  private static void validatePlacementPolicy(List<Component> components,
+  private static void validatePlacementPolicy(Service service,
       Set<String> componentNames) {
-    for (Component comp : components) {
+    for (Component comp : service.getComponents()) {
       PlacementPolicy placementPolicy = comp.getPlacementPolicy();
       if (placementPolicy != null) {
         for (PlacementConstraint constraint : placementPolicy
@@ -352,10 +352,37 @@ private static void validatePlacementPolicy(List<Component> components,
                 comp.getName()));
           }
           for (String targetTag : constraint.getTargetTags()) {
-            if (!comp.getName().equals(targetTag)) {
+            if (!componentNames.contains(targetTag)) {
               throw new IllegalArgumentException(String.format(
-                  RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
-                  targetTag, comp.getName(), comp.getName(), comp.getName()));
+                  RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_INVALID,
+                  targetTag, comp.getName()));
+            }
+            if (!targetTag.equals(comp.getName())) {
+              PlacementPolicy otherPolicy = service.getComponent(targetTag)
+                  .getPlacementPolicy();
+              if (otherPolicy == null) {
+                throw new IllegalArgumentException(String.format(
+                    RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_INVALID,
+                    targetTag, comp.getName(), comp.getName(), targetTag));
+              }
+              boolean symmetricTagFound = false;
+              for (PlacementConstraint otherConstraint : otherPolicy
+                  .getConstraints()) {
+                if (otherConstraint.getType() != constraint.getType()) {
+                  continue;
+                }
+                if (otherConstraint.getScope() != constraint.getScope()) {
+                  continue;
+                }
+                if (otherConstraint.getTargetTags().contains(comp.getName())) {
+                  symmetricTagFound = true;
+                }
+              }
+              if (!symmetricTagFound) {
+                throw new IllegalArgumentException(String.format(
+                    RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_INVALID,
+                    targetTag, comp.getName(), comp.getName(), targetTag));
+              }
             }
           }
         }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestPlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestPlacementPolicy.java
new file mode 100644
index 00000000000..4d982e2f211
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestPlacementPolicy.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
+import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
+import org.apache.hadoop.yarn.service.api.records.PlacementScope;
+import org.apache.hadoop.yarn.service.api.records.PlacementType;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class TestPlacementPolicy extends ServiceTestUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestPlacementPolicy.class);
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws Exception {
+    File tmpYarnDir = new File("target", "tmp");
+    FileUtils.deleteQuietly(tmpYarnDir);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    shutdown();
+  }
+
+  // Test to verify ANTI_AFFINITY placement policy
+  // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
+  // 2. Create an example service with 3 containers
+  // 3. Verify no more than 1 container comes up in each of the 3 NMs
+  // 4. Flex the component to 4 containers
+  // 5. Verify that the 4th container does not even get allocated since there
+  // are only 3 NMs
+  @Test(timeout = 200000)
+  public void testCreateServiceWithPlacementPolicy() throws Exception {
+    // We need to enable scheduler placement-constraint at the cluster level to
+    // let apps use placement policies.
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    setConf(conf);
+    setupInternal(3);
+    ServiceClient client = createClient(getConf());
+    Service exampleApp = new Service();
+    exampleApp.setName("example-app");
+    exampleApp.setVersion("v1");
+    Component comp = createComponent("compa", 3L, "sleep 1000");
+    PlacementPolicy pp = new PlacementPolicy();
+    PlacementConstraint pc = new PlacementConstraint();
+    pc.setName("CA1");
+    pc.setTargetTags(Collections.singletonList("compa"));
+    pc.setScope(PlacementScope.NODE);
+    pc.setType(PlacementType.ANTI_AFFINITY);
+    pp.setConstraints(Collections.singletonList(pc));
+    comp.setPlacementPolicy(pp);
+    exampleApp.addComponent(comp);
+    client.actionCreate(exampleApp);
+    waitForServiceToBeStable(client, exampleApp);
+
+    // Check service is stable and all 3 containers are running
+    Service service = client.getStatus(exampleApp.getName());
+    Component component = service.getComponent("compa");
+    Assert.assertEquals("Service state should be STABLE", ServiceState.STABLE,
+        service.getState());
+    Assert.assertEquals("3 containers are expected to be running", 3,
+        component.getContainers().size());
+    // Prepare a map of non-AM containers for later lookup
+    Set<String> nonAMContainerIdSet = new HashSet<>();
+    for (Container cont : component.getContainers()) {
+      nonAMContainerIdSet.add(cont.getId());
+    }
+
+    // Verify that no more than 1 non-AM container came up on each of the 3 NMs
+    Set<String> hosts = new HashSet<>();
+    ApplicationReport report = client.getYarnClient()
+        .getApplicationReport(ApplicationId.fromString(exampleApp.getId()));
+    GetContainersRequest req = GetContainersRequest
+        .newInstance(report.getCurrentApplicationAttemptId());
+    ResourceManager rm = getYarnCluster().getResourceManager();
+    for (ContainerReport contReport : rm.getClientRMService().getContainers(req)
+        .getContainerList()) {
+      if (!nonAMContainerIdSet
+          .contains(contReport.getContainerId().toString())) {
+        continue;
+      }
+      if (hosts.contains(contReport.getNodeHttpAddress())) {
+        Assert.fail("Container " + contReport.getContainerId()
+            + " came up in the same host as another container.");
+      } else {
+        hosts.add(contReport.getNodeHttpAddress());
+      }
+    }
+
+    // Flex compa up to 5, which is more containers than the no of NMs
+    Map<String, Long> compCounts = new HashMap<>();
+    compCounts.put("compa", 5L);
+    exampleApp.getComponent("compa").setNumberOfContainers(5L);
+    client.flexByRestService(exampleApp.getName(), compCounts);
+    try {
+      // 10 secs is enough for the container to be started. The down side of
+      // this test is that it has to wait that long. Setting a higher wait time
+      // will add to the total time taken by tests to run.
+      waitForServiceToBeStable(client, exampleApp, 10000);
+      Assert.fail("Service should not be in a stable state. It should throw "
+          + "a timeout exception.");
+    } catch (Exception e) {
+      // Check that service state is not STABLE and only 3 containers are
+      // running and the fourth one should not get allocated.
+      service = client.getStatus(exampleApp.getName());
+      component = service.getComponent("compa");
+      Assert.assertNotEquals("Service state should not be STABLE",
+          ServiceState.STABLE, service.getState());
+      Assert.assertEquals("Component state should be FLEXING",
+          ComponentState.FLEXING, component.getState());
+      Assert.assertEquals("3 containers are expected to be running", 3,
+          component.getContainers().size());
+    }
+
+    // Flex compa down to 4 now, which is still more containers than the no of
+    // NMs. This tests the usecase that flex down does not kill any of the
+    // currently running containers since the required number of containers are
+    // still higher than the currently running number of containers. However,
+    // component state will still be FLEXING and service state not STABLE.
+    compCounts = new HashMap<>();
+    compCounts.put("compa", 4L);
+    exampleApp.getComponent("compa").setNumberOfContainers(4L);
+    client.flexByRestService(exampleApp.getName(), compCounts);
+    try {
+      // 10 secs is enough for the container to be started. The down side of
+      // this test is that it has to wait that long. Setting a higher wait time
+      // will add to the total time taken by tests to run.
+      waitForServiceToBeStable(client, exampleApp, 10000);
+      Assert.fail("Service should not be in a stable state. It should throw "
+          + "a timeout exception.");
+    } catch (Exception e) {
+      // Check that service state is not STABLE and only 3 containers are
+      // running and the fourth one should not get allocated.
+      service = client.getStatus(exampleApp.getName());
+      component = service.getComponent("compa");
+      Assert.assertNotEquals("Service state should not be STABLE",
+          ServiceState.STABLE, service.getState());
+      Assert.assertEquals("Component state should be FLEXING",
+          ComponentState.FLEXING, component.getState());
+      Assert.assertEquals("3 containers are expected to be running", 3,
+          component.getContainers().size());
+    }
+
+    // Finally flex compa down to 3, which is exactly the number of containers
+    // currently running. This will bring the component and service states to
+    // STABLE.
+    compCounts = new HashMap<>();
+    compCounts.put("compa", 3L);
+    exampleApp.getComponent("compa").setNumberOfContainers(3L);
+    client.flexByRestService(exampleApp.getName(), compCounts);
+    waitForServiceToBeStable(client, exampleApp);
+
+    LOG.info("Stop/destroy service {}", exampleApp);
+    client.actionStop(exampleApp.getName(), true);
+    client.actionDestroy(exampleApp.getName());
+  }
+
+  @Test(timeout = 200000)
+  public void testMultiComponentAntiAffinity() throws Exception {
+    // We need to enable scheduler placement-constraint at the cluster level to
+    // let apps use placement policies.
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    setConf(conf);
+    setupInternal(3);
+    ServiceClient client = createClient(getConf());
+    Service exampleApp = new Service();
+    exampleApp.setName("example-app-multi-component-anti-affinity");
+    exampleApp.setVersion("v1");
+    Component compa = createComponent("compa", 1L, "sleep 1000");
+    Component compb = createComponent("compb", 2L, "sleep 1000");
+    PlacementPolicy pp = new PlacementPolicy();
+    PlacementConstraint pc = new PlacementConstraint();
+    pc.setName("CA2");
+    pc.setTargetTags(Arrays.asList("compa", "compb"));
+    pc.setScope(PlacementScope.NODE);
+    pc.setType(PlacementType.ANTI_AFFINITY);
+    pp.setConstraints(Collections.singletonList(pc));
+    compa.setPlacementPolicy(pp);
+    compb.setPlacementPolicy(pp);
+    exampleApp.addComponent(compa);
+    exampleApp.addComponent(compb);
+    client.actionCreate(exampleApp);
+    waitForServiceToBeStable(client, exampleApp);
+
+    // Check service is stable and all 3 containers are running
+    Service service = client.getStatus(exampleApp.getName());
+    Assert.assertEquals("Service state should be STABLE", ServiceState.STABLE,
+        service.getState());
+    // Prepare a map of non-AM containers for later lookup
+    Set<String> nonAMContainerIdSet = new HashSet<>();
+    for (Component component : service.getComponents()) {
+      for (Container cont : component.getContainers()) {
+        nonAMContainerIdSet.add(cont.getId());
+      }
+    }
+    Assert.assertEquals("3 containers are expected to be running", 3,
+        nonAMContainerIdSet.size());
+
+    // Verify that no more than 1 non-AM container came up on each of the 3 NMs
+    Set<String> hosts = new HashSet<>();
+    ApplicationReport report = client.getYarnClient()
+        .getApplicationReport(ApplicationId.fromString(exampleApp.getId()));
+    GetContainersRequest req = GetContainersRequest
+        .newInstance(report.getCurrentApplicationAttemptId());
+    ResourceManager rm = getYarnCluster().getResourceManager();
+    for (ContainerReport contReport : rm.getClientRMService().getContainers(req)
+        .getContainerList()) {
+      if (!nonAMContainerIdSet
+          .contains(contReport.getContainerId().toString())) {
+        continue;
+      }
+      if (hosts.contains(contReport.getNodeHttpAddress())) {
+        Assert.fail("Container " + contReport.getContainerId()
+            + " came up in the same host as another container.");
+      } else {
+        hosts.add(contReport.getNodeHttpAddress());
+      }
+    }
+
+    // Flex compa and compb up and see that no additional containers are started
+    Map<String, Long> compCounts = new HashMap<>();
+    compCounts.put("compa", 2L);
+    compCounts.put("compb", 3L);
+    exampleApp.getComponent("compa").setNumberOfContainers(2L);
+    exampleApp.getComponent("compb").setNumberOfContainers(3L);
+    client.flexByRestService(exampleApp.getName(), compCounts);
+    try {
+      waitForServiceToBeStable(client, exampleApp, 20000);
+      Assert.fail("Service should not be in a stable state. It should throw "
+          + "a timeout exception.");
+    } catch (Exception e) {
+      // Check that service state is not STABLE and only 3 containers are
+      // running and the fourth one should not get allocated.
+      service = client.getStatus(exampleApp.getName());
+      Assert.assertNotEquals("Service state should not be STABLE",
+          ServiceState.STABLE, service.getState());
+      for (Component component : service.getComponents()) {
+        Assert.assertEquals("Component state should be FLEXING",
+            ComponentState.FLEXING, component.getState());
+        for (Container cont : component.getContainers()) {
+          Assert.assertTrue("New container should not have been found",
+              nonAMContainerIdSet.contains(cont.getId()));
+        }
+      }
+    }
+
+    LOG.info("Stop/destroy service {}", exampleApp);
+    client.actionStop(exampleApp.getName(), true);
+    client.actionDestroy(exampleApp.getName());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
index ae031d4aad9..87b76b7726f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
@@ -525,6 +525,7 @@ public void testPlacementPolicy() throws IOException {
     pc.setName("CA1");
     pp.setConstraints(Collections.singletonList(pc));
     comp.setPlacementPolicy(pp);
+    app.addComponent(ServiceTestUtils.createComponent("comp-b"));
 
     try {
       ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
@@ -568,8 +569,8 @@
     } catch (IllegalArgumentException e) {
       assertEquals(
           String.format(
-              RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
-              "comp-invalid", "comp-a", "comp-a", "comp-a"),
+              RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_INVALID,
+              "comp-invalid", "comp-a"),
           e.getMessage());
     }
 
@@ -582,6 +583,63 @@
     } catch (IllegalArgumentException e) {
       Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
     }
+
+    // Add another target tag
+    pc.setTargetTags(Arrays.asList("comp-a", "comp-b"));
+    assertInvalidTag(app, sfs);
+
+    // Add a placement policy for comp-b, but different tags
+    PlacementPolicy pp2 = new PlacementPolicy();
+    PlacementConstraint pc2 = new PlacementConstraint();
+    pc2.setName("CA2");
+    pc2.setType(PlacementType.ANTI_AFFINITY);
+    pc2.setScope(PlacementScope.NODE);
+    pc2.setTargetTags(Collections.singletonList("comp-b"));
+    pp2.setConstraints(Collections.singletonList(pc2));
+    app.getComponent("comp-b").setPlacementPolicy(pp2);
+    assertInvalidTag(app, sfs);
+
+    // Same tags, but different type
+    pc2.setTargetTags(Arrays.asList("comp-a", "comp-b"));
+    pc2.setType(PlacementType.AFFINITY);
+    assertInvalidTag(app, sfs);
+
+    // Same tags and type, but different scope
+    pc2.setType(PlacementType.ANTI_AFFINITY);
+    pc2.setScope(PlacementScope.RACK);
+    assertInvalidTag(app, sfs);
+
+    // Same everything
+    pc2.setScope(PlacementScope.NODE);
+
+    // Finally it should succeed
+    try {
+      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
+    } catch (IllegalArgumentException e) {
+      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
+    }
+
+    // Different valid tag
+    pc2.setTargetTags(Collections.singletonList("comp-a"));
+    try {
+      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
+    } catch (IllegalArgumentException e) {
+      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
+    }
+  }
+
+  private static void assertInvalidTag(Service app, SliderFileSystem sfs)
+      throws IOException {
+    try {
+      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
+      Assert.fail(EXCEPTION_PREFIX + "constraint with invalid tag");
+    } catch (IllegalArgumentException e) {
+      assertEquals(
+          String.format(
+              RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_INVALID,
+              "comp-b", "comp-a", "comp-a", "comp-b"),
+          e.getMessage());
+    }
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index 8b13b2495b8..7dfc52daba5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -25,12 +25,10 @@
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Configuration;
@@ -415,141 +413,6 @@ public void testUpgrade() throws Exception {
     client.actionDestroy(service.getName());
   }
 
-  // Test to verify ANTI_AFFINITY placement policy
-  // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
-  // 2. Create an example service with 3 containers
-  // 3. Verify no more than 1 container comes up in each of the 3 NMs
-  // 4. Flex the component to 4 containers
-  // 5. Verify that the 4th container does not even get allocated since there
-  // are only 3 NMs
-  @Test (timeout = 200000)
-  public void testCreateServiceWithPlacementPolicy() throws Exception {
-    // We need to enable scheduler placement-constraint at the cluster level to
-    // let apps use placement policies.
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
-    setConf(conf);
-    setupInternal(3);
-    ServiceClient client = createClient(getConf());
-    Service exampleApp = new Service();
-    exampleApp.setName("example-app");
-    exampleApp.setVersion("v1");
-    Component comp = createComponent("compa", 3L, "sleep 1000");
-    PlacementPolicy pp = new PlacementPolicy();
-    PlacementConstraint pc = new PlacementConstraint();
-    pc.setName("CA1");
-    pc.setTargetTags(Collections.singletonList("compa"));
-    pc.setScope(PlacementScope.NODE);
-    pc.setType(PlacementType.ANTI_AFFINITY);
-    pp.setConstraints(Collections.singletonList(pc));
-    comp.setPlacementPolicy(pp);
-    exampleApp.addComponent(comp);
-    client.actionCreate(exampleApp);
-    waitForServiceToBeStable(client, exampleApp);
-
-    // Check service is stable and all 3 containers are running
-    Service service = client.getStatus(exampleApp.getName());
-    Component component = service.getComponent("compa");
-    Assert.assertEquals("Service state should be STABLE", ServiceState.STABLE,
-        service.getState());
-    Assert.assertEquals("3 containers are expected to be running", 3,
-        component.getContainers().size());
-    // Prepare a map of non-AM containers for later lookup
-    Set<String> nonAMContainerIdSet = new HashSet<>();
-    for (Container cont : component.getContainers()) {
-      nonAMContainerIdSet.add(cont.getId());
-    }
-
-    // Verify that no more than 1 non-AM container came up on each of the 3 NMs
-    Set<String> hosts = new HashSet<>();
-    ApplicationReport report = client.getYarnClient()
-        .getApplicationReport(ApplicationId.fromString(exampleApp.getId()));
-    GetContainersRequest req = GetContainersRequest
-        .newInstance(report.getCurrentApplicationAttemptId());
-    ResourceManager rm = getYarnCluster().getResourceManager();
-    for (ContainerReport contReport : rm.getClientRMService().getContainers(req)
-        .getContainerList()) {
-      if (!nonAMContainerIdSet
-          .contains(contReport.getContainerId().toString())) {
-        continue;
-      }
-      if (hosts.contains(contReport.getNodeHttpAddress())) {
-        Assert.fail("Container " + contReport.getContainerId()
-            + " came up in the same host as another container.");
-      } else {
-        hosts.add(contReport.getNodeHttpAddress());
-      }
-    }
-
-    // Flex compa up to 5, which is more containers than the no of NMs
-    Map<String, Long> compCounts = new HashMap<>();
-    compCounts.put("compa", 5L);
-    exampleApp.getComponent("compa").setNumberOfContainers(5L);
-    client.flexByRestService(exampleApp.getName(), compCounts);
-    try {
-      // 10 secs is enough for the container to be started. The down side of
-      // this test is that it has to wait that long. Setting a higher wait time
-      // will add to the total time taken by tests to run.
-      waitForServiceToBeStable(client, exampleApp, 10000);
-      Assert.fail("Service should not be in a stable state. It should throw "
-          + "a timeout exception.");
-    } catch (Exception e) {
-      // Check that service state is not STABLE and only 3 containers are
-      // running and the fourth one should not get allocated.
-      service = client.getStatus(exampleApp.getName());
-      component = service.getComponent("compa");
-      Assert.assertNotEquals("Service state should not be STABLE",
-          ServiceState.STABLE, service.getState());
-      Assert.assertEquals("Component state should be FLEXING",
-          ComponentState.FLEXING, component.getState());
-      Assert.assertEquals("3 containers are expected to be running", 3,
-          component.getContainers().size());
-    }
-
-    // Flex compa down to 4 now, which is still more containers than the no of
-    // NMs. This tests the usecase that flex down does not kill any of the
-    // currently running containers since the required number of containers are
-    // still higher than the currently running number of containers. However,
-    // component state will still be FLEXING and service state not STABLE.
-    compCounts = new HashMap<>();
-    compCounts.put("compa", 4L);
-    exampleApp.getComponent("compa").setNumberOfContainers(4L);
-    client.flexByRestService(exampleApp.getName(), compCounts);
-    try {
-      // 10 secs is enough for the container to be started. The down side of
-      // this test is that it has to wait that long. Setting a higher wait time
-      // will add to the total time taken by tests to run.
-      waitForServiceToBeStable(client, exampleApp, 10000);
-      Assert.fail("Service should not be in a stable state. It should throw "
-          + "a timeout exception.");
-    } catch (Exception e) {
-      // Check that service state is not STABLE and only 3 containers are
-      // running and the fourth one should not get allocated.
-      service = client.getStatus(exampleApp.getName());
-      component = service.getComponent("compa");
-      Assert.assertNotEquals("Service state should not be STABLE",
-          ServiceState.STABLE, service.getState());
-      Assert.assertEquals("Component state should be FLEXING",
-          ComponentState.FLEXING, component.getState());
-      Assert.assertEquals("3 containers are expected to be running", 3,
-          component.getContainers().size());
-    }
-
-    // Finally flex compa down to 3, which is exactly the number of containers
-    // currently running. This will bring the component and service states to
-    // STABLE.
-    compCounts = new HashMap<>();
-    compCounts.put("compa", 3L);
-    exampleApp.getComponent("compa").setNumberOfContainers(3L);
-    client.flexByRestService(exampleApp.getName(), compCounts);
-    waitForServiceToBeStable(client, exampleApp);
-
-    LOG.info("Stop/destroy service {}", exampleApp);
-    client.actionStop(exampleApp.getName(), true);
-    client.actionDestroy(exampleApp.getName());
-  }
-
   @Test(timeout = 200000)
   public void testAMSigtermDoesNotKillApplication() throws Exception {
     runAMSignalTest(SignalContainerCommand.GRACEFUL_SHUTDOWN);