diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index 869d7f3659d..e6a38dc10c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -237,12 +237,11 @@ private static ServiceState checkIfStable(Service service) { * ServiceMaster.checkAndUpdateServiceState here to make it easy to fix * this in future. */ - public void checkAndUpdateServiceState(boolean isIncrement) { + public void checkAndUpdateServiceState() { writeLock.lock(); try { if (!getState().equals(State.UPGRADING)) { - ServiceMaster.checkAndUpdateServiceState(this.scheduler, - isIncrement); + ServiceMaster.checkAndUpdateServiceState(this.scheduler); } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java index 0383a65bf2e..12da54e2580 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java @@ -264,30 +264,25 @@ protected void serviceStop() throws Exception { // This method should be called whenever there is an increment or decrement // of a READY state component of a service public static synchronized void checkAndUpdateServiceState( - ServiceScheduler scheduler, boolean isIncrement) { + ServiceScheduler scheduler) { ServiceState curState = scheduler.getApp().getState(); - if (!isIncrement) { - // set it to STARTED every time a component moves out of STABLE state - scheduler.getApp().setState(ServiceState.STARTED); - } else { - // otherwise check the state of all components - boolean isStable = true; - for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler - .getApp().getComponents()) { - if (comp.getState() != - org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE) { - isStable = false; - break; - } + // Check the state of all components + boolean isStable = true; + for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler + .getApp().getComponents()) { + if (comp + .getState() != org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE) { + isStable = false; + break; } - if (isStable) { - scheduler.getApp().setState(ServiceState.STABLE); - } else { - // mark new state as started only if current state is stable, otherwise - // leave it as is - if (curState == ServiceState.STABLE) { - scheduler.getApp().setState(ServiceState.STARTED); - } + } + if (isStable) { + scheduler.getApp().setState(ServiceState.STABLE); + } else { + // mark new state as started only if current state is stable, otherwise + // leave it as is + if (curState == ServiceState.STABLE) { + scheduler.getApp().setState(ServiceState.STARTED); } } if (curState != scheduler.getApp().getState()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index e115841cb47..631b4a8ce70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -67,13 +67,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -101,8 +102,8 @@ private Map compInstances = new ConcurrentHashMap<>(); // component instances to be assigned with a container - private List pendingInstances = - Collections.synchronizedList(new LinkedList<>()); + private Deque pendingInstances = + new ConcurrentLinkedDeque<>(); private ContainerFailureTracker failureTracker; private Probe probe; private final ReentrantReadWriteLock.ReadLock readLock; @@ -323,31 +324,49 @@ public ComponentState transition(Component component, org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); component.getScheduler().getApp().setState(ServiceState.STARTED); return FLEXING; - } else if (delta < 0){ + } else if (delta < 0) { delta = 0 - delta; // scale down LOG.info("[FLEX DOWN COMPONENT " + component.getName() + "]: scaling down from " + before + " to " + event.getDesired()); - List list = - new ArrayList<>(component.getAllComponentInstances()); - - // sort in Most recent -> oldest order, destroy most recent ones. - list.sort(Collections.reverseOrder()); - for (int i = 0; i < delta; i++) { - ComponentInstance instance = list.get(i); - // remove the instance - component.compInstances.remove(instance.getCompInstanceName()); - component.pendingInstances.remove(instance); - // decrement id counter - component.instanceIdCounter.decrementAndGet(); - instance.destroy(); + + // Check if there are any pending instances first and remove them if any + if (component.pendingInstances.size() > 0) { + synchronized (component.pendingInstances) { + int currentPendingInstanceCount = component.pendingInstances.size(); + if (delta >= currentPendingInstanceCount) { + component.pendingInstances.clear(); + delta -= currentPendingInstanceCount; + } else { + for (int i = 0; i < delta; i++) { + component.pendingInstances.removeLast(); + } + delta = 0; + } + } + } + // If there are more to be removed then kill existing instances + if (delta > 0) { + // sort in Most recent -> oldest order, destroy most recent ones. + List list = new ArrayList<>( + component.getAllComponentInstances()); + list.sort(Collections.reverseOrder()); + for (int i = 0; i < delta; i++) { + ComponentInstance instance = list.get(i); + // remove the instance + component.compInstances.remove(instance.getCompInstanceName()); + component.pendingInstances.remove(instance); + // decrement id counter + component.instanceIdCounter.decrementAndGet(); + instance.destroy(); + } } checkAndUpdateComponentState(component, false); - return STABLE; + return checkIfStable(component); } else { LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " + event.getDesired() + " instances, ignoring"); - return STABLE; + return checkIfStable(component); } } } @@ -440,7 +459,7 @@ public static synchronized void checkAndUpdateComponentState( component.componentSpec.getState()); } // component state change will trigger re-check of service state - component.context.getServiceManager().checkAndUpdateServiceState(true); + component.context.getServiceManager().checkAndUpdateServiceState(); } } else { // container moving out of READY state could be because of FLEX down so @@ -455,7 +474,18 @@ public static synchronized void checkAndUpdateComponentState( component.componentSpec.getState()); } // component state change will trigger re-check of service state - component.context.getServiceManager().checkAndUpdateServiceState(false); + component.context.getServiceManager().checkAndUpdateServiceState(); + } else if (component.componentMetrics.containersReady + .value() >= component.componentMetrics.containersDesired.value()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + if (curState != component.componentSpec.getState()) { + LOG.info("[COMPONENT {}] state changed from {} -> {}", + component.componentSpec.getName(), curState, + component.componentSpec.getState()); + } + // component state change will trigger re-check of service state + component.context.getServiceManager().checkAndUpdateServiceState(); } } // when the service is stable then the state of component needs to @@ -544,7 +574,7 @@ private void assignContainerToCompInstance(Container container) { releaseContainer(container); return; } - ComponentInstance instance = pendingInstances.remove(0); + ComponentInstance instance = pendingInstances.remove(); LOG.info( "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}", getName(), container.getId(), pendingInstances.size()); @@ -937,7 +967,7 @@ public ServiceContext getContext() { } // Only for testing - public List getPendingInstances() { + public Deque getPendingInstances() { return pendingInstances; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 5b608e3e589..ae209b929ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -484,8 +484,37 @@ public void testCreateServiceWithPlacementPolicy() throws Exception { } } - // Flex compa up to 4, which is more containers than the no of NMs + // Flex compa up to 5, which is more containers than the no of NMs Map compCounts = new HashMap<>(); + compCounts.put("compa", 5L); + exampleApp.getComponent("compa").setNumberOfContainers(5L); + client.flexByRestService(exampleApp.getName(), compCounts); + try { + // 10 secs is enough for the container to be started. The down side of + // this test is that it has to wait that long. Setting a higher wait time + // will add to the total time taken by tests to run. + waitForServiceToBeStable(client, exampleApp, 10000); + Assert.fail("Service should not be in a stable state. It should throw " + + "a timeout exception."); + } catch (Exception e) { + // Check that service state is not STABLE and only 3 containers are + // running and the fourth one should not get allocated. + service = client.getStatus(exampleApp.getName()); + component = service.getComponent("compa"); + Assert.assertNotEquals("Service state should not be STABLE", + ServiceState.STABLE, service.getState()); + Assert.assertEquals("Component state should be FLEXING", + ComponentState.FLEXING, component.getState()); + Assert.assertEquals("3 containers are expected to be running", 3, + component.getContainers().size()); + } + + // Flex compa down to 4 now, which is still more containers than the no of + // NMs. This tests the usecase that flex down does not kill any of the + // currently running containers since the required number of containers are + // still higher than the currently running number of containers. However, + // component state will still be FLEXING and service state not STABLE. + compCounts = new HashMap<>(); compCounts.put("compa", 4L); exampleApp.getComponent("compa").setNumberOfContainers(4L); client.flexByRestService(exampleApp.getName(), compCounts); @@ -509,6 +538,15 @@ public void testCreateServiceWithPlacementPolicy() throws Exception { component.getContainers().size()); } + // Finally flex compa down to 3, which is exactly the number of containers + // currently running. This will bring the component and service states to + // STABLE. + compCounts = new HashMap<>(); + compCounts.put("compa", 3L); + exampleApp.getComponent("compa").setNumberOfContainers(3L); + client.flexByRestService(exampleApp.getName(), compCounts); + waitForServiceToBeStable(client, exampleApp); + LOG.info("Stop/destroy service {}", exampleApp); client.actionStop(exampleApp.getName(), true); client.actionDestroy(exampleApp.getName());