diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml index f8ed4d5..05aad32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml @@ -297,18 +297,21 @@ definitions: ReadinessCheck: description: A custom command or a pluggable helper container to determine the readiness of a container of a component. Readiness for every application is different. Hence the need for a simple interface, with scope to support advanced usecases. required: - - uri + - type properties: type: type: string description: E.g. HTTP (YARN will perform a simple REST call at a regular interval and expect a 204 No content). enum: - HTTP - uri: - type: string - description: Fully qualified REST uri endpoint. + - PORT + props: + type: object + description: A blob of key value pairs that will be used to configure the check. + additionalProperties: + type: string artifact: - description: Artifact of the pluggable readiness check helper container (optional). If specified, this helper container typically hosts the http uri and encapsulates the complex scripts required to perform actual container readiness check. At the end it is expected to respond a 204 No content just like the simplified use case. This pluggable framework benefits application owners who can run applications without any packaging modifications. Note, artifacts of type docker only is supported for now. + description: Artifact of the pluggable readiness check helper container (optional). If specified, this helper container typically hosts the http uri and encapsulates the complex scripts required to perform actual container readiness check. At the end it is expected to respond a 204 No content just like the simplified use case. This pluggable framework benefits application owners who can run applications without any packaging modifications. Note, artifacts of type docker only is supported for now. NOT IMPLEMENTED YET $ref: '#/definitions/Artifact' Configuration: description: Set of configuration properties that can be injected into the application components via envs, files and custom pluggable helper docker containers. Files of several standard formats like xml, properties, json, yaml and templates will be supported. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/InternalKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/InternalKeys.java index f690f5a..0e3b535 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/InternalKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/InternalKeys.java @@ -196,4 +196,15 @@ * default value: {@value} */ int DEFAULT_ESCALATION_CHECK_INTERVAL = 30; + + + /** + * interval between readiness checks: {@value} + */ + String MONITOR_INTERVAL = "monitor.interval.seconds"; + + /** + * default value: {@value} + */ + int DEFAULT_MONITOR_INTERVAL = 30; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/StateValues.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/StateValues.java index 03751e1..eea0fce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/StateValues.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/StateValues.java @@ -19,13 +19,13 @@ package org.apache.slider.api; /** - * Enumeration of state values + * Enumeration of state values. */ public class StateValues { /** * Specification is incomplete & cannot - * be used: {@value} + * be used: {@value}. */ public static final int STATE_INCOMPLETE = 0; @@ -42,12 +42,20 @@ */ public static final int STATE_LIVE = 3; /** - * Stopped + * Not ready. */ - public static final int STATE_STOPPED = 4; + public static final int STATE_NOT_READY = 4; /** - * destroyed + * Ready. */ - public static final int STATE_DESTROYED = 5; + public static final int STATE_READY = 5; + /** + * Stopped. + */ + public static final int STATE_STOPPED = 99; + /** + * Destroyed. + */ + public static final int STATE_DESTROYED = 100; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/resource/ReadinessCheck.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/resource/ReadinessCheck.java index 00bf29c..b3c85bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/resource/ReadinessCheck.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/api/resource/ReadinessCheck.java @@ -21,6 +21,8 @@ import io.swagger.annotations.ApiModelProperty; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import com.fasterxml.jackson.annotation.JsonProperty; @@ -39,7 +41,8 @@ private static final long serialVersionUID = -3836839816887186801L; public enum TypeEnum { - HTTP("HTTP"); + HTTP("HTTP"), + PORT("PORT"); private String value; @@ -55,7 +58,7 @@ public String toString() { } private TypeEnum type = null; - private String uri = null; + private Map props = new HashMap(); private Artifact artifact = null; /** @@ -77,22 +80,27 @@ public void setType(TypeEnum type) { this.type = type; } - /** - * Fully qualified REST uri endpoint. - **/ - public ReadinessCheck uri(String uri) { - this.uri = uri; + public ReadinessCheck props(Map props) { + this.props = props; + return this; + } + + public ReadinessCheck putPropsItem(String key, String propsItem) { + this.props.put(key, propsItem); return this; } - @ApiModelProperty(example = "null", required = true, value = "Fully qualified REST uri endpoint.") - @JsonProperty("uri") - public String getUri() { - return uri; + /** + * A blob of key value pairs that will be used to configure the check. + * @return props + **/ + @ApiModelProperty(example = "null", value = "A blob of key value pairs that will be used to configure the check.") + public Map getProps() { + return props; } - public void setUri(String uri) { - this.uri = uri; + public void setProps(Map props) { + this.props = props; } /** @@ -128,23 +136,24 @@ public boolean equals(java.lang.Object o) { return false; } ReadinessCheck readinessCheck = (ReadinessCheck) o; - return Objects.equals(this.type, readinessCheck.type) - && Objects.equals(this.uri, readinessCheck.uri) - && Objects.equals(this.artifact, readinessCheck.artifact); + return Objects.equals(this.type, readinessCheck.type) && + Objects.equals(this.props, readinessCheck.props) && + Objects.equals(this.artifact, readinessCheck.artifact); } @Override public int hashCode() { - return Objects.hash(type, uri, artifact); + return Objects.hash(type, props, artifact); } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("class ReadinessCheck {\n"); sb.append(" type: ").append(toIndentedString(type)).append("\n"); - sb.append(" uri: ").append(toIndentedString(uri)).append("\n"); + sb.append(" props: ").append(toIndentedString(props)).append("\n"); sb.append(" artifact: ").append(toIndentedString(artifact)).append("\n"); sb.append("}"); return sb.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java index 182e956..6fd85bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java @@ -21,6 +21,8 @@ import org.apache.slider.api.ResourceKeys; import org.apache.slider.api.resource.Component; import org.apache.slider.server.appmaster.state.RoleInstance; +import org.apache.slider.server.servicemonitor.MonitorUtils; +import org.apache.slider.server.servicemonitor.Probe; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -34,7 +36,6 @@ */ public final class ProviderRole { public final String name; - public final String group; public final int id; public int placementPolicy; public int nodeFailureThreshold; @@ -43,6 +44,8 @@ public final Component component; public AtomicLong componentIdCounter = null; public Queue failedInstances = new ConcurrentLinkedQueue<>(); + public Probe probe; + public ProviderRole(String name, int id) { this(name, id, @@ -69,7 +72,6 @@ public ProviderRole(String name, long placementTimeoutSeconds, String labelExpression) { this(name, - name, id, policy, nodeFailureThreshold, @@ -81,7 +83,6 @@ public ProviderRole(String name, /** * Create a provider role with a role group * @param name role/component name - * @param group role/component group * @param id ID. This becomes the YARN priority * @param policy placement policy * @param nodeFailureThreshold threshold for node failures (within a reset interval) @@ -89,15 +90,10 @@ public ProviderRole(String name, * @param placementTimeoutSeconds for lax placement, timeout in seconds before * @param labelExpression label expression for requests; may be null */ - public ProviderRole(String name, String group, int id, int policy, + public ProviderRole(String name, int id, int policy, int nodeFailureThreshold, long placementTimeoutSeconds, String labelExpression, Component component) { this.name = name; - if (group == null) { - this.group = name; - } else { - this.group = group; - } this.id = id; this.placementPolicy = policy; this.nodeFailureThreshold = nodeFailureThreshold; @@ -107,6 +103,7 @@ public ProviderRole(String name, String group, int id, int policy, if(component.getUniqueComponentSupport()) { componentIdCounter = new AtomicLong(0); } + this.probe = MonitorUtils.getProbe(component.getReadinessCheck()); } @@ -132,7 +129,6 @@ public int hashCode() { public String toString() { final StringBuilder sb = new StringBuilder("ProviderRole{"); sb.append("name='").append(name).append('\''); - sb.append(", group=").append(group); sb.append(", id=").append(id); sb.append(", placementPolicy=").append(placementPolicy); sb.append(", nodeFailureThreshold=").append(nodeFailureThreshold); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java index d58ecaa..0da535e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java @@ -254,17 +254,6 @@ public void localizeServiceKeytabs(ContainerLauncher launcher, } } - public static void addEnvForSubstitution(Map env, - Map tokensForSubstitution) { - if (env == null || env.isEmpty() || tokensForSubstitution == null - || tokensForSubstitution.isEmpty()) { - return; - } - for (Map.Entry entry : env.entrySet()) { - tokensForSubstitution.put($(entry.getKey()), entry.getValue()); - } - } - // 1. Create all config files for a component on hdfs for localization // 2. Add the config file to localResource public synchronized void createConfigFileAndAddLocalResource( @@ -284,10 +273,6 @@ public synchronized void createConfigFileAndAddLocalResource( log.info("Component instance conf dir already exists: " + compInstanceDir); } - // add Configuration#env into tokens substitution - addEnvForSubstitution(component.getConfiguration().getEnv(), - tokensForSubstitution); - log.info("Tokens substitution for component: " + roleInstance .getCompInstanceName() + System.lineSeparator() + tokensForSubstitution); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java index aa84940..f1b07f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java @@ -133,7 +133,6 @@ public String toString() { return "RoleLauncher{" + "container=" + container.getId() + ", containerRole='" + role.name + '\'' + - ", containerGroup='" + role.group + '\'' + '}'; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 0c3fcea..5fe5043 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -117,6 +117,7 @@ import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers; import org.apache.slider.server.appmaster.actions.AsyncAction; import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests; +import org.apache.slider.server.appmaster.actions.MonitorComponentInstances; import org.apache.slider.server.appmaster.actions.QueueExecutor; import org.apache.slider.server.appmaster.actions.QueueService; import org.apache.slider.server.appmaster.actions.RegisterComponentInstance; @@ -868,6 +869,7 @@ private int createAndRunCluster(String appName) throws Throwable { scheduleFailureWindowResets(application.getConfiguration()); scheduleEscalation(application.getConfiguration()); + scheduleMonitoring(application.getConfiguration()); try { // schedule YARN Registry registration @@ -1644,9 +1646,22 @@ private void scheduleEscalation( new RenewingAction<>(escalate, seconds, seconds, TimeUnit.SECONDS, 0); actionQueues.renewing("escalation", renew); } - + /** - * Look at where the current node state is -and whether it should be changed + * Schedule monitor action + */ + private void scheduleMonitoring( + org.apache.slider.api.resource.Configuration conf) { + MonitorComponentInstances monitor = new MonitorComponentInstances(); + long seconds = conf.getPropertyLong(InternalKeys.MONITOR_INTERVAL, + InternalKeys.DEFAULT_MONITOR_INTERVAL); + RenewingAction renew = + new RenewingAction<>(monitor, seconds, seconds, TimeUnit.SECONDS, 0); + actionQueues.renewing("monitoring", renew); + } + + /** + * Look at where the current node state is and whether it should be changed. * @param reason reason for operation */ private synchronized void reviewRequestAndReleaseNodes(String reason) { @@ -1711,6 +1726,15 @@ public void escalateOutstandingRequests() { execute(operations); } + public void monitorComponentInstances() { + // TODO use health checks? + // TODO publish timeline events for monitoring changes? + if (appState.monitorComponentInstances()) { + // monitoring change + reviewRequestAndReleaseNodes("monitoring change"); + } + } + /** * Shutdown operation: release all containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeInterruptedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/MonitorComponentInstances.java similarity index 56% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeInterruptedException.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/MonitorComponentInstances.java index 5a02f46..f7aa871 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeInterruptedException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/MonitorComponentInstances.java @@ -6,7 +6,8 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 + * + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,16 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.slider.server.appmaster.actions; -package org.apache.slider.server.servicemonitor; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; /** - * This exception is raised when the probe loop detects that it has been requested to stop - * + * Execute readiness checks on component instances. */ -public class ProbeInterruptedException extends Exception { +public class MonitorComponentInstances extends AsyncAction { + + public MonitorComponentInstances() { + super("MonitorComponentInstance"); + } - public ProbeInterruptedException() { - super("Probe Interrupted"); + @Override + public void execute(SliderAppMaster appMaster, QueueAccess queueService, + AppState appState) throws Exception { + appMaster.monitorComponentInstances(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 1e1b377..652a502 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -353,7 +353,7 @@ public synchronized void buildInstance(AppStateBindingInfo binding) continue; } log.info("Adding component: " + name); - createComponent(name, name, component, priority++); + createComponent(name, component, priority++); } //then pick up the requirements @@ -433,8 +433,8 @@ private void createConfigFileCache(final FileSystem fileSystem) { }); } - public ProviderRole createComponent(String name, String group, - Component component, int priority) throws BadConfigException { + public ProviderRole createComponent(String name, Component component, + int priority) throws BadConfigException { org.apache.slider.api.resource.Configuration conf = component.getConfiguration(); long placementTimeout = conf.getPropertyLong(PLACEMENT_ESCALATE_DELAY, @@ -446,7 +446,7 @@ public ProviderRole createComponent(String name, String group, String label = conf.getProperty(YARN_LABEL_EXPRESSION, DEF_YARN_LABEL_EXPRESSION); ProviderRole newRole = - new ProviderRole(name, group, priority, (int)placementPolicy, threshold, + new ProviderRole(name, priority, (int)placementPolicy, threshold, placementTimeout, label, component); buildRole(newRole, component); log.info("Created a new role " + newRole); @@ -1535,7 +1535,8 @@ public synchronized AbstractRMOperation updateBlacklist() { allOperations.add(blacklistOperation); } for (RoleStatus roleStatus : getRoleStatusMap().values()) { - if (!roleStatus.isExcludeFromFlexing()) { + if (!roleStatus.isExcludeFromFlexing() && + areDependenciesReady(roleStatus)) { List operations = reviewOneRole(roleStatus); allOperations.addAll(operations); } @@ -1543,6 +1544,47 @@ public synchronized AbstractRMOperation updateBlacklist() { return allOperations; } + @VisibleForTesting + public boolean areDependenciesReady(RoleStatus roleStatus) { + List dependencies = roleStatus.getProviderRole().component + .getDependencies(); + if (SliderUtils.isEmpty(dependencies)) { + return true; + } + for (String dependency : dependencies) { + ProviderRole providerRole = roles.get(dependency); + if (providerRole == null) { + log.error("Couldn't find dependency {} for {} (should never happen)", + dependency, roleStatus.getName()); + continue; + } + RoleStatus other = getRoleStatusMap().get(providerRole.id); + if (other.getRunning() < other.getDesired()) { + log.info("Dependency {} not satisfied for {}, only {} of {} instances" + + " running", dependency, roleStatus.getName(), other.getRunning(), + other.getDesired()); + return false; + } + if (providerRole.probe == null) { + continue; + } + List dependencyInstances = enumLiveNodesInRole( + providerRole.name); + if (dependencyInstances.size() < other.getDesired()) { + log.info("Dependency {} not satisfied for {}, only {} of {} instances" + + " live", dependency, roleStatus.getName(), + dependencyInstances.size(), other.getDesired()); + return false; + } + for (RoleInstance instance : dependencyInstances) { + if (instance.state != STATE_READY) { + return false; + } + } + } + return true; + } + /** * Check the "recent" failure threshold for a role * @param role role to examine @@ -1620,6 +1662,31 @@ public void resetFailureCounts() { return operations; } + public synchronized boolean monitorComponentInstances() { + boolean hasChanged = false; + for (RoleInstance instance : getLiveContainers().values()) { + if (instance.providerRole.probe == null) { + continue; + } + boolean ready = instance.providerRole.probe.ping(instance).isSuccess(); + if (ready) { + if (instance.state != STATE_READY) { + instance.state = STATE_READY; + hasChanged = true; + log.info("State of {} changed to ready", instance.role); + } + } else { + if (instance.state == STATE_READY) { + instance.state = STATE_NOT_READY; + hasChanged = true; + log.info("State of {} changed from ready to not ready", instance + .role); + } + } + } + return hasChanged; + } + /** * Look at the allocation status of one role, and trigger add/release * actions if the number of desired role instances doesn't equal diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index 9842481..3d9a8f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -45,7 +45,6 @@ public final class RoleStatus implements MetricSet { private final String name; - private final String group; /** * Role priority @@ -66,7 +65,6 @@ public RoleStatus(ProviderRole providerRole) { this.providerRole = providerRole; this.name = providerRole.name; - this.group = providerRole.group; this.key = providerRole.id; componentMetrics = SliderMetrics.register(this.name, "Metrics for component " + this.name); @@ -95,10 +93,6 @@ public String getName() { return name; } - public String getGroup() { - return group; - } - public int getKey() { return key; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java index 9c14ca7..9eff165 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java @@ -18,30 +18,50 @@ package org.apache.slider.server.servicemonitor; import org.apache.hadoop.conf.Configuration; +import org.apache.slider.server.appmaster.state.RoleInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; +import java.util.Map; public class HttpProbe extends Probe { protected static final Logger log = LoggerFactory.getLogger(HttpProbe.class); - private final URL url; + private static final String HOST_TOKEN = "${THIS_HOST}"; + + private final String urlString; private final int timeout; private final int min, max; - public HttpProbe(URL url, int timeout, int min, int max, Configuration conf) throws IOException { + public HttpProbe(String url, int timeout, int min, int max, Configuration + conf) { super("Http probe of " + url + " [" + min + "-" + max + "]", conf); - this.url = url; + this.urlString = url; this.timeout = timeout; this.min = min; this.max = max; } - public static HttpURLConnection getConnection(URL url, int timeout) throws IOException { + public static HttpProbe create(Map props) + throws IOException { + String urlString = getProperty(props, WEB_PROBE_URL, null); + new URL(urlString); + int timeout = getPropertyInt(props, WEB_PROBE_CONNECT_TIMEOUT, + WEB_PROBE_CONNECT_TIMEOUT_DEFAULT); + int minSuccess = getPropertyInt(props, WEB_PROBE_MIN_SUCCESS, + WEB_PROBE_MIN_SUCCESS_DEFAULT); + int maxSuccess = getPropertyInt(props, WEB_PROBE_MAX_SUCCESS, + WEB_PROBE_MAX_SUCCESS_DEFAULT); + return new HttpProbe(urlString, timeout, minSuccess, maxSuccess, null); + } + + + private static HttpURLConnection getConnection(URL url, int timeout) throws + IOException { HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setInstanceFollowRedirects(true); connection.setConnectTimeout(timeout); @@ -49,13 +69,20 @@ public static HttpURLConnection getConnection(URL url, int timeout) throws IOExc } @Override - public ProbeStatus ping(boolean livePing) { + public ProbeStatus ping(RoleInstance roleInstance) { ProbeStatus status = new ProbeStatus(); + String ip = roleInstance.ip; + if (ip == null) { + status.fail(this, new IOException("IP is not available yet")); + return status; + } + HttpURLConnection connection = null; try { if (log.isDebugEnabled()) { // LOG.debug("Fetching " + url + " with timeout " + timeout); } + URL url = new URL(urlString.replace(HOST_TOKEN, ip)); connection = getConnection(url, this.timeout); int rc = connection.getResponseCode(); if (rc < min || rc > max) { @@ -66,8 +93,8 @@ public ProbeStatus ping(boolean livePing) { } else { status.succeed(this); } - } catch (IOException e) { - String error = "Probe " + url + " failed: " + e; + } catch (Throwable e) { + String error = "Probe " + urlString + " failed for IP " + ip + ": " + e; log.info(error, e); status.fail(this, new IOException(error, e)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorKeys.java index f7bdd4a..e97ab43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorKeys.java @@ -23,257 +23,44 @@ public interface MonitorKeys { /** - * Prefix of all other configuration options: {@value} + * Port probing key : port to attempt to create a TCP connection to {@value}. */ - String MONITOR_KEY_PREFIX = "service.monitor."; - - - /** - * Classname of the reporter Key: {@value} - */ - String MONITOR_REPORTER = - MONITOR_KEY_PREFIX + "report.classname"; - - /** - * Interval in milliseconds between reporting health status to the reporter - * Key: {@value} - */ - String MONITOR_REPORT_INTERVAL = - MONITOR_KEY_PREFIX + "report.interval"; - - /** - * Time in millis between the last probing cycle ending and the new one - * beginning. Key: {@value} - */ - String MONITOR_PROBE_INTERVAL = - MONITOR_KEY_PREFIX + "probe.interval"; - - /** - * How long in milliseconds does the probing loop have to be blocked before - * that is considered a liveness failure Key: {@value} - */ - String MONITOR_PROBE_TIMEOUT = - MONITOR_KEY_PREFIX + "probe.timeout"; - - /** - * How long in milliseconds does the probing loop have to be blocked before - * that is considered a liveness failure Key: {@value} - */ - String MONITOR_BOOTSTRAP_TIMEOUT = - MONITOR_KEY_PREFIX + "bootstrap.timeout"; - - - /** - * does the monitor depend on DFS being live - */ - String MONITOR_DEPENDENCY_DFSLIVE = - MONITOR_KEY_PREFIX + "dependency.dfslive"; - - - /** - * default timeout for the entire bootstrap phase {@value} - */ - - int BOOTSTRAP_TIMEOUT_DEFAULT = 60000; - - - /** - * Default value if the key is not in the config file: {@value} - */ - int REPORT_INTERVAL_DEFAULT = 10000; + String PORT_PROBE_PORT = "port"; /** - * Default value if the key is not in the config file: {@value} + * Port probing key : timeout for the the connection attempt {@value}. */ - int PROBE_INTERVAL_DEFAULT = 10000; + String PORT_PROBE_CONNECT_TIMEOUT = "timeout"; /** - * Default value if the key is not in the config file: {@value} + * Port probing default : timeout for the connection attempt {@value}. */ - int PROBE_TIMEOUT_DEFAULT = 60000; - - /** - * Port probe enabled/disabled flag Key: {@value} - */ - String PORT_PROBE_ENABLED = - MONITOR_KEY_PREFIX + "portprobe.enabled"; - - - /** - * Port probing key : port to attempt to create a TCP connection to {@value} - */ - String PORT_PROBE_PORT = - MONITOR_KEY_PREFIX + "portprobe.port"; - - /** - * Port probing key : port to attempt to create a TCP connection to {@value} - */ - String PORT_PROBE_HOST = - MONITOR_KEY_PREFIX + "portprobe.host"; - - - /** - * Port probing key : timeout of the connection attempt {@value} - */ - String PORT_PROBE_CONNECT_TIMEOUT = - MONITOR_KEY_PREFIX + "portprobe.connect.timeout"; - - /** - * Port probing key : bootstrap timeout -how long in milliseconds should the - * port probing take to connect before the failure to connect is considered a - * liveness failure. That is: how long should the IPC port take to come up? - * {@value} - */ - String PORT_PROBE_BOOTSTRAP_TIMEOUT = - MONITOR_KEY_PREFIX + "portprobe.bootstrap.timeout"; - - - /** - * default timeout for port probes {@value} - */ - int PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = 60000; - - /** - * default value for port probe connection attempts {@value} - */ - int PORT_PROBE_CONNECT_TIMEOUT_DEFAULT = 1000; - - /** - * default port for probes {@value} - */ - int DEFAULT_PROBE_PORT = 8020; - - /** - * default host for probes {@value} + * Web probing key : URL {@value}. */ - String DEFAULT_PROBE_HOST = "localhost"; - - + String WEB_PROBE_URL = "url"; /** - * Probe enabled/disabled flag Key: {@value} + * Web probing key : min success code {@value}. */ - String LS_PROBE_ENABLED = - MONITOR_KEY_PREFIX + "lsprobe.enabled"; - + String WEB_PROBE_MIN_SUCCESS = "min.success"; /** - * Probe path for LS operation Key: {@value} + * Web probing key : max success code {@value}. */ - String LS_PROBE_PATH = - MONITOR_KEY_PREFIX + "lsprobe.path"; - + String WEB_PROBE_MAX_SUCCESS = "max.success"; /** - * Default path for LS operation Key: {@value} + * Web probing default : min successful response code {@value}. */ - String LS_PROBE_DEFAULT = "/"; - + int WEB_PROBE_MIN_SUCCESS_DEFAULT = 200; /** - * Port probing key : bootstrap timeout -how long in milliseconds should the - * port probing take to connect before the failure to connect is considered a - * liveness failure. That is: how long should the IPC port take to come up? - * {@value} + * Web probing default : max successful response code {@value}. */ - String LS_PROBE_BOOTSTRAP_TIMEOUT = - MONITOR_KEY_PREFIX + "lsprobe.bootstrap.timeout"; - - + int WEB_PROBE_MAX_SUCCESS_DEFAULT = 299; /** - * default timeout for port probes {@value} + * Web probing key : timeout for the connection attempt {@value} */ - - int LS_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT; - - - /** - * Probe enabled/disabled flag Key: {@value} - */ - String WEB_PROBE_ENABLED = - MONITOR_KEY_PREFIX + "webprobe.enabled"; - + String WEB_PROBE_CONNECT_TIMEOUT = "timeout"; /** - * Probe URL Key: {@value} + * Port probing default : timeout for the connection attempt {@value}. */ - String WEB_PROBE_URL = - MONITOR_KEY_PREFIX + "webprobe.url"; - - /** - * Default path for web probe Key: {@value} - */ - String WEB_PROBE_DEFAULT_URL = "http://localhost:50070/"; - - /** - * min error code Key: {@value} - */ - String WEB_PROBE_MIN = - MONITOR_KEY_PREFIX + "webprobe.min"; - /** - * min error code Key: {@value} - */ - String WEB_PROBE_MAX = - MONITOR_KEY_PREFIX + "webprobe.max"; - - - /** - * Port probing key : timeout of the connection attempt {@value} - */ - String WEB_PROBE_CONNECT_TIMEOUT = - MONITOR_KEY_PREFIX + "webprobe.connect.timeout"; - - /** - * Default HTTP response code expected from the far end for - * the endpoint to be considered live. - */ - int WEB_PROBE_DEFAULT_CODE = 200; - - /** - * Port probing key : bootstrap timeout -how long in milliseconds should the - * port probing take to connect before the failure to connect is considered a - * liveness failure. That is: how long should the IPC port take to come up? - * {@value} - */ - String WEB_PROBE_BOOTSTRAP_TIMEOUT = - MONITOR_KEY_PREFIX + "webprobe.bootstrap.timeout"; - - - /** - * default timeout for port probes {@value} - */ - - int WEB_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT; - - /** - * Probe enabled/disabled flag Key: {@value} - */ - String JT_PROBE_ENABLED = - MONITOR_KEY_PREFIX + "jtprobe.enabled"; - - /** - * Port probing key : bootstrap timeout -how long in milliseconds should the - * port probing take to connect before the failure to connect is considered a - * liveness failure. That is: how long should the IPC port take to come up? - * {@value} - */ - String JT_PROBE_BOOTSTRAP_TIMEOUT = - MONITOR_KEY_PREFIX + "jtprobe.bootstrap.timeout"; - - - /** - * default timeout for port probes {@value} - */ - - int JT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT; - - - /** - * Probe enabled/disabled flag Key: {@value} - */ - String PID_PROBE_ENABLED = - MONITOR_KEY_PREFIX + "pidprobe.enabled"; - - /** - * PID probing key : pid to attempt to create a TCP connection to {@value} - */ - String PID_PROBE_PIDFILE = - MONITOR_KEY_PREFIX + "pidprobe.pidfile"; - + int WEB_PROBE_CONNECT_TIMEOUT_DEFAULT = 1000; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorUtils.java index a4447e3..1e5c94c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/MonitorUtils.java @@ -17,25 +17,19 @@ package org.apache.slider.server.servicemonitor; +import org.apache.slider.api.resource.ReadinessCheck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Formatter; -import java.util.List; import java.util.Locale; -import java.util.Map; -import java.util.TreeSet; /** * Various utils to work with the monitor */ public final class MonitorUtils { - protected static final Logger log = LoggerFactory.getLogger(MonitorUtils.class); + protected static final Logger LOG = LoggerFactory.getLogger(MonitorUtils + .class); private MonitorUtils() { } @@ -45,25 +39,6 @@ public static String toPlural(int val) { } /** - * Convert the arguments -including dropping any empty strings that creep in - * @param args arguments - * @return a list view with no empty strings - */ - public static List prepareArgs(String[] args) { - List argsList = new ArrayList(args.length); - StringBuilder argsStr = new StringBuilder("Arguments: ["); - for (String arg : args) { - argsStr.append('"').append(arg).append("\" "); - if (!arg.isEmpty()) { - argsList.add(arg); - } - } - argsStr.append(']'); - log.debug(argsStr.toString()); - return argsList; - } - - /** * Convert milliseconds to human time -the exact format is unspecified * @param milliseconds a time in milliseconds * @return a time that is converted to human intervals @@ -85,25 +60,25 @@ public static String millisToHumanTime(long milliseconds) { return sb.toString(); } - public static InetSocketAddress getURIAddress(URI uri) { - String host = uri.getHost(); - int port = uri.getPort(); - return new InetSocketAddress(host, port); - } - - - /** - * Get the localhost -may be null - * @return the localhost if known - */ - public static InetAddress getLocalHost() { - InetAddress localHost; + public static Probe getProbe(ReadinessCheck readinessCheck) { + if (readinessCheck == null) { + return null; + } + if (readinessCheck.getType() == null) { + return null; + } try { - localHost = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - localHost = null; + switch (readinessCheck.getType()) { + case HTTP: + return HttpProbe.create(readinessCheck.getProps()); + case PORT: + return PortProbe.create(readinessCheck.getProps()); + default: + return null; + } + } catch (Throwable t) { + throw new IllegalArgumentException("Error creating readiness check " + + t); } - return localHost; } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java index b1ff792..252242f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java @@ -17,91 +17,77 @@ package org.apache.slider.server.servicemonitor; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; +import org.apache.slider.server.appmaster.state.RoleInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.Map; /** - * Probe for a port being open + * Probe for a port being open. */ public class PortProbe extends Probe { protected static final Logger log = LoggerFactory.getLogger(PortProbe.class); - private final String host; private final int port; private final int timeout; - public PortProbe(String host, int port, int timeout, String name, Configuration conf) - throws IOException { - super("Port probe " + name + " " + host + ":" + port + " for " + timeout + "ms", - conf); - this.host = host; + public PortProbe(int port, int timeout) { + super("Port probe of " + port + " for " + timeout + "ms", null); this.port = port; this.timeout = timeout; } - public static PortProbe createPortProbe(Configuration conf, - String hostname, - int port) throws IOException { - PortProbe portProbe = new PortProbe(hostname, - port, - conf.getInt( - PORT_PROBE_CONNECT_TIMEOUT, - PORT_PROBE_CONNECT_TIMEOUT_DEFAULT), - "", - conf); - - return portProbe; - } + public static PortProbe create(Map props) + throws IOException { + int port = getPropertyInt(props, PORT_PROBE_PORT, null); - @Override - public void init() throws IOException { if (port >= 65536) { - throw new IOException("Port is out of range: " + port); - } - InetAddress target; - if (host != null) { - log.debug("looking up host " + host); - target = InetAddress.getByName(host); - } else { - log.debug("Host is null, retrieving localhost address"); - target = InetAddress.getLocalHost(); + throw new IOException(PORT_PROBE_PORT + " " + port + " is out of " + + "range"); } - log.info("Checking " + target + ":" + port); + + int timeout = getPropertyInt(props, PORT_PROBE_CONNECT_TIMEOUT, + PORT_PROBE_CONNECT_TIMEOUT_DEFAULT); + + return new PortProbe(port, timeout); } /** * Try to connect to the (host,port); a failure to connect within - * the specified timeout is a failure - * @param livePing is the ping live: true for live; false for boot time + * the specified timeout is a failure. + * @param roleInstance role instance * @return the outcome */ @Override - public ProbeStatus ping(boolean livePing) { + public ProbeStatus ping(RoleInstance roleInstance) { ProbeStatus status = new ProbeStatus(); - InetSocketAddress sockAddr = new InetSocketAddress(host, port); + + String ip = roleInstance.ip; + if (ip == null) { + status.fail(this, new IOException("IP is not available yet")); + return status; + } + + InetSocketAddress sockAddr = new InetSocketAddress(ip, port); Socket socket = new Socket(); try { if (log.isDebugEnabled()) { - log.debug("Connecting to " + sockAddr.toString() + " connection-timeout=" + - MonitorUtils.millisToHumanTime(timeout)); + log.debug("Connecting to " + sockAddr.toString() + "timeout=" + + MonitorUtils.millisToHumanTime(timeout)); } socket.connect(sockAddr, timeout); status.succeed(this); - } catch (IOException e) { + } catch (Throwable e) { String error = "Probe " + sockAddr + " failed: " + e; log.debug(error, e); - status.fail(this, - new IOException(error, e)); + status.fail(this, new IOException(error, e)); } finally { IOUtils.closeSocket(socket); } return status; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java index be4b5ef..d31cfb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java @@ -17,9 +17,12 @@ package org.apache.slider.server.servicemonitor; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.slider.server.appmaster.state.RoleInstance; import java.io.IOException; +import java.util.Map; /** * Base class of all probes. @@ -72,6 +75,30 @@ public String toString() { '}'; } + public static String getProperty(Map props, String name, + String defaultValue) throws IOException { + String value = props.get(name); + if (StringUtils.isEmpty(value)) { + if (defaultValue == null) { + throw new IOException(name + " not specified"); + } + return defaultValue; + } + return value; + } + + public static int getPropertyInt(Map props, String name, + Integer defaultValue) throws IOException { + String value = props.get(name); + if (StringUtils.isEmpty(value)) { + if (defaultValue == null) { + throw new IOException(name + " not specified"); + } + return defaultValue; + } + return Integer.parseInt(value); + } + /** * perform any prelaunch initialization */ @@ -83,10 +110,10 @@ public void init() throws IOException { * Ping the endpoint. All exceptions must be caught and included in the * (failure) status. * - * @param livePing is the ping live: true for live; false for boot time + * @param roleInstance instance to ping * @return the status */ - public abstract ProbeStatus ping(boolean livePing); + public abstract ProbeStatus ping(RoleInstance roleInstance); public void beginBootstrap() { bootstrapStarted = System.currentTimeMillis(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeFailedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeFailedException.java deleted file mode 100644 index f09b848..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeFailedException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -/** - * An exception to raise on a probe failure - */ -public class ProbeFailedException extends Exception { - - public final ProbeStatus status; - - public ProbeFailedException(String text, ProbeStatus status) { - super((text == null ? "Probe Failed" : (text + ": ")) + status, status.getThrown()); - this.status = status; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbePhase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbePhase.java deleted file mode 100644 index d87c81b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbePhase.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -/** - * Probe phases. The names are for strings; the index is the order in which things happen; - * -any state can got to terminating directly. - */ -public enum ProbePhase { - INIT("Initializing", 0), - DEPENDENCY_CHECKING("Dependencies", 1), - BOOTSTRAPPING("Bootstrapping", 2), - LIVE("Live", 3), - TERMINATING("Terminating", 4); - - private final String name; - private final int index; - - ProbePhase(String name, int index) { - this.name = name; - this.index = index; - } - - public String getName() { - return name; - } - - public int getIndex() { - return index; - } - - /** - * How many phases are there? - */ - public static final int PHASE_COUNT = TERMINATING.index + 1; - - @Override - public String toString() { - return name; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeReportHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeReportHandler.java deleted file mode 100644 index 36c20c8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeReportHandler.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -/** - * This interface is for use by the Poll Workers to send events to the reporters. - * - * It is up the reporters what to do with the specific events. - */ -public interface ProbeReportHandler { - - /** - * The probe process has changed state. - * @param probePhase the new process phrase - */ - void probeProcessStateChange(ProbePhase probePhase); - - /** - * Report a probe outcome - * @param phase the current phase of probing - * @param status the probe status - */ - void probeResult(ProbePhase phase, ProbeStatus status); - - /** - * A probe has failed - */ - void probeFailure(ProbeFailedException exception); - - /** - * A probe has just booted - * @param status probe status - */ - void probeBooted(ProbeStatus status); - - boolean commence(String name, String description); - - void unregister(); - - /** - * A heartbeat event should be raised - * @param status the probe status - */ - void heartbeat(ProbeStatus status); - - /** - * A probe has timed out - * @param currentPhase the current execution phase - * @param probe the probe that timed out - * @param lastStatus the last status that was successfully received -which is implicitly - * not the status of the timed out probe - * @param currentTime the current time - */ - void probeTimedOut(ProbePhase currentPhase, - Probe probe, - ProbeStatus lastStatus, - long currentTime); - - /** - * Event to say that the live probe cycle completed so the entire - * system can be considered functional. - */ - void liveProbeCycleCompleted(); -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java index b4deabc..24668bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java @@ -34,7 +34,6 @@ private String message; private Throwable thrown; private transient Probe originator; - private ProbePhase probePhase; public ProbeStatus() { } @@ -99,14 +98,6 @@ public void setThrown(Throwable thrown) { this.thrown = thrown; } - public ProbePhase getProbePhase() { - return probePhase; - } - - public void setProbePhase(ProbePhase probePhase) { - this.probePhase = probePhase; - } - /** * Get the probe that generated this result. May be null * @return a possibly null reference to a probe @@ -147,7 +138,6 @@ public void finish(Probe probe, boolean succeeded, String text, Throwable thrown public String toString() { LogEntryBuilder builder = new LogEntryBuilder("Probe Status"); builder.elt("time", timestampText) - .elt("phase", probePhase) .elt("outcome", (success ? "success" : "failure")); if (success != realOutcome) { @@ -161,10 +151,6 @@ public String toString() { return builder.toString(); } - public boolean inPhase(ProbePhase phase) { - return getProbePhase().equals(phase); - } - /** * Flip the success bit on while the real outcome bit is kept false */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeWorker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeWorker.java deleted file mode 100644 index f64ec8d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeWorker.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * This is the entry point to do work. A list of probes is taken in, in order of - * booting. Once live they go to the live probes list. - * - * The dependency probes are a set of probes for dependent services, all of which - * must be live before boot probes commence. - * - * The boot probes are executed and are allowed to fail; failure is interpreted as "not yet live" - * - * Once all boot probes are live, the live list is used for probes; these must not fail. - * - * There is no timeout on dependency probe bootstrap time, because of the notion that - * restarting this service will have no effect on the dependencies. - */ - -public class ProbeWorker implements Runnable { - protected static final Logger log = LoggerFactory.getLogger(ProbeWorker.class); - - public static final String FAILED_TO_BOOT = "Monitored service failed to bootstrap after "; - public static final String FAILURE_OF_A_LIVE_PROBE_DURING_BOOTSTRAPPING = "Failure of a live probe during bootstrapping"; - private final List monitorProbes; - private final List dependencyProbes; - public final int interval; - protected volatile ProbeStatus lastStatus; - protected volatile ProbeStatus lastFailingBootstrapProbe; - protected volatile Probe currentProbe; - private volatile boolean mustExit; - private final int bootstrapTimeout; - private long bootstrapEndtime; - - private ProbeReportHandler reportHandler; - private volatile ProbePhase probePhase = ProbePhase.INIT; - - /** - * Create a probe worker - * @param monitorProbes list of probes that must boot and then go live -after which - * they must stay live. - * @param dependencyProbes the list of dependency probes that must all succeed before - * any attempt to probe the direct probe list is performed. Once the - * dependency phase has completed, these probes are never checked again. - * @param interval probe interval in milliseconds. - * @param bootstrapTimeout timeout for bootstrap in milliseconds - */ - public ProbeWorker(List monitorProbes, List dependencyProbes, int interval, int bootstrapTimeout) { - this.monitorProbes = monitorProbes; - this.dependencyProbes = dependencyProbes != null ? dependencyProbes : new ArrayList(0); - this.interval = interval; - lastStatus = new ProbeStatus(now(), - "Initial status"); - lastStatus.setProbePhase(ProbePhase.INIT); - this.bootstrapTimeout = bootstrapTimeout; - } - - public void init() throws IOException { - for (Probe probe : monitorProbes) { - probe.init(); - } - for (Probe probe : dependencyProbes) { - probe.init(); - } - } - - public void setReportHandler(ProbeReportHandler reportHandler) { - this.reportHandler = reportHandler; - } - - public void setMustExit() { - this.mustExit = true; - } - - public ProbeStatus getLastStatus() { - return lastStatus; - } - - public synchronized Probe getCurrentProbe() { - return currentProbe; - } - - public ProbePhase getProbePhase() { - return probePhase; - } - - /** - * Enter the new process state, and report it to the report handler. - * This is synchronized just to make sure there isn't more than one - * invocation at the same time. - * @param status the new process status - */ - private synchronized void enterProbePhase(ProbePhase status) { - this.probePhase = status; - if (reportHandler != null) { - reportHandler.probeProcessStateChange(status); - } - } - - /** - * Report the probe status to the listener -setting the probe phase field - * before doing so. - * The value is also stored in the {@link #lastStatus} field - * @param status the new status - */ - private void reportProbeStatus(ProbeStatus status) { - ProbePhase phase = getProbePhase(); - status.setProbePhase(phase); - lastStatus = status; - reportHandler.probeResult(phase, status); - } - - /** - * Ping one probe. Logs the operation at debug level; sets the field currentProbe - * to the probe for the duration of the operation -this is used when identifying the - * cause of a hung reporting loop - * @param probe probe to ping - * @param live flag to indicate whether or not the operation is live or bootstrapping - * @return the status of the ping - * @throws ProbeInterruptedException if the probe has been told to exit - */ - private ProbeStatus ping(Probe probe, boolean live) throws ProbeInterruptedException { - if (log.isDebugEnabled()) { - log.debug("Executing " + probe); - } - checkForExitRequest(); - currentProbe = probe; - try { - return probe.ping(live); - } finally { - currentProbe = null; - } - } - - /** - * Check for an exit request -and convert it to an exception if made - * @throws ProbeInterruptedException iff {@link #mustExit} is true - */ - private void checkForExitRequest() throws ProbeInterruptedException { - if (mustExit) { - throw new ProbeInterruptedException(); - } - } - - /** - * Check the dependencies. - * The moment a failing test is reached the call returns without - * any reporting. - * - * All successful probes are reported, so as to keep the heartbeats happy. - * - * @return the status of the last dependency check. If this is a success - * them every probe passed. - */ - private ProbeStatus checkDependencyProbes() throws ProbeInterruptedException { - ProbeStatus status = null; - for (Probe dependency : dependencyProbes) { - //ping them, making clear they are not to run any bootstrap logic - status = ping(dependency, true); - - if (!status.isSuccess()) { - //the first failure means the rest of the list can be skipped - break; - } - reportProbeStatus(status); - } - //return the last status - return status; - } - - /** - * Run through all the dependency probes and report their outcomes successes (even if they fail) - * @return true iff all the probes have succeeded. - * @throws ProbeInterruptedException if the process was interrupted. - */ - public boolean checkAndReportDependencyProbes() throws ProbeInterruptedException { - ProbeStatus status; - status = checkDependencyProbes(); - if (status != null && !status.isSuccess()) { - //during dependency checking, a failure is still reported as a success - status.markAsSuccessful(); - reportProbeStatus(status); - //then return without checking anything else - return false; - } - //all dependencies are done. - return true; - } - - /** - * Begin bootstrapping by telling each probe that they have started. - * This sets the timeouts up, as well as permits any other set-up actions - * to begin. - */ - private void beginBootstrapProbes() { - synchronized (this) { - bootstrapEndtime = now() + bootstrapTimeout; - } - for (Probe probe : monitorProbes) { - probe.beginBootstrap(); - } - } - - private long now() { - return System.currentTimeMillis(); - } - - - /** - * Check the bootstrap probe list. All successful probes get reported. - * The first unsuccessful probe will be returned and not reported (left for policy upstream). - * If the failing probe has timed out, that is turned into a {@link ProbeFailedException} - * @return the last (unsuccessful) probe, or null if they all succeeded - * @throws ProbeInterruptedException interrupts - * @throws ProbeFailedException on a boot timeout - */ - private boolean checkBootstrapProbes() throws ProbeInterruptedException, ProbeFailedException { - verifyBootstrapHasNotTimedOut(); - - boolean probeFailed = false; - //now run through all the bootstrap probes - for (Probe probe : monitorProbes) { - //ping them - ProbeStatus status = ping(probe, false); - if (!status.isSuccess()) { - probeFailed = true; - lastFailingBootstrapProbe = status; - probe.failureCount++; - if (log.isDebugEnabled()) { - log.debug("Booting probe failed: " + status); - } - //at this point check to see if the timeout has occurred -and if so, force in the last probe status. - - //this is a failure but not a timeout - //during boot, a failure of a probe that hasn't booted is still reported as a success - if (!probe.isBooted()) { - //so the success bit is flipped - status.markAsSuccessful(); - reportProbeStatus(status); - } else { - //the probe had booted but then it switched to failing - - //update the status unedited - reportProbeStatus(status); - //then fail - throw raiseProbeFailure(status, FAILURE_OF_A_LIVE_PROBE_DURING_BOOTSTRAPPING); - } - } else { - //this probe is working - if (!probe.isBooted()) { - //if it is new, mark it as live - if (log.isDebugEnabled()) { - log.debug("Booting probe is now live: " + probe); - } - probe.endBootstrap(); - //tell the report handler that another probe has booted - reportHandler.probeBooted(status); - } - //push out its status - reportProbeStatus(status); - probe.successCount++; - } - } - return !probeFailed; - } - - - public int getBootstrapTimeout() { - return bootstrapTimeout; - } - - /** - * This checks that bootstrap operations have not timed out - * @throws ProbeFailedException if the bootstrap has failed - */ - public void verifyBootstrapHasNotTimedOut() throws ProbeFailedException { - //first step -look for a timeout - if (isBootstrapTimeExceeded()) { - String text = FAILED_TO_BOOT - + MonitorUtils.millisToHumanTime(bootstrapTimeout); - - ProbeStatus status; - if (lastFailingBootstrapProbe != null) { - status = lastFailingBootstrapProbe; - status.setSuccess(false); - } else { - status = new ProbeStatus(); - status.finish(null, false, text, null); - } - - throw raiseProbeFailure(status, - text); - } - } - - /** - * predicate that gets current time and checks for its time being exceeded. - * @return true iff the current time is > the end time - */ - public synchronized boolean isBootstrapTimeExceeded() { - return now() > bootstrapEndtime; - } - - /** - * run through all the bootstrap probes and see if they are live. - * @return true iff all boot probes succeeded - * @throws ProbeInterruptedException the probe interruption flags - * @throws ProbeFailedException if a probe failed. - */ - public boolean checkAndReportBootstrapProbes() throws ProbeInterruptedException, - ProbeFailedException { - if (bootstrapTimeout <= 0) { - //there is no period of grace for bootstrapping probes, so return true saying - //this phase is complete - return true; - } - //now the bootstrapping probes - return checkBootstrapProbes(); - } - - - /** - * run through all the live probes, pinging and reporting them. - * A single probe failure is turned into an exception - * @throws ProbeFailedException a probe failed - * @throws ProbeInterruptedException the probe process was explicitly interrupted - */ - protected void checkAndReportLiveProbes() throws ProbeFailedException, ProbeInterruptedException { - ProbeStatus status = null; - //go through the live list - if (log.isDebugEnabled()) { - log.debug("Checking live probes"); - } - for (Probe probe : monitorProbes) { - status = ping(probe, true); - reportProbeStatus(status); - if (!status.isSuccess()) { - throw raiseProbeFailure(status, "Failure of probe in \"live\" monitor"); - } - probe.successCount++; - } - //here all is well, so notify the reporter - reportHandler.liveProbeCycleCompleted(); - } - - /** - * Run the set of probes relevant for this phase of the probe lifecycle. - * @throws ProbeFailedException a probe failed - * @throws ProbeInterruptedException the probe process was explicitly interrupted - */ - protected void executeProbePhases() throws ProbeFailedException, ProbeInterruptedException { - switch (probePhase) { - case INIT: - enterProbePhase(ProbePhase.DEPENDENCY_CHECKING); - //fall through straight into the dependency check - case DEPENDENCY_CHECKING: - if (checkAndReportDependencyProbes()) { - enterProbePhase(ProbePhase.BOOTSTRAPPING); - beginBootstrapProbes(); - } - break; - case BOOTSTRAPPING: - if (checkAndReportBootstrapProbes()) { - enterProbePhase(ProbePhase.LIVE); - } - break; - case LIVE: - checkAndReportLiveProbes(); - break; - - case TERMINATING: - default: - //do nothing. - break; - } - } - - - /** - * Raise a probe failure; injecting the phase into the status result first - * - * @param status ping result - * @param text optional text -null or "" means "none" - * @return an exception ready to throw - */ - private ProbeFailedException raiseProbeFailure(ProbeStatus status, String text) { - status.setProbePhase(probePhase); - log.info("Probe failed: " + status); - return new ProbeFailedException(text, status); - } - - @Override - public void run() { - int size = monitorProbes.size(); - log.info("Probe Worker Starting; " + size + " probe" + MonitorUtils.toPlural(size) + ":"); - enterProbePhase(ProbePhase.DEPENDENCY_CHECKING); - for (Probe probe : monitorProbes) { - log.info(probe.getName()); - } - while (!mustExit) { - try { - Thread.sleep(interval); - executeProbePhases(); - } catch (ProbeFailedException e) { - //relay to the inner loop handler - probeFailed(e); - } catch (InterruptedException interrupted) { - break; - } catch (ProbeInterruptedException e) { - //exit raised. - //this will be true, just making extra-sure - break; - } - } - log.info("Probe Worker Exiting"); - enterProbePhase(ProbePhase.TERMINATING); - } - - - protected void probeFailed(ProbeFailedException e) { - reportHandler.probeFailure(e); - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ReportingLoop.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ReportingLoop.java deleted file mode 100644 index 096838d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ReportingLoop.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * This is the monitor service - */ -public final class ReportingLoop implements Runnable, ProbeReportHandler, MonitorKeys, Closeable { - protected static final Logger log = LoggerFactory.getLogger(ReportingLoop.class); - private final ProbeWorker worker; - private final Thread workerThread; - private final int reportInterval; - private final int probeTimeout; - private final int bootstrapTimeout; - private ProbeReportHandler reporter; - private final String name; - private volatile boolean mustExit; - - public ReportingLoop(String name, - ProbeReportHandler reporter, - List probes, - List dependencyProbes, - int probeInterval, - int reportInterval, - int probeTimeout, - int bootstrapTimeout) throws IOException { - this(name, - reporter, - new ProbeWorker(probes, dependencyProbes, probeInterval, bootstrapTimeout), - reportInterval, - probeTimeout); - } - - /** - * Create a new reporting loop -and bond the worker's ProbeReportHandler - * to us - * @param name - * @param reporter - * @param worker - * @param reportInterval - * @param probeTimeout - */ - public ReportingLoop(String name, - ProbeReportHandler reporter, - ProbeWorker worker, - int reportInterval, - int probeTimeout) throws IOException { - this.name = name; - this.reporter = reporter; - this.reportInterval = reportInterval; - this.probeTimeout = probeTimeout; - this.worker = worker; - this.bootstrapTimeout = worker.getBootstrapTimeout(); - worker.setReportHandler(this); - workerThread = new Thread(worker, "probe thread - " + name); - worker.init(); - } - - public int getBootstrapTimeout() { - return bootstrapTimeout; - } - - public ReportingLoop withReporter(ProbeReportHandler reporter) { - assert this.reporter == null : "attempting to reassign reporter "; - assert reporter != null : "new reporter is null"; - this.reporter = reporter; - return this; - } - - /** - * Start the monitoring. - * - * @return false if the monitoring did not start and that the worker threads - * should be run up. - */ - public boolean startReporting() { - String description = "Service Monitor for " + name + ", probe-interval= " - + MonitorUtils.millisToHumanTime(worker.interval) - + ", report-interval=" + MonitorUtils.millisToHumanTime(reportInterval) - + ", probe-timeout=" + timeoutToStr(probeTimeout) - + ", bootstrap-timeout=" + timeoutToStr(bootstrapTimeout); - log.info("Starting reporting" - + " to " + reporter - + description); - return reporter.commence(name, description); - } - - private String timeoutToStr(int timeout) { - return timeout >= 0 ? MonitorUtils.millisToHumanTime(timeout) : "not set"; - } - - private void startWorker() { - log.info("Starting reporting worker thread "); - workerThread.setDaemon(true); - workerThread.start(); - } - - - /** - * This exits the process cleanly - */ - @Override - public void close() { - log.info("Stopping reporting"); - mustExit = true; - if (worker != null) { - worker.setMustExit(); - workerThread.interrupt(); - } - if (reporter != null) { - reporter.unregister(); - } - } - - @Override - public void probeFailure(ProbeFailedException exception) { - reporter.probeFailure(exception); - } - - @Override - public void probeProcessStateChange(ProbePhase probePhase) { - reporter.probeProcessStateChange(probePhase); - } - - @Override - public void probeBooted(ProbeStatus status) { - reporter.probeBooted(status); - } - - private long now() { - return System.currentTimeMillis(); - } - - @Override - public void probeResult(ProbePhase phase, ProbeStatus status) { - reporter.probeResult(phase, status); - } - - @Override - public boolean commence(String n, String description) { - return true; - } - - @Override - public void unregister() { - } - - @Override - public void heartbeat(ProbeStatus status) { - } - - @Override - public void probeTimedOut(ProbePhase currentPhase, Probe probe, ProbeStatus lastStatus, - long currentTime) { - } - - @Override - public void liveProbeCycleCompleted() { - //delegate to the reporter - reporter.liveProbeCycleCompleted(); - } - - /** - * The reporting loop - */ - void reportingLoop() { - - while (!mustExit) { - try { - ProbeStatus workerStatus = worker.getLastStatus(); - long now = now(); - long lastStatusIssued = workerStatus.getTimestamp(); - long timeSinceLastStatusIssued = now - lastStatusIssued; - //two actions can occur here: a heartbeat is issued or a timeout reported. - //this flag decides which - boolean heartbeat; - - //based on phase, decide whether to heartbeat or timeout - ProbePhase probePhase = worker.getProbePhase(); - switch (probePhase) { - case DEPENDENCY_CHECKING: - //no timeouts in dependency phase - heartbeat = true; - break; - - case BOOTSTRAPPING: - //the timeout here is fairly straightforward: heartbeats are - //raised while the worker hasn't timed out - heartbeat = bootstrapTimeout < 0 || timeSinceLastStatusIssued < bootstrapTimeout; - - break; - - case LIVE: - //use the probe timeout interval between the current time - //and the time the last status event was received. - heartbeat = timeSinceLastStatusIssued < probeTimeout; - break; - - case INIT: - case TERMINATING: - default: - //send a heartbeat, because this isn't the time to be failing - heartbeat = true; - } - if (heartbeat) { - //a heartbeat is sent to the reporter - reporter.heartbeat(workerStatus); - } else { - //no response from the worker -it is hung. - reporter.probeTimedOut(probePhase, - worker.getCurrentProbe(), - workerStatus, - now - ); - } - - //now sleep - Thread.sleep(reportInterval); - - } catch (InterruptedException e) { - //interrupted -always exit the loop. - break; - } - } - //this point is reached if and only if a clean exit was requested or something failed. - } - - /** - * This can be run in a separate thread, or it can be run directly from the caller. - * Test runs do the latter, HAM runs multiple reporting threads. - */ - @Override - public void run() { - try { - startWorker(); - reportingLoop(); - } catch (RuntimeException e) { - log.warn("Failure in the reporting loop: " + e, e); - //rethrow so that inline code can pick it up (e.g. test runs) - throw e; - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/RestApiErrorMessages.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/RestApiErrorMessages.java index 676db82..9da3958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/RestApiErrorMessages.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/RestApiErrorMessages.java @@ -62,6 +62,8 @@ "Invalid no of containers specified"; String ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID = ERROR_CONTAINERS_COUNT_INVALID + ERROR_SUFFIX_FOR_COMPONENT; + String ERROR_DEPENDENCY_INVALID = "Dependency %s for component %s is " + + "invalid, does not exist as a component"; String ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_NOT_SUPPORTED = "Cannot specify" + " cpus/memory along with profile"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/ServiceApiUtil.java index 80a31c0..5311003 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/util/ServiceApiUtil.java @@ -32,6 +32,7 @@ import org.apache.slider.core.persist.JsonSerDeser; import org.apache.slider.providers.AbstractClientProvider; import org.apache.slider.providers.SliderProviderFactory; +import org.apache.slider.server.servicemonitor.MonitorUtils; import org.codehaus.jackson.map.PropertyNamingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,6 +177,16 @@ public static void validateAndResolveApplication(Application application, if (comp.getLaunchCommand() == null) { comp.setLaunchCommand(globalLaunchCommand); } + // validate dependencies + if (comp.getDependencies() != null) { + for (String dependency : comp.getDependencies()) { + if (!componentNames.contains(dependency)) { + throw new IllegalArgumentException(String.format( + RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, dependency, + comp.getName())); + } + } + } validateComponent(comp, fs.getFileSystem()); } @@ -207,6 +218,8 @@ public static void validateComponent(Component comp, FileSystem fs) } compClientProvider.validateConfigFiles(comp.getConfiguration() .getFiles(), fs); + + MonitorUtils.getProbe(comp.getReadinessCheck()); } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDependencies.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDependencies.java new file mode 100644 index 0000000..a73673d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateDependencies.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.server.appmaster.model.appstate; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.slider.api.types.ApplicationLivenessInformation; +import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest; +import org.apache.slider.server.appmaster.model.mock.MockRoles; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.appmaster.state.ContainerAssignment; +import org.apache.slider.server.appmaster.state.RoleInstance; +import org.apache.slider.server.servicemonitor.ProbeStatus; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class TestMockAppStateDependencies extends BaseMockAppStateTest + implements MockRoles { + + @Override + public String getTestName() { + return "TestMockAppStateDependencies"; + } + + @Test + public void testDependencies() throws Throwable { + + // ask for one instance of role0 + getRole0Status().setDesired(1); + getRole0Status().getProviderRole().probe = new org.apache.slider.server + .servicemonitor.Probe("test", null) { + @Override + public ProbeStatus ping(RoleInstance roleInstance) { + ProbeStatus status = new ProbeStatus(); + status.succeed(this); + return status; + } + }; + getRole1Status().setDesired(1); + getRole1Status().getProviderRole().component.setDependencies(Collections + .singletonList(ROLE0)); + + assertTrue(appState.areDependenciesReady(getRole0Status())); + assertFalse(appState.areDependenciesReady(getRole1Status())); + review(ROLE0, 2); + + assertFalse(appState.areDependenciesReady(getRole1Status())); + appState.monitorComponentInstances(); + assertTrue(appState.areDependenciesReady(getRole1Status())); + getRole0Status().setDesired(2); + assertFalse(appState.areDependenciesReady(getRole1Status())); + review(ROLE0, 2); + + assertFalse(appState.areDependenciesReady(getRole1Status())); + appState.monitorComponentInstances(); + assertTrue(appState.areDependenciesReady(getRole1Status())); + review(ROLE1, 1); + } + + public void review(String expectedRole, int outstanding) throws Exception { + List ops = appState.reviewRequestAndReleaseNodes(); + + // at this point there's now one request in the list + assertEquals(1, ops.size()); + // and in a liveness check, expected outstanding + ApplicationLivenessInformation liveness = + appState.getApplicationLivenessInformation(); + assertEquals(outstanding, liveness.requestsOutstanding); + assertFalse(liveness.allRequestsSatisfied); + + List allocations = engine.execute(ops); + List assignments = new ArrayList<>(); + List releases = new ArrayList<>(); + appState.onContainersAllocated(allocations, assignments, releases); + assertEquals(1, assignments.size()); + ContainerAssignment assigned = assignments.get(0); + Container target = assigned.container; + RoleInstance ri = roleInstance(assigned); + assertEquals(expectedRole, ri.role); + + ops = appState.reviewRequestAndReleaseNodes(); + assertTrue(ops.isEmpty()); + + liveness = appState.getApplicationLivenessInformation(); + assertEquals(outstanding - 1, liveness.requestsOutstanding); + + //now this is the start point. + appState.containerStartSubmitted(target, ri); + + ops = appState.reviewRequestAndReleaseNodes(); + assertTrue(ops.isEmpty()); + + appState.innerOnNodeManagerContainerStarted(target.getId()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateUniqueNames.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateUniqueNames.java index 703d65f..edc1866 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateUniqueNames.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateUniqueNames.java @@ -103,8 +103,6 @@ public static void verifyInstances(List instances, String assertEquals(i, instance.componentId); assertEquals(group, instance.role); assertEquals(group, instance.providerRole.name); - assertEquals(group, instance.providerRole.group); - // TODO remove group from provider role if it continues to be unused i++; } } @@ -124,7 +122,6 @@ public void testDynamicFlexDown() throws Throwable { assertEquals(0, roleStatus.getDesired()); assertEquals(1024L, roleStatus.getResourceRequirements().getMemorySize()); assertEquals(2, roleStatus.getResourceRequirements().getVirtualCores()); - assertEquals("group1", roleStatus.getGroup()); // now flex back up appState.updateComponents(Collections.singletonMap("group1", 3L)); @@ -147,7 +144,6 @@ public void testDynamicFlexUp() throws Throwable { RoleStatus group1 = appState.lookupRoleStatus("group1"); assertEquals(3, group1.getDesired()); assertEquals(1024L, group1.getResourceRequirements().getMemorySize()); - assertEquals("group1", group1.getGroup()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.java index 7d8f5a7..555db75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.java @@ -343,7 +343,7 @@ public void testAARequestPair() throws Throwable { public void testBuildResourceRequirements() throws Throwable { // Store original values Application application = appState.getClusterStatus(); - Component role0 = application.getComponent(getRole0Status().getGroup()); + Component role0 = application.getComponent(getRole0Status().getName()); String origMem = role0.getResource().getMemory(); Integer origVcores = role0.getResource().getCpus(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/servicemonitor/TestPortProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/servicemonitor/TestPortProbe.java index 458d1bc..dacfb0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/servicemonitor/TestPortProbe.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/test/java/org/apache/slider/server/servicemonitor/TestPortProbe.java @@ -17,20 +17,25 @@ package org.apache.slider.server.servicemonitor; -import org.apache.hadoop.conf.Configuration; +import org.apache.slider.server.appmaster.model.mock.MockFactory; +import org.apache.slider.server.appmaster.state.RoleInstance; import org.junit.Assert; import org.junit.Test; public class TestPortProbe extends Assert { + private final MockFactory factory = MockFactory.INSTANCE; + /** * Assert that a port probe failed if the port is closed * @throws Throwable */ @Test public void testPortProbeFailsClosedPort() throws Throwable { - PortProbe probe = new PortProbe("127.0.0.1", 65500, 100, "", new Configuration()); + PortProbe probe = new PortProbe(65500, 100); probe.init(); - ProbeStatus status = probe.ping(true); + RoleInstance roleInstance = new RoleInstance(factory.newContainer()); + roleInstance.ip = "127.0.0.1"; + ProbeStatus status = probe.ping(roleInstance); assertFalse("Expected a failure but got successful result: " + status, status.isSuccess()); }