diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index ec5f3ed..bae46e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.types.ServiceRecord; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; @@ -71,6 +73,7 @@ import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; +import org.apache.hadoop.yarn.service.utils.SliderUtils; import org.apache.hadoop.yarn.util.BoundedAppender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,11 +240,6 @@ public void serviceStop() throws Exception { serviceTimelinePublisher .serviceAttemptUnregistered(context, diagnostics.toString()); } - // Cleanup each component instance. no need to release containers as - // they will be automatically released by RM - for (ComponentInstance instance : liveInstances.values()) { - instance.cleanupRegistryAndCompHdfsDir(); - } String msg = diagnostics.toString() + "Navigate to the failed component for more details."; amRMClient @@ -266,11 +264,59 @@ public void serviceStart() throws Exception { } registerServiceInstance(context.attemptId, app); - //TODO handle containers recover - } - - private void recover() { + List recoveredContainers = response + .getContainersFromPreviousAttempts(); + Map existingRecords = new HashMap<>(); + List existingComps = null; + try { + existingComps = yarnRegistryOperations.listComponents(); + LOG.info("Registry component paths {}", existingComps); + } catch (Exception e) { + LOG.info("Could not read component paths: {}", e.getMessage()); + } + if (existingComps != null) { + for (String existingComp : existingComps) { + try { + ServiceRecord record = + yarnRegistryOperations.getComponent(existingComp); + existingRecords.put(existingComp, record); + } catch (Exception e) { + LOG.warn("Could not resolve record for component {}: {}", + existingComp, e); + } + } + } + for (Container container : recoveredContainers) { + LOG.info("Handling container {} from previous attempt", + container.getId()); + ServiceRecord record = existingRecords.get(RegistryPathUtils + .encodeYarnID(container.getId().toString())); + if (record != null) { + Component comp = componentsById.get(container.getAllocationRequestId()); + ComponentEvent event = + new ComponentEvent(comp.getName(), CONTAINER_RECOVERED) + .setContainer(container) + .setInstance(comp.getComponentInstance(record.description)); + comp.handle(event); + // do not remove requests in this case because we do not know if they + // have already been removed + } else { + LOG.info("Record not found in registry for container {} from previous" + + " attempt, releasing", container.getId()); + amRMClient.releaseAssignedContainer(container.getId()); + } + } + for (Component component : componentsById.values()) { + // Trigger initial evaluation of components + if (component.areDependenciesReady()) { + LOG.info("Triggering initial evaluation of component {}", + component.getName()); + ComponentEvent event = new ComponentEvent(component.getName(), FLEX) + .setDesired(component.getComponentSpec().getNumberOfContainers()); + component.handle(event); + } + } } private void initGlobalTokensForSubstitute(ServiceContext context) { @@ -353,7 +399,7 @@ private void registerServiceInstance(ApplicationAttemptId attemptId, executorService.submit(new Runnable() { @Override public void run() { try { - yarnRegistryOperations.registerSelf(serviceRecord, true); + yarnRegistryOperations.registerSelf(serviceRecord, false); LOG.info("Registered service under {}; absolute path {}", yarnRegistryOperations.getSelfRegistrationPath(), yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); @@ -398,13 +444,6 @@ private void createAllComponents() { componentsById.put(allocateId, component); componentsByName.put(component.getName(), component); allocateId++; - - // Trigger the component without dependencies - if (component.areDependenciesReady()) { - ComponentEvent event = new ComponentEvent(compSpec.getName(), FLEX) - .setDesired(compSpec.getNumberOfContainers()); - component.handle(event); - } } } @@ -458,17 +497,17 @@ public void onContainersAllocated(List containers) { new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED) .setContainer(container); dispatcher.getEventHandler().handle(event); + Collection requests = amRMClient + .getMatchingRequests(container.getAllocationRequestId()); LOG.info("[COMPONENT {}]: {} outstanding container requests.", - comp.getName(), - amRMClient.getMatchingRequests(container.getAllocationRequestId()).size()); + comp.getName(), requests.size()); // remove the corresponding request - Collection collection = amRMClient - .getMatchingRequests(container.getAllocationRequestId()); - if (collection.iterator().hasNext()) { - AMRMClient.ContainerRequest request = collection.iterator().next(); + if (requests.iterator().hasNext()) { + LOG.info("[COMPONENT {}]: removing one container request.", comp + .getName()); + AMRMClient.ContainerRequest request = requests.iterator().next(); amRMClient.removeContainerRequest(request); } - } } @@ -478,7 +517,7 @@ public void onContainersCompleted(List statuses) { ContainerId containerId = status.getContainerId(); ComponentInstance instance = liveInstances.get(status.getContainerId()); if (instance == null) { - LOG.error( + LOG.warn( "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ", containerId, status.getExitStatus(), status.getDiagnostics()); return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index cbaf472..72d391a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -78,7 +78,7 @@ private ServiceContext context; private AMRMClientAsync amrmClient; private AtomicLong instanceIdCounter = new AtomicLong(); - private Map compInstances = + private Map compInstances = new ConcurrentHashMap<>(); // component instances to be assigned with a container private List pendingInstances = new LinkedList<>(); @@ -101,6 +101,9 @@ // INIT will only got to FLEXING .addTransition(INIT, EnumSet.of(STABLE, FLEXING), FLEX, new FlexComponentTransition()) + // container recovered on AM restart + .addTransition(INIT, INIT, CONTAINER_RECOVERED, + new ContainerRecoveredTransition()) // container allocated by RM .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED, @@ -165,7 +168,7 @@ private void createOneCompInstance() { new ComponentInstanceId(instanceIdCounter.getAndIncrement(), componentSpec.getName()); ComponentInstance instance = new ComponentInstance(this, id); - compInstances.put(id, instance); + compInstances.put(instance.getCompInstanceName(), instance); pendingInstances.add(instance); } @@ -186,7 +189,7 @@ public ComponentState transition(Component component, // This happens on init LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event .getDesired() + " instances."); - component.requestContainers(event.getDesired()); + component.requestContainers(component.pendingInstances.size()); return FLEXING; } long before = component.getComponentSpec().getNumberOfContainers(); @@ -205,14 +208,14 @@ public ComponentState transition(Component component, LOG.info("[FLEX DOWN COMPONENT " + component.getName() + "]: scaling down from " + before + " to " + event.getDesired()); List list = - new ArrayList<>(component.compInstances.values()); + new ArrayList<>(component.getAllComponentInstances()); // sort in Most recent -> oldest order, destroy most recent ones. Collections.sort(list, Collections.reverseOrder()); for (int i = 0; i < delta; i++) { ComponentInstance instance = list.get(i); // remove the instance - component.compInstances.remove(instance.getCompInstanceId()); + component.compInstances.remove(instance.getCompInstanceName()); component.pendingInstances.remove(instance); component.componentMetrics.containersFailed.incr(); component.componentMetrics.containersRunning.decr(); @@ -236,6 +239,45 @@ public void transition(Component component, ComponentEvent event) { } } + private static class ContainerRecoveredTransition extends BaseTransition { + @Override + public void transition(Component component, ComponentEvent event) { + ComponentInstance instance = event.getInstance(); + Container container = event.getContainer(); + if (instance == null) { + LOG.info("[COMPONENT {}]: Trying to recover {} but event did not " + + "specify component instance", + component.getName(), container.getId()); + component.releaseContainer(container); + return; + } + if (instance.hasContainer()) { + LOG.info( + "[COMPONENT {}]: Instance {} already has container, release " + + "surplus container {}", + instance.getCompName(), instance.getCompInstanceId(), container + .getId()); + component.releaseContainer(container); + return; + } + component.pendingInstances.remove(instance); + LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " + + "host {}, num pending component instances reduced to {} ", + component.getName(), container.getId(), instance + .getCompInstanceName(), container.getNodeId(), component + .pendingInstances.size()); + instance.setContainer(container); + component.getScheduler().addLiveCompInstance(container.getId(), instance); + LOG.info("[COMPONENT {}]: Marking {} as started for component " + + "instance {}", component.getName(), event.getContainer().getId(), + instance.getCompInstanceId()); + component.compInstanceDispatcher.getEventHandler().handle( + new ComponentInstanceEvent(instance.getContainerId(), + START)); + component.incRunningContainers(); + } + } + private static class ContainerStartedTransition implements MultipleArcTransition { @@ -280,14 +322,18 @@ public ServiceMetrics getCompMetrics () { return componentMetrics; } + private void releaseContainer(Container container) { + scheduler.getAmRMClient().releaseAssignedContainer(container.getId()); + componentMetrics.surplusContainers.incr(); + scheduler.getServiceMetrics().surplusContainers.incr(); + } + private void assignContainerToCompInstance(Container container) { if (pendingInstances.size() == 0) { LOG.info( "[COMPONENT {}]: No pending component instance left, release surplus container {}", getName(), container.getId()); - scheduler.getAmRMClient().releaseAssignedContainer(container.getId()); - componentMetrics.surplusContainers.incr(); - scheduler.getServiceMetrics().surplusContainers.incr(); + releaseContainer(container); return; } ComponentInstance instance = pendingInstances.remove(0); @@ -397,7 +443,7 @@ public boolean areDependenciesReady() { } for (String dependency : dependencies) { Collection instances = scheduler.getAllComponents() - .get(dependency).getAllComponentInstances().values(); + .get(dependency).getAllComponentInstances(); for (ComponentInstance instance : instances) { if (instance.getContainerStatus() == null) { continue; @@ -447,8 +493,12 @@ public int getNumDesiredInstances() { return componentMetrics.containersDesired.value(); } - public Map getAllComponentInstances() { - return compInstances; + public ComponentInstance getComponentInstance(String componentInstanceName) { + return compInstances.get(componentInstanceName); + } + + public Collection getAllComponentInstances() { + return compInstances.values(); } public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 6729699..067302d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -21,6 +21,7 @@ public enum ComponentEventType { FLEX, CONTAINER_ALLOCATED, + CONTAINER_RECOVERED, CONTAINER_STARTED, CONTAINER_COMPLETED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 3c1e48f..2a642dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.text.MessageFormat; +import java.util.Collections; import java.util.Date; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -243,6 +244,8 @@ public void transition(ComponentInstance compInstance, } ExitUtil.terminate(-1); } + + compInstance.removeContainer(); } } @@ -276,6 +279,15 @@ public void handle(ComponentInstanceEvent event) { } } + public boolean hasContainer() { + return this.container != null; + } + + public void removeContainer() { + this.container = null; + this.compInstanceId.setContainerId(null); + } + public void setContainer(Container container) { this.container = container; this.compInstanceId.setContainerId(container.getId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java index 62d7a6a..2d1a8dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java @@ -48,7 +48,7 @@ private final RegistryOperations registryOperations; private final String user; - private final String sliderServiceClass; + private final String serviceClass; private final String instanceName; /** * Record used where the service registered itself. @@ -64,13 +64,13 @@ public YarnRegistryViewForProviders(RegistryOperations registryOperations, String user, - String sliderServiceClass, + String serviceClass, String instanceName, ApplicationAttemptId applicationAttemptId) { Preconditions.checkArgument(registryOperations != null, "null registry operations"); Preconditions.checkArgument(user != null, "null user"); - Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass), + Preconditions.checkArgument(SliderUtils.isSet(serviceClass), "unset service class"); Preconditions.checkArgument(SliderUtils.isSet(instanceName), "instanceName"); @@ -78,7 +78,7 @@ public YarnRegistryViewForProviders(RegistryOperations registryOperations, "null applicationAttemptId"); this.registryOperations = registryOperations; this.user = user; - this.sliderServiceClass = sliderServiceClass; + this.serviceClass = serviceClass; this.instanceName = instanceName; } @@ -125,7 +125,7 @@ public String getAbsoluteSelfRegistrationPath() { public void putComponent(String componentName, ServiceRecord record) throws IOException { - putComponent(sliderServiceClass, instanceName, + putComponent(serviceClass, instanceName, componentName, record); } @@ -146,7 +146,31 @@ public void putComponent(String serviceClass, registryOperations.mknode(RegistryPathUtils.parentOf(path), true); registryOperations.bind(path, record, BindFlags.OVERWRITE); } - + + /** + * Get a component + * @param componentName component name + * @return the service record + * @throws IOException + */ + public ServiceRecord getComponent(String componentName) throws IOException { + String path = RegistryUtils.componentPath( + user, serviceClass, instanceName, componentName); + LOG.info("Resolving path {}", path); + return registryOperations.resolve(path); + } + + /** + * List components + * @return a list of components + * @throws IOException + */ + public List listComponents() throws IOException { + String path = RegistryUtils.componentListPath( + user, serviceClass, instanceName); + return registryOperations.list(path); + } + /** * Add a service under a path, optionally purging any history * @param username user @@ -183,7 +207,7 @@ public String registerSelf( ServiceRecord record, boolean deleteTreeFirst) throws IOException { selfRegistrationPath = - putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst); + putService(user, serviceClass, instanceName, record, deleteTreeFirst); setSelfRegistration(record); return selfRegistrationPath; } @@ -196,7 +220,7 @@ public String registerSelf( public void deleteComponent(ComponentInstanceId instanceId, String containerId) throws IOException { String path = RegistryUtils.componentPath( - user, sliderServiceClass, instanceName, + user, serviceClass, instanceName, containerId); LOG.info(instanceId + ": Deleting registry path " + path); registryOperations.delete(path, false);