diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 8d014101491..fce6687c790 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink; @@ -125,6 +126,9 @@ private final Map liveInstances = new ConcurrentHashMap<>(); + private final Map checkingInstances = + new ConcurrentHashMap<>(); + private ServiceMetrics serviceMetrics; private ServiceTimelinePublisher serviceTimelinePublisher; @@ -372,6 +376,13 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { } } for (Container container : containersFromPrevAttempt) { + if (MonitorUtils.isReadinessCheckContainer(container)) { + LOG.info( + "Releasing readiness check container from previous attempt {}", + container.getId()); + amRMClient.releaseAssignedContainer(container.getId()); + continue; + } LOG.info("Handling {} from previous attempt", container.getId()); ServiceRecord record = existingRecords.remove(RegistryPathUtils .encodeYarnID(container.getId().toString())); @@ -604,7 +615,8 @@ public void handle(ComponentInstanceEvent event) { public void onContainersAllocated(List containers) { LOG.info(containers.size() + " containers allocated. "); for (Container container : containers) { - Component comp = componentsById.get(container.getAllocationRequestId()); + Component comp = componentsById.get(MonitorUtils.getBaseAllocationId( + container.getAllocationRequestId())); ComponentEvent event = new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED) .setContainer(container); @@ -612,9 +624,11 @@ public void onContainersAllocated(List containers) { try { Collection requests = amRMClient .getMatchingRequests(container.getAllocationRequestId()); - LOG.info("[COMPONENT {}]: remove {} outstanding container requests " + - "for allocateId " + container.getAllocationRequestId(), - comp.getName(), requests.size()); + LOG.info("[COMPONENT {}]: remove 1 of {} outstanding container " + + "requests for allocateId {} {} container", comp.getName(), + requests.size(), MonitorUtils.getBaseAllocationId(container + .getAllocationRequestId()), MonitorUtils + .getContainerType(container)); // remove the corresponding request if (requests.iterator().hasNext()) { AMRMClient.ContainerRequest request = requests.iterator().next(); @@ -636,6 +650,13 @@ public void onContainersReceivedFromPreviousAttempts( return; } for (Container container : containers) { + if (MonitorUtils.isReadinessCheckContainer(container)) { + LOG.info( + "Releasing readiness check container from previous attempt {}", + container.getId()); + amRMClient.releaseAssignedContainer(container.getId()); + continue; + } ComponentInstance compInstance; synchronized (unRecoveredInstances) { compInstance = unRecoveredInstances.remove(container.getId()); @@ -663,10 +684,13 @@ public void onContainersCompleted(List statuses) { ContainerId containerId = status.getContainerId(); ComponentInstance instance = liveInstances.get(status.getContainerId()); if (instance == null) { - LOG.warn( - "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ", - containerId, status.getExitStatus(), status.getDiagnostics()); - return; + instance = checkingInstances.get(status.getContainerId()); + if (instance == null) { + LOG.warn( + "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ", + containerId, status.getExitStatus(), status.getDiagnostics()); + return; + } } ComponentEvent event = new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED) @@ -726,7 +750,9 @@ public void onRequestsRejected( Map allServiceResponse) { ComponentInstance instance = liveInstances.get(containerId); if (instance == null) { - LOG.error("No component instance exists for " + containerId); + if (checkingInstances.get(containerId) == null) { + LOG.error("No component instance exists for " + containerId); + } return; } ComponentEvent event = @@ -748,7 +774,9 @@ public void onRequestsRejected( public void onStartContainerError(ContainerId containerId, Throwable t) { ComponentInstance instance = liveInstances.get(containerId); if (instance == null) { - LOG.error("No component instance exists for " + containerId); + if (checkingInstances.get(containerId) == null) { + LOG.error("No component instance exists for " + containerId); + } return; } LOG.error("Failed to start " + containerId, t); @@ -811,6 +839,15 @@ public void removeLiveCompInstance(ContainerId containerId) { liveInstances.remove(containerId); } + public void addCheckingInstance(ContainerId containerId, + ComponentInstance instance) { + checkingInstances.put(containerId, instance); + } + + public void removeCheckingInstance(ContainerId containerId) { + checkingInstances.remove(containerId); + } + public YarnRegistryViewForProviders getYarnRegistryOperations() { return yarnRegistryOperations; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java index 0665cb53fad..5dee3007b44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java @@ -61,6 +61,7 @@ @XmlEnum public enum TypeEnum { DEFAULT("DEFAULT"), + CONTAINER("CONTAINER"), HTTP("HTTP"), PORT("PORT"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 3a08eaa6fbd..d245093adec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.component; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.apache.hadoop.yarn.service.monitor.probe.MonitorKeys; import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; import org.apache.hadoop.yarn.service.monitor.probe.Probe; import org.apache.hadoop.yarn.service.provider.ProviderUtils; @@ -59,6 +61,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; @@ -100,8 +103,11 @@ // component instances to be assigned with a container private List pendingInstances = Collections.synchronizedList(new LinkedList<>()); + private List checkingInstances = + Collections.synchronizedList(new LinkedList<>()); private ContainerFailureTracker failureTracker; private Probe probe; + public boolean hasContainerReadinessCheck = false; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; public int maxContainerFailurePerComp; @@ -185,6 +191,9 @@ public Component( componentSpec.getConfiguration(), scheduler.getConfig())) { probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck()); } + if (MonitorUtils.isContainerType(componentSpec.getReadinessCheck())) { + hasContainerReadinessCheck = true; + } maxContainerFailurePerComp = YarnServiceConf.getInt( CONTAINER_FAILURE_THRESHOLD, DEFAULT_CONTAINER_FAILURE_THRESHOLD, componentSpec.getConfiguration(), scheduler.getConfig()); @@ -272,7 +281,13 @@ public ComponentState transition(Component component, private static class ContainerAllocatedTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { - component.assignContainerToCompInstance(event.getContainer()); + if (MonitorUtils.isReadinessCheckContainer(event.getContainer())) { + component.assignContainerToCompInstance(event.getContainer(), + component.checkingInstances); + } else { + component.assignContainerToCompInstance(event.getContainer(), + component.pendingInstances); + } } } @@ -371,13 +386,42 @@ public static synchronized void checkAndUpdateComponentState( private static class ContainerCompletedTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { - component.updateMetrics(event.getStatus()); - component.dispatcher.getEventHandler().handle( - new ComponentInstanceEvent(event.getStatus().getContainerId(), - STOP).setStatus(event.getStatus())); - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); - component.getScheduler().getApp().setState(ServiceState.STARTED); + ComponentInstance instance = event.getInstance(); + ContainerStatus status = event.getStatus(); + ContainerId containerId = status.getContainerId(); + if (instance.getReadinessCheckContainer().getId().equals(containerId)) { + LOG.info("[COMPONENT {}] Readiness check container {} completed with " + + "status {}", component.getName(), containerId, status + .getExitStatus()); + instance.setReadinessCheckContainer(null); + component.getScheduler().removeCheckingInstance(containerId); + if (status.getExitStatus() == 0) { + instance.handle( + new ComponentInstanceEvent(containerId, BECOME_READY)); + } else { + component.maybeRequestReadinessCheckContainer(instance); + } + return; + } else if (instance.getContainer().getId().equals(containerId)) { + component.updateMetrics(status); + component.dispatcher.getEventHandler().handle( + new ComponentInstanceEvent(containerId, STOP).setStatus(event + .getStatus())); + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + component.getScheduler().getApp().setState(ServiceState.STARTED); + } else { + if (component.hasContainerReadinessCheck) { + LOG.error("Got completed container event for {} whose container ID " + + "does not match the instance container ID {} or readiness check" + + " container ID", instance.getContainer().getId(), instance + .getReadinessCheckContainer().getId()); + } else { + LOG.error("Got completed container event for {} whose container ID " + + "does not match the instance container ID {}", instance + .getContainer().getId()); + } + } } } @@ -403,23 +447,30 @@ private void releaseContainer(Container container) { scheduler.getServiceMetrics().surplusContainers.incr(); } - private void assignContainerToCompInstance(Container container) { - if (pendingInstances.size() == 0) { - LOG.info( - "[COMPONENT {}]: No pending component instance left, release surplus container {}", - getName(), container.getId()); + private void assignContainerToCompInstance(Container container, + List updateList) { + String type = MonitorUtils.getContainerType(container); + if (updateList.size() == 0) { + LOG.info("[COMPONENT {}]: No component instance waiting for {} " + + "container, release surplus container {}", getName(), type, + container.getId()); releaseContainer(container); return; } - ComponentInstance instance = pendingInstances.remove(0); - LOG.info( - "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}", - getName(), container.getId(), pendingInstances.size()); - instance.setContainer(container); - scheduler.addLiveCompInstance(container.getId(), instance); - LOG.info( - "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", - getName(), container.getId(), instance.getCompInstanceName(), + ComponentInstance instance = updateList.remove(0); + LOG.info("[COMPONENT {}]: {} allocated, num component instances waiting " + + "for {} container reduced to {}", getName(), container.getId(), type, + updateList.size()); + if (MonitorUtils.isReadinessCheckContainer(container)) { + instance.setReadinessCheckContainer(container); + scheduler.addCheckingInstance(container.getId(), instance); + } else { + instance.setContainer(container); + scheduler.addLiveCompInstance(container.getId(), instance); + } + LOG.info("[COMPONENT {}]: Assigned {} {} container to {} component " + + "instance {} and launch on host {} ", + getName(), container.getId(), type, instance.getCompInstanceName(), container.getNodeId()); scheduler.getContainerLaunchService() .launchCompInstance(scheduler.getApp(), instance, container); @@ -556,6 +607,41 @@ public void requestContainers(long count) { } } + public void maybeRequestReadinessCheckContainer(ComponentInstance instance) { + if (!hasContainerReadinessCheck) { + return; + } + LOG.info("[COMPONENT {}] Requesting readiness check container", + componentSpec.getName()); + org.apache.hadoop.yarn.service.api.records.Resource componentResource = + componentSpec.getResource(); + + long memory = componentResource.calcMemoryMB(); + try { + memory = Probe.getPropertyLong(componentSpec.getReadinessCheck() + .getProperties(), MonitorKeys.CONTAINER_MEMORY, memory); + } catch (IOException e) { + // ignore, should never happen + } + int cpus = componentResource.getCpus(); + try { + cpus = Probe.getPropertyInt(componentSpec.getReadinessCheck() + .getProperties(), MonitorKeys.CONTAINER_CPUS, cpus); + } catch (IOException e) { + // ignore, should never happen + } + Resource resource = Resource.newInstance(memory, cpus); + + ContainerRequest request = ContainerRequest.newBuilder() + .capability(resource).priority(priority) + .allocationRequestId(MonitorUtils.getReadinessCheckAllocationId(allocateId)) + .relaxLocality(true).build(); + LOG.info("[COMPONENT {}] Submitting readiness check container request : {}", + componentSpec.getName(), request); + amrmClient.addContainerRequest(request); + checkingInstances.add(instance); + } + private void setDesiredContainers(int n) { int delta = n - scheduler.getServiceMetrics().containersDesired.value(); if (delta > 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index c57d8882720..cb7b39f1d94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -81,6 +81,7 @@ private ComponentInstanceId compInstanceId = null; private Path compInstanceDir; private Container container; + private Container readinessCheckContainer; private YarnRegistryViewForProviders yarnRegistryOperations; private FileSystem fs; private boolean timelineServiceEnabled = false; @@ -185,6 +186,7 @@ public ComponentInstance(Component component, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { + compInstance.readinessCheckContainer = null; compInstance.containerSpec.setState(ContainerState.READY); compInstance.component.incContainersReady(); if (compInstance.timelineServiceEnabled) { @@ -322,6 +324,10 @@ public void setContainer(Container container) { this.compInstanceId.setContainerId(container.getId()); } + public void setReadinessCheckContainer(Container container) { + this.readinessCheckContainer = container; + } + public String getCompInstanceName() { return compInstanceId.getCompInstanceName(); } @@ -360,6 +366,10 @@ public Container getContainer() { return container; } + public Container getReadinessCheckContainer() { + return readinessCheckContainer; + } + public ComponentInstanceId getCompInstanceId() { return compInstanceId; } @@ -513,6 +523,7 @@ public void cleanupRegistryAndCompHdfsDir(ContainerId containerId) { instance.compInstanceId + " IP = " + status.getIPs() + ", host = " + status.getHost() + ", cancel container status retriever"); instance.containerStatusFuture.cancel(false); + instance.component.maybeRequestReadinessCheckContainer(instance); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java index 033569cc17f..0e29fb002e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java @@ -106,6 +106,9 @@ public void run() { for (Map.Entry entry : liveInstances .entrySet()) { ComponentInstance instance = entry.getValue(); + if (instance.getComponent().hasContainerReadinessCheck) { + continue; + } ProbeStatus status = instance.ping(); if (status.isSuccess()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorKeys.java index 97770d4d2b1..e4c140c4361 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorKeys.java @@ -75,4 +75,16 @@ * Port probing default : timeout for the connection attempt {@value}. */ int WEB_PROBE_CONNECT_TIMEOUT_DEFAULT = 1000; + /** + * Container command {@value}. + */ + String CONTAINER_COMMAND = "command"; + /** + * Container memory {@value}. + */ + String CONTAINER_MEMORY = "memory"; + /** + * Container cpus {@value}. + */ + String CONTAINER_CPUS = "cpus"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorUtils.java index 0b57e6c6bf5..6539f50ff07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/MonitorUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.service.monitor.probe; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ReadinessCheck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +32,9 @@ protected static final Logger LOG = LoggerFactory.getLogger(MonitorUtils .class); + private static final long READINESS_CHECK_CONTAINER = 1 << 62; + private static final long COMPONENT_ALLOCATION_ID_MASK = (1 << 62) - 1; + private MonitorUtils() { } @@ -69,6 +73,8 @@ public static Probe getProbe(ReadinessCheck readinessCheck) { return DefaultProbe.create(readinessCheck.getProperties()); } switch (readinessCheck.getType()) { + case CONTAINER: + return null; case HTTP: return HttpProbe.create(readinessCheck.getProperties()); case PORT: @@ -81,4 +87,38 @@ public static Probe getProbe(ReadinessCheck readinessCheck) { t); } } + + public static boolean isContainerType(ReadinessCheck readinessCheck) { + if (readinessCheck == null || readinessCheck.getType() == null) { + return false; + } + return readinessCheck.getType() == ReadinessCheck.TypeEnum.CONTAINER; + } + + public static boolean isReadinessCheckContainer(Container container) { + if (isReadinessCheckContainer(container.getAllocationRequestId())) { + return true; + } + return false; + } + + public static boolean isReadinessCheckContainer(long allocationId) { + return (allocationId & READINESS_CHECK_CONTAINER) != 0; + } + + public static long getBaseAllocationId(long allocationId) { + return allocationId & COMPONENT_ALLOCATION_ID_MASK; + } + + public static long getReadinessCheckAllocationId(long allocationId) { + return allocationId | READINESS_CHECK_CONTAINER; + } + + public static String getContainerType(Container container) { + if (isReadinessCheckContainer(container)) { + return "readiness check"; + } else { + return "main"; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/Probe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/Probe.java index 341a0c8f46d..decac1e0c86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/Probe.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/probe/Probe.java @@ -69,6 +69,18 @@ public static String getProperty(Map props, String name, return value; } + public static long getPropertyLong(Map props, String name, + Long defaultValue) throws IOException { + String value = props.get(name); + if (StringUtils.isEmpty(value)) { + if (defaultValue == null) { + throw new IOException(name + " not specified"); + } + return defaultValue; + } + return Long.parseLong(value); + } + public static int getPropertyInt(Map props, String name, Integer defaultValue) throws IOException { String value = props.get(name); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java index ee276866afb..32c7230e0c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java @@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.apache.hadoop.yarn.service.monitor.probe.MonitorKeys; +import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.service.exceptions.SliderException; @@ -58,6 +60,15 @@ public abstract void processArtifact(AbstractLauncher launcher, Service service) throws IOException; + public String getLaunchCommand(Component component, Container container) { + if (MonitorUtils.isReadinessCheckContainer(container)) { + return component.getReadinessCheck().getProperties() + .get(MonitorKeys.CONTAINER_COMMAND); + } else { + return component.getLaunchCommand(); + } + } + public void buildContainerLaunchContext(AbstractLauncher launcher, Service service, ComponentInstance instance, SliderFileSystem fileSystem, Configuration yarnConf, Container container) @@ -97,7 +108,7 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, component, tokensForSubstitution, instance, context); // substitute launch command - String launchCommand = component.getLaunchCommand(); + String launchCommand = getLaunchCommand(component, container); // docker container may have empty commands if (!StringUtils.isEmpty(launchCommand)) { launchCommand = ProviderUtils diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 194ae83d680..14c2fd3874d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -36,7 +36,9 @@ import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.conf.RestApiConstants; import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages; +import org.apache.hadoop.yarn.service.monitor.probe.MonitorKeys; import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; +import org.apache.hadoop.yarn.service.monitor.probe.Probe; import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; import org.apache.hadoop.yarn.service.provider.ProviderFactory; import org.codehaus.jackson.map.PropertyNamingStrategy; @@ -262,6 +264,18 @@ private static void validateComponent(Component comp, FileSystem fs, .getFiles(), fs); MonitorUtils.getProbe(comp.getReadinessCheck()); + if (MonitorUtils.isContainerType(comp.getReadinessCheck())) { + if (StringUtils.isEmpty(comp.getReadinessCheck().getProperties() + .get(MonitorKeys.CONTAINER_COMMAND))) { + throw new IllegalArgumentException("Property '" + MonitorKeys + .CONTAINER_COMMAND + "' must be specified for CONTAINER readiness" + + " check"); + } + Probe.getPropertyLong(comp.getReadinessCheck().getProperties(), + MonitorKeys.CONTAINER_MEMORY, comp.getResource().calcMemoryMB()); + Probe.getPropertyInt(comp.getReadinessCheck().getProperties(), + MonitorKeys.CONTAINER_CPUS, comp.getResource().getCpus()); + } } // Check component or service name format and transform to lower case. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 599b8a7a0ae..b780539e901 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; import org.apache.hadoop.conf.Configuration; @@ -29,13 +31,17 @@ import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.registry.client.impl.zk.CuratorService; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.exceptions.SliderException; @@ -58,6 +64,7 @@ import java.net.URL; import java.nio.file.Paths; import java.util.Map; +import java.util.concurrent.TimeoutException; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC; @@ -206,7 +213,8 @@ protected void setupInternal(int numNodeManager) if (yarnCluster == null) { yarnCluster = - new MiniYARNCluster(TestYarnNativeServices.class.getSimpleName(), 1, + new MiniYARNCluster(this.getClass().getSimpleName() + "-" + Thread + .currentThread().getStackTrace()[2].getMethodName(), 1, numNodeManager, 1, 1); yarnCluster.init(conf); yarnCluster.start(); @@ -374,4 +382,139 @@ public SliderFileSystem getFs() { return serviceBasePath; } } + + /** + * Wait until all the containers for all components become ready state. + * + * @param client + * @param exampleApp + * @return all ready containers of a service. + * @throws TimeoutException + * @throws InterruptedException + */ + protected Multimap waitForAllCompToBeReady(ServiceClient + client, Service exampleApp, int checkEveryMillis, int waitForMillis) + throws TimeoutException, InterruptedException { + return waitForExpectedNumberCompToBeReady(client, exampleApp, + countTotalContainers(exampleApp), checkEveryMillis, waitForMillis); + } + + protected Multimap waitForExpectedNumberCompToBeReady( + ServiceClient client, Service exampleApp, int expectedTotalContainers, + int checkEveryMillis, int waitForMillis) throws TimeoutException, + InterruptedException { + + Multimap allContainers = HashMultimap.create(); + + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + int totalReadyContainers = 0; + allContainers.clear(); + LOG.info("Num Components " + retrievedApp.getComponents().size()); + for (Component component : retrievedApp.getComponents()) { + LOG.info("looking for " + component.getName()); + LOG.info(component.toString()); + if (component.getContainers() != null) { + if (component.getContainers().size() == exampleApp + .getComponent(component.getName()).getNumberOfContainers()) { + for (Container container : component.getContainers()) { + LOG.info( + "Container state " + container.getState() + ", component " + + component.getName()); + if (container.getState() == ContainerState.READY) { + totalReadyContainers++; + allContainers.put(component.getName(), container.getId()); + LOG.info("Found 1 ready container " + container.getId()); + } + } + } else { + LOG.info(component.getName() + " Expected number of containers " + + exampleApp.getComponent(component.getName()) + .getNumberOfContainers() + ", current = " + component + .getContainers()); + } + } + } + LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers + + " expected = " + expectedTotalContainers); + return totalReadyContainers == expectedTotalContainers; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, checkEveryMillis, waitForMillis); + return allContainers; + } + + /** + * Wait until service state becomes stable. A service is stable when all + * requested containers of all components are running and in ready state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeStable(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + waitForServiceToBeStable(client, exampleApp, 200000); + } + + protected void waitForServiceToBeStable(ServiceClient client, + Service exampleApp, int waitForMillis) + throws TimeoutException, InterruptedException { + waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE, + waitForMillis); + } + + /** + * Wait until service is started. It does not have to reach a stable state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeStarted(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED); + } + + protected void waitForServiceToBeInState(ServiceClient client, + Service exampleApp, ServiceState desiredState) throws TimeoutException, + InterruptedException { + waitForServiceToBeInState(client, exampleApp, desiredState, 200000); + } + + /** + * Wait until service is started. It does not have to reach a stable state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeInState(ServiceClient client, + Service exampleApp, ServiceState desiredState, int waitForMillis) throws + TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + System.out.println(retrievedApp); + return retrievedApp.getState() == desiredState; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, 2000, waitForMillis); + } + + private int countTotalContainers(Service service) { + int totalContainers = 0; + for (Component component : service.getComponents()) { + totalContainers += component.getNumberOfContainers(); + } + return totalContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestContainerReadinessCheck.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestContainerReadinessCheck.java new file mode 100644 index 00000000000..5a838840cf4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestContainerReadinessCheck.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ReadinessCheck; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.monitor.probe.MonitorKeys; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * A test for readiness checks of type CONTAINER. + */ +public class TestContainerReadinessCheck extends ServiceTestUtils { + + private static final Logger LOG = + LoggerFactory.getLogger(TestContainerReadinessCheck.class); + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public void setup() throws Exception { + File tmpYarnDir = new File("target", "tmp"); + FileUtils.deleteQuietly(tmpYarnDir); + } + + @After + public void tearDown() throws IOException { + shutdown(); + } + + @Test + public void testReadinessCheck() throws Exception { + setupInternal(NUM_NMS); + ServiceClient client = createClient(getConf()); + Service exampleApp = new Service(); + exampleApp.setName("example-app"); + exampleApp.setVersion("v1"); + Component compa = createComponent("compa"); + ReadinessCheck check = new ReadinessCheck().type(ReadinessCheck.TypeEnum + .CONTAINER).putPropsItem(MonitorKeys.CONTAINER_COMMAND, "sleep 5"); + compa.setReadinessCheck(check); + exampleApp.addComponent(compa); + client.actionCreate(exampleApp); + waitForAllCompToBeReady(client, exampleApp, 2000, 200000); + } + + @Test + public void testBadReadinessCheck() throws Exception { + setupInternal(NUM_NMS); + ServiceClient client = createClient(getConf()); + Service exampleApp = new Service(); + exampleApp.setName("example-app"); + exampleApp.setVersion("v1"); + Component compa = createComponent("compa"); + + // create readiness check that will succeed for compa-0 and fail for compa-1 + ReadinessCheck check = new ReadinessCheck().type(ReadinessCheck.TypeEnum + .CONTAINER).putPropsItem(MonitorKeys.CONTAINER_COMMAND, + "exit ${COMPONENT_ID}"); + compa.setReadinessCheck(check); + exampleApp.addComponent(compa); + client.actionCreate(exampleApp); + + // wait until compa has 1 ready container + waitForExpectedNumberCompToBeReady(client, exampleApp, 1, 2000, 200000); + + // wait for 60 seconds and make sure compa never gets 2 ready containers + try { + waitForAllCompToBeReady(client, exampleApp, 2000, 60000); + Assert.fail("all components became ready with bad readiness check " + + "configured"); + } catch (TimeoutException e) { + // this is expected + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 443ba0b4035..36e560e9b0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.service; -import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; @@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Container; -import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; import org.apache.hadoop.yarn.service.api.records.PlacementPolicy; import org.apache.hadoop.yarn.service.api.records.PlacementScope; @@ -332,7 +330,7 @@ public void testRecoverComponentsAfterRMRestart() throws Exception { Service exampleApp = createExampleApplication(); client.actionCreate(exampleApp); Multimap containersBeforeFailure = - waitForAllCompToBeReady(client, exampleApp); + waitForAllCompToBeReady(client, exampleApp, 2000, 200000); LOG.info("Restart the resource manager"); getYarnCluster().restartResourceManager( @@ -362,7 +360,7 @@ public void testRecoverComponentsAfterRMRestart() throws Exception { }, 2000, 200000); Multimap containersAfterFailure = waitForAllCompToBeReady( - client, exampleApp); + client, exampleApp, 2000, 200000); Assert.assertEquals("component container affected by restart", containersBeforeFailure, containersAfterFailure); @@ -641,131 +639,4 @@ private void checkEachCompInstancesInOrder(Component component, String i++; } } - - /** - * Wait until all the containers for all components become ready state. - * - * @param client - * @param exampleApp - * @return all ready containers of a service. - * @throws TimeoutException - * @throws InterruptedException - */ - private Multimap waitForAllCompToBeReady(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - int expectedTotalContainers = countTotalContainers(exampleApp); - - Multimap allContainers = HashMultimap.create(); - - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - int totalReadyContainers = 0; - allContainers.clear(); - LOG.info("Num Components " + retrievedApp.getComponents().size()); - for (Component component : retrievedApp.getComponents()) { - LOG.info("looking for " + component.getName()); - LOG.info(component.toString()); - if (component.getContainers() != null) { - if (component.getContainers().size() == exampleApp - .getComponent(component.getName()).getNumberOfContainers()) { - for (Container container : component.getContainers()) { - LOG.info( - "Container state " + container.getState() + ", component " - + component.getName()); - if (container.getState() == ContainerState.READY) { - totalReadyContainers++; - allContainers.put(component.getName(), container.getId()); - LOG.info("Found 1 ready container " + container.getId()); - } - } - } else { - LOG.info(component.getName() + " Expected number of containers " - + exampleApp.getComponent(component.getName()) - .getNumberOfContainers() + ", current = " + component - .getContainers()); - } - } - } - LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers - + " expected = " + expectedTotalContainers); - return totalReadyContainers == expectedTotalContainers; - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, 200000); - return allContainers; - } - - /** - * Wait until service state becomes stable. A service is stable when all - * requested containers of all components are running and in ready state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeStable(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - waitForServiceToBeStable(client, exampleApp, 200000); - } - - private void waitForServiceToBeStable(ServiceClient client, - Service exampleApp, int waitForMillis) - throws TimeoutException, InterruptedException { - waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE, - waitForMillis); - } - - /** - * Wait until service is started. It does not have to reach a stable state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeStarted(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED); - } - - private void waitForServiceToBeInState(ServiceClient client, - Service exampleApp, ServiceState desiredState) throws TimeoutException, - InterruptedException { - waitForServiceToBeInState(client, exampleApp, desiredState, 200000); - } - - /** - * Wait until service is started. It does not have to reach a stable state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeInState(ServiceClient client, - Service exampleApp, ServiceState desiredState, int waitForMillis) throws - TimeoutException, InterruptedException { - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - System.out.println(retrievedApp); - return retrievedApp.getState() == desiredState; - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, waitForMillis); - } - - private int countTotalContainers(Service service) { - int totalContainers = 0; - for (Component component : service.getComponents()) { - totalContainers += component.getNumberOfContainers(); - } - return totalContainers; - } }