diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 0b0ba7945d6..0daba47c50b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -129,6 +129,8 @@ private Map failedInstances = new ConcurrentHashMap<>(); private boolean healthThresholdMonitorEnabled = false; + private boolean parallelismEnabled = false; + private int parallelism; private UpgradeStatus upgradeStatus = new UpgradeStatus(); private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus(); @@ -248,6 +250,7 @@ public Component( createNumCompInstances(component.getNumberOfContainers()); setDesiredContainers(component.getNumberOfContainers().intValue()); checkAndScheduleHealthThresholdMonitor(); + checkParallelismEnabled(); } private void createNumCompInstances(long count) { @@ -354,7 +357,8 @@ public ComponentState transition(Component component, // This happens on init LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event .getDesired() + " instances."); - component.requestContainers(component.pendingInstances.size()); + component.requestContainers(component.getRequestCountWithParallelism( + component.pendingInstances.size())); return checkIfStable(component); } long before = component.getComponentSpec().getNumberOfContainers(); @@ -364,7 +368,8 @@ public ComponentState transition(Component component, // Scale up LOG.info("[FLEX UP COMPONENT " + component.getName() + "]: scaling up from " + before + " to " + event.getDesired()); - component.requestContainers(delta); + component.requestContainers(component.getRequestCountWithParallelism( + delta)); component.createNumCompInstances(delta); component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); @@ -766,6 +771,13 @@ public void requestContainers(long count) { resource.setResourceInformation(resourceName, ri); } } + if (parallelismEnabled && parallelism <= getNumLiveInstances()) { + LOG.info( + "[COMPONENT {}]: Running instances {}, parallelism {}, release surplus container {}", + getName(), container.getId()); + releaseContainer(container); + return; + } if (!scheduler.hasAtLeastOnePlacementConstraint()) { for (int i = 0; i < count; i++) { @@ -1302,4 +1314,37 @@ public String getHostnameSuffix() { return ServiceApiUtil.getHostnameSuffix(context.service.getName(), scheduler.getConfig()); } + + private void checkParallelismEnabled() { + // No effect for longlived. + // Parallelism should be grater than 0 and + // less than the number of containers + parallelism = YarnServiceConf.getInt(CONTAINER_PARALLELISM, + DEFAULT_CONTAINER_PARALLELISM, componentSpec.getConfiguration(), + scheduler.getConfig()); + if (!getRestartPolicyHandler().isLongLived() && parallelism > 0 && + parallelism < componentSpec.getNumberOfContainers()) { + LOG.info("[COMPONENT {}] Enabled parallelism {}", getName(), + parallelism); + parallelismEnabled = true; + } + } + + public long getRequestCountWithParallelism(long desiredCount) { + if (!parallelismEnabled) { + return desiredCount; + } + long adjustedCount = parallelism - getNumLiveInstances(); + LOG.info("[COMPONENT {}] Adjust requesting containers with parallelism. " + + "desired count: {}, parallelism: {}, live instances: {}, " + + "adjusted count: {}", getName(), desiredCount, + parallelism, getNumLiveInstances(), adjustedCount); + return desiredCount < adjustedCount ? desiredCount : adjustedCount; + } + + private long getNumLiveInstances() { + return scheduler.getLiveInstances().values().stream().filter( + instance -> instance.getCompName().equals(getName()) + ).count(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 2f75dc46832..334a2c1a52c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -402,6 +402,15 @@ static void handleComponentInstanceRelaunch(ComponentInstance compInstance, LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? " succeeded" : " failed") + " without retry, exitStatus=" + event.getStatus()); + long pendingInstances = comp.getPendingInstances().size(); + long requestCount = comp.getRequestCountWithParallelism(pendingInstances); + if (requestCount > 0) { + LOG.info(comp.getName() + " has pending instances " + + comp.getPendingInstances().size() + " request containers " + + requestCount); + comp.requestContainers(requestCount); + return; + } comp.getScheduler().terminateServiceIfNeeded(comp); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java index 86c4de2ef89..311f54fbc8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -188,6 +188,14 @@ public static final long DEFAULT_CONTAINER_HEALTH_THRESHOLD_INIT_DELAY_SEC = DEFAULT_CONTAINER_HEALTH_THRESHOLD_WINDOW_SEC; + /** + * The number of containers running at any instant. + * When the policy of restart component is ALWAYS, it will be ignored. + */ + public static final String CONTAINER_PARALLELISM = + YARN_SERVICE_PREFIX + "container.parallelism"; + public static final int DEFAULT_CONTAINER_PARALLELISM = -1; + /** * Get long value for the property. First get from the userConf, if not * present, get from systemConf.