diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 0fcca165099..9e6c42942a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -519,6 +519,9 @@ private void createAllComponents() {
     for (org.apache.hadoop.yarn.service.api.records.Component compSpec :
         sortedComponents) {
       Component component = new Component(compSpec, allocateId, context);
+      if (component.isHealthThresholdMonitorEnabled()) {
+        addService(component.getHealthThresholdMonitor());
+      }
       componentsById.put(allocateId, component);
       componentsByName.put(component.getName(), component);
       allocateId++;
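
Note: ServiceScheduler is a CompositeService, so the single addService() call above is all the wiring the monitor needs; it is inited, started, and stopped together with the AM. A minimal sketch of that pattern, using hypothetical Parent/Child names (this is not AM code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.service.AbstractService;
    import org.apache.hadoop.service.CompositeService;

    class Parent extends CompositeService {
      Parent(AbstractService child) {
        super("Parent");
        addService(child); // child is now inited/started/stopped with Parent
      }

      public static void main(String[] args) {
        Parent parent = new Parent(new AbstractService("Child") { });
        parent.init(new Configuration()); // inits Child too
        parent.start();                   // starts Child too
        parent.stop();                    // stops Child too
      }
    }
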
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index a2127c80316..d8ebacbb713 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
+import org.apache.hadoop.yarn.service.monitor.ComponentHealthThresholdMonitor;
 import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
 import org.apache.hadoop.yarn.service.monitor.probe.Probe;
 import org.apache.hadoop.yarn.service.provider.ProviderUtils;
@@ -69,6 +70,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -78,7 +80,7 @@
 import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
 import static org.apache.hadoop.yarn.service.component.ComponentState.*;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
-import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_THRESHOLD;
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
 
 public class Component implements EventHandler<ComponentEvent> {
   private static final Logger LOG = LoggerFactory.getLogger(Component.class);
@@ -104,6 +106,8 @@
   // The number of containers failed since last reset. This excludes preempted,
   // disk_failed containers etc. This will be reset to 0 periodically.
   public AtomicInteger currentContainerFailure = new AtomicInteger(0);
+  private boolean healthThresholdMonitorEnabled = false;
+  private ComponentHealthThresholdMonitor healthThresholdMonitor;
 
   private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
       stateMachine;
@@ -179,6 +183,7 @@ public Component(
     maxContainerFailurePerComp = componentSpec.getConfiguration()
         .getPropertyInt(CONTAINER_FAILURE_THRESHOLD, 10);
     createNumCompInstances(component.getNumberOfContainers());
+    checkAndCreateHealthThresholdMonitor();
   }
 
   private void createNumCompInstances(long count) {
@@ -196,6 +201,67 @@ private void createOneCompInstance() {
     pendingInstances.add(instance);
   }
 
+  private void checkAndCreateHealthThresholdMonitor() {
+    // Determine the health threshold percent
+    int healthThresholdPercent = componentSpec.getConfiguration()
+        .getPropertyInt(CONTAINER_HEALTH_THRESHOLD_PERCENT,
+            DEFAULT_CONTAINER_HEALTH_THRESHOLD_PERCENT);
+    // Validations
+    if (healthThresholdPercent == CONTAINER_HEALTH_THRESHOLD_PERCENT_DISABLED) {
+      LOG.info("No health threshold monitor enabled for component {}",
+          componentSpec.getName());
+      return;
+    }
+    // If the threshold is set outside the acceptable range then don't enable
+    // the monitor
+    if (healthThresholdPercent <= 0 || healthThresholdPercent > 100) {
+      LOG.error(
+          "Invalid health threshold percent {}% for component {}. Monitor "
+              + "not enabled.",
+          healthThresholdPercent, componentSpec.getName());
+      return;
+    }
+    // Determine the threshold properties
+    long window = componentSpec.getConfiguration().getPropertyLong(
+        CONTAINER_HEALTH_THRESHOLD_WINDOW_SEC,
+        DEFAULT_CONTAINER_HEALTH_THRESHOLD_WINDOW_SEC);
+    long initDelay = componentSpec.getConfiguration().getPropertyLong(
+        CONTAINER_HEALTH_THRESHOLD_INIT_DELAY_SEC,
+        DEFAULT_CONTAINER_HEALTH_THRESHOLD_INIT_DELAY_SEC);
+    long pollFrequency = componentSpec.getConfiguration().getPropertyLong(
+        CONTAINER_HEALTH_THRESHOLD_POLL_FREQUENCY_SEC,
+        DEFAULT_CONTAINER_HEALTH_THRESHOLD_POLL_FREQUENCY_SEC);
+    // Validations
+    if (window <= 0) {
+      LOG.error(
+          "Invalid health monitor window {} secs for component {}. Monitor "
+              + "not enabled.",
+          window, componentSpec.getName());
+      return;
+    }
+    if (initDelay < 0) {
+      LOG.error("Invalid health monitor init delay {} secs for component {}. "
+          + "Monitor not enabled.", initDelay, componentSpec.getName());
+      return;
+    }
+    if (pollFrequency <= 0) {
+      LOG.error(
+          "Invalid health monitor poll frequency {} secs for component {}. "
+              + "Monitor not enabled.",
+          pollFrequency, componentSpec.getName());
+      return;
+    }
+    LOG.info(
+        "Scheduling the health threshold monitor for component {} with "
+            + "percent = {}%, window = {} secs, poll freq = {} secs, "
+            + "init-delay = {} secs",
+        componentSpec.getName(), healthThresholdPercent, window,
+        pollFrequency, initDelay);
+    healthThresholdMonitor = new ComponentHealthThresholdMonitor(this,
+        healthThresholdPercent, window, initDelay, pollFrequency,
+        TimeUnit.SECONDS);
+    setHealthThresholdMonitorEnabled(true);
+  }
+
   private static class FlexComponentTransition implements
       MultipleArcTransition<Component, ComponentEvent, ComponentState> {
     // For flex up, go to FLEXING state
@@ -758,4 +824,22 @@ public ServiceContext getContext() {
   public List<ComponentInstance> getPendingInstances() {
     return pendingInstances;
   }
+
+  public boolean isHealthThresholdMonitorEnabled() {
+    return healthThresholdMonitorEnabled;
+  }
+
+  public void setHealthThresholdMonitorEnabled(
+      boolean healthThresholdMonitorEnabled) {
+    this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled;
+  }
+
+  public ComponentHealthThresholdMonitor getHealthThresholdMonitor() {
+    return healthThresholdMonitor;
+  }
+
+  public void setHealthThresholdMonitor(
+      ComponentHealthThresholdMonitor healthThresholdMonitor) {
+    this.healthThresholdMonitor = healthThresholdMonitor;
+  }
 }
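
For illustration, when a component sets only the percent (say 75%) and leaves the other knobs at their defaults, checkAndCreateHealthThresholdMonitor() ends up doing the equivalent of this hypothetical direct construction (component stands for the enclosing Component):

    ComponentHealthThresholdMonitor monitor =
        new ComponentHealthThresholdMonitor(
            component,
            75,   // health threshold percent
            600,  // window: may stay below threshold for up to 600 secs
            600,  // init delay: first check runs 600 secs after start
            10,   // poll frequency: re-check every 10 secs
            TimeUnit.SECONDS);

Note that the init delay defaults to the window size, so a freshly launched service gets a full window to bring its containers up before the first check runs.
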
" + + "Monitor not enabled.", + pollFrequency, componentSpec.getName()); + return; + } + LOG.info( + "Scheduling the health threshold monitor for component {} with percent " + + "= {}%, window = {} secs, poll freq = {} secs, init-delay = {} " + + "secs", + componentSpec.getName(), healthThresholdPercent, window, pollFrequency, + initDelay); + healthThresholdMonitor = new ComponentHealthThresholdMonitor(this, + healthThresholdPercent, window, initDelay, pollFrequency, + TimeUnit.SECONDS); + setHealthThresholdMonitorEnabled(true); + } + private static class FlexComponentTransition implements MultipleArcTransition { // For flex up, go to FLEXING state @@ -758,4 +824,22 @@ public ServiceContext getContext() { public List getPendingInstances() { return pendingInstances; } + + public boolean isHealthThresholdMonitorEnabled() { + return healthThresholdMonitorEnabled; + } + + public void setHealthThresholdMonitorEnabled( + boolean healthThresholdMonitorEnabled) { + this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled; + } + + public ComponentHealthThresholdMonitor getHealthThresholdMonitor() { + return healthThresholdMonitor; + } + + public void setHealthThresholdMonitor( + ComponentHealthThresholdMonitor healthThresholdMonitor) { + this.healthThresholdMonitor = healthThresholdMonitor; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 0e3e11bc72e..f208a389294 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -229,8 +229,11 @@ public void transition(ComponentInstance compInstance, } compInstance.component.decRunningContainers(); boolean shouldExit = false; - // check if it exceeds the failure threshold - if (comp.currentContainerFailure.get() > comp.maxContainerFailurePerComp) { + // Check if it exceeds the failure threshold, but only if health threshold + // monitor is not enabled + if (!comp.isHealthThresholdMonitorEnabled() + && comp.currentContainerFailure + .get() > comp.maxContainerFailurePerComp) { String exitDiag = MessageFormat.format( "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... 
" + System.lineSeparator(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java index b9a759438e0..ee9274bc684 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -101,6 +101,55 @@ public static final String DEPENDENCY_TARBALL_PATH = YARN_SERVICE_PREFIX + "framework.path"; + public static final String YARN_SERVICE_CONTAINER_HEALTH_THRESHOLD_PREFIX = + YARN_SERVICE_PREFIX + "container-health-threshold."; + + /** + * The container health threshold when explicitly set for a specific component + * or globally for all components, will schedule a health check monitor to + * periodically check for the percentage of healthy containers. It runs the + * check at a specified/default poll frequency. It allows a component to be + * below the health threshold for a specified/default window after which it + * considers the service to be unhealthy and triggers a service stop. + */ + public static final String CONTAINER_HEALTH_THRESHOLD_PERCENT = + YARN_SERVICE_CONTAINER_HEALTH_THRESHOLD_PREFIX + "percent"; + /** + * Health check monitor poll frequency. It is an advanced setting and does not + * need to be set unless the service owner understands the implication and + * does not want the default. + */ + public static final String CONTAINER_HEALTH_THRESHOLD_POLL_FREQUENCY_SEC = + YARN_SERVICE_CONTAINER_HEALTH_THRESHOLD_PREFIX + "poll-frequency-secs"; + /** + * The amount of time the health check monitor allows a specific component to + * be below the health threshold after which it considers the service to be + * unhealthy. + */ + public static final String CONTAINER_HEALTH_THRESHOLD_WINDOW_SEC = + YARN_SERVICE_CONTAINER_HEALTH_THRESHOLD_PREFIX + "window-secs"; + /** + * The amount of initial time the health check monitor waits before the first + * check kicks in. It gives a lead time for the service containers to come up + * for the first time. + */ + public static final String CONTAINER_HEALTH_THRESHOLD_INIT_DELAY_SEC = + YARN_SERVICE_CONTAINER_HEALTH_THRESHOLD_PREFIX + "init-delay-secs"; + /** + * By default the health threshold percent does not come into play until it is + * explicitly set in resource config for a specific component or globally for + * all components. -1 signifies disabled. + */ + public static final int CONTAINER_HEALTH_THRESHOLD_PERCENT_DISABLED = -1; + + public static final int DEFAULT_CONTAINER_HEALTH_THRESHOLD_PERCENT = + CONTAINER_HEALTH_THRESHOLD_PERCENT_DISABLED; + public static final long DEFAULT_CONTAINER_HEALTH_THRESHOLD_POLL_FREQUENCY_SEC = 10; + public static final long DEFAULT_CONTAINER_HEALTH_THRESHOLD_WINDOW_SEC = 600; + // The default for initial delay is same as default health window + public static final long DEFAULT_CONTAINER_HEALTH_THRESHOLD_INIT_DELAY_SEC = + DEFAULT_CONTAINER_HEALTH_THRESHOLD_WINDOW_SEC; + /** * Get long value for the property. First get from the userConf, if not * present, get from systemConf. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ComponentHealthThresholdMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ComponentHealthThresholdMonitor.java
new file mode 100644
index 00000000000..1dc176c3d13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ComponentHealthThresholdMonitor.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.monitor;
+
+import java.text.MessageFormat;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.service.component.Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors the health of the containers of a specific component at a regular
+ * interval. It takes necessary actions when the health of a component drops
+ * below a desired threshold.
+ */
+public class ComponentHealthThresholdMonitor extends CompositeService {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ComponentHealthThresholdMonitor.class);
+  private ScheduledExecutorService executorService;
+  private static final String NAME_PREFIX = "ComponentHealthThresholdMonitor-";
+  private final Component component;
+  private final int healthThresholdPercent;
+  private final long healthThresholdWindowSecs;
+  private final long initialDelay;
+  private final long interval;
+  private final TimeUnit timeUnit;
+  private final long healthThresholdWindowNanos;
+  private long firstOccurrenceTimestamp = 0;
+  // Sufficient logging happens when component health is below threshold.
+  // However, there has to be some logging when it is above threshold,
+  // otherwise service owners have no idea how the health is fluctuating. So
+  // let's log whenever there is a change in component health, thereby
+  // preventing excessive logging on every poll.
+  private float prevRunningContainerFraction = 0;
+
+  public ComponentHealthThresholdMonitor(Component component,
+      int healthThresholdPercent, long healthThresholdWindowSecs,
+      long initialDelay, long interval, TimeUnit timeUnit) {
+    super(NAME_PREFIX + component.getName());
+    this.component = component;
+    this.healthThresholdPercent = healthThresholdPercent;
+    this.healthThresholdWindowSecs = healthThresholdWindowSecs;
+    this.initialDelay = initialDelay;
+    this.interval = interval;
+    this.timeUnit = timeUnit;
+    this.healthThresholdWindowNanos = TimeUnit.NANOSECONDS
+        .convert(healthThresholdWindowSecs, TimeUnit.SECONDS);
+    addService(this);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    executorService = Executors.newScheduledThreadPool(1);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    executorService.scheduleAtFixedRate(new ContainerHealthChecker(),
+        initialDelay, interval, timeUnit);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+  }
+
+  private class ContainerHealthChecker implements Runnable {
+    @Override
+    public void run() {
+      LOG.debug(
+          "ComponentHealthThresholdMonitor.ContainerHealthChecker run method");
+      // Perform container health checks against the desired threshold
+      long desiredContainerCount = component.getNumDesiredInstances();
+      // If the desired container count for this component is 0 then there is
+      // nothing to do
+      if (desiredContainerCount == 0) {
+        return;
+      }
+      long runningContainerCount = component.getNumRunningInstances();
+      float thresholdFraction = (float) healthThresholdPercent / 100;
+      // No possibility of div by 0 since desiredContainerCount is not 0 here
+      float runningContainerFraction = (float) runningContainerCount
+          / desiredContainerCount;
+      boolean healthChanged = false;
+      if (runningContainerFraction != prevRunningContainerFraction) {
+        prevRunningContainerFraction = runningContainerFraction;
+        healthChanged = true;
+      }
+      String runningContainerPercentStr = String.format("%.2f",
+          runningContainerFraction * 100);
+      // Check if the current running container percent is less than the
+      // threshold percent
+      if (runningContainerFraction < thresholdFraction) {
+        // Check if this is the first occurrence and if yes set the timestamp
+        long currentTimestamp = System.nanoTime();
+        if (firstOccurrenceTimestamp == 0) {
+          firstOccurrenceTimestamp = currentTimestamp;
+          LOG.info(
+              "[Component {}] is going below health threshold for the first "
+                  + "time at ts = {}",
+              component.getName(), firstOccurrenceTimestamp);
+        }
+        long elapsedTime = currentTimestamp - firstOccurrenceTimestamp;
+        long elapsedTimeSecs = TimeUnit.SECONDS.convert(elapsedTime,
+            TimeUnit.NANOSECONDS);
+        LOG.warn(
+            "[Component {}] Current health {}% is below health threshold of "
+                + "{}% for {} secs (threshold window = {} secs)",
+            component.getName(), runningContainerPercentStr,
+            healthThresholdPercent, elapsedTimeSecs,
+            healthThresholdWindowSecs);
+        if (elapsedTime > healthThresholdWindowNanos) {
+          LOG.error(
+              "[Component {}] Current health {}% has been below health "
+                  + "threshold of {}% for {} secs (threshold window = {} "
+                  + "secs)",
+              component.getName(), runningContainerPercentStr,
+              healthThresholdPercent, elapsedTimeSecs,
+              healthThresholdWindowSecs);
+          // Trigger service stop
%d secs " + + "(threshold window = %d secs)", + component.getName(), runningContainerPercentStr, + healthThresholdPercent, elapsedTimeSecs, + healthThresholdWindowSecs); + // Append to global diagnostics that will be reported to RM. + component.getScheduler().getDiagnostics().append(exitDiag); + LOG.warn(exitDiag); + // Sleep for 5 seconds in hope that the state can be recorded in ATS. + // In case there's a client polling the component state, it can be + // notified. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + LOG.error("Interrupted on sleep while exiting.", e); + } + ExitUtil.terminate(-1); + } + } else { + String logMsg = "[Component {}] Health threshold = {}%, Current health " + + "= {}% (Current Running count = {}, Desired count = {})"; + if (healthChanged) { + LOG.info(logMsg, component.getName(), healthThresholdPercent, + runningContainerPercentStr, runningContainerCount, + desiredContainerCount); + } else { + LOG.debug(logMsg, component.getName(), healthThresholdPercent, + runningContainerPercentStr, runningContainerCount, + desiredContainerCount); + } + // The container health might have recovered above threshold after being + // below for less than the threshold window amount of time. So we need + // to reset firstOccurrenceTimestamp to 0. + if (firstOccurrenceTimestamp != 0) { + LOG.info( + "[Component {}] Resetting first occurence to 0, since it " + + "recovered above health threshold of {}%", + component.getName(), healthThresholdPercent); + firstOccurrenceTimestamp = 0; + } + } + } + } +}