Stop the private ContainerLocalizer only after the container reports that all
of its resources are localized.

ContainerImpl now dispatches a CONTAINER_RESOURCES_LOCALIZED localization
event right before it sends the launch event. ResourceLocalizationService
handles that event by flagging the container's LocalizerRunner, and the next
heartbeat processed by LocalizerRunner.update() answers with
LocalizerAction.DIE. The old shortcut that answered DIE as soon as the pending
list was empty is removed, so an idle localizer keeps receiving LIVE until the
event arrives (see the updated expectations in
TestResourceLocalizationService).

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index c9874a6..7581255d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -59,7 +59,9 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -716,6 +718,10 @@ public ContainerState transition(ContainerImpl container,
         return ContainerState.LOCALIZING;
       }
 
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizationEvent(LocalizationEventType.
+              CONTAINER_RESOURCES_LOCALIZED, container));
+
       container.sendLaunchEvent();
 
       // If this is a recovered container that has already launched, skip
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 4236392..a67fb5c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -35,6 +35,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -108,6 +109,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -389,6 +391,9 @@ public void handle(LocalizationEvent event) {
     case INIT_CONTAINER_RESOURCES:
       handleInitContainerResources((ContainerLocalizationRequestEvent) event);
       break;
+    case CONTAINER_RESOURCES_LOCALIZED:
+      handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
+      break;
     case CACHE_CLEANUP:
       handleCacheCleanup(event);
       break;
@@ -455,7 +460,18 @@ private void handleInitContainerResources(
       }
     }
   }
-  
+
+  /**
+   * Signals the ContainerLocalizer to die once the container has all of its
+   * resources localized and is ready to be launched.
+   */
+  private void handleContainerResourcesLocalized(
+      ContainerLocalizationEvent event) {
+    Container c = event.getContainer();
+    String locId = ConverterUtils.toString(c.getContainerId());
+    localizerTracker.endContainerLocalization(locId);
+  }
+
   private void handleCacheCleanup(LocalizationEvent event) {
     ResourceRetentionSet retain =
       new ResourceRetentionSet(delService, cacheTargetSize);
@@ -724,6 +740,17 @@ public void cleanupPrivLocalizers(String locId) {
         localizer.interrupt();
       }
     }
+
+    public void endContainerLocalization(String locId) {
+      LocalizerRunner localizer;
+      synchronized (privLocalizers) {
+        localizer = privLocalizers.get(locId);
+        if (null == localizer) {
+          return; // ignore: no private localizer is registered under locId
+        }
+      }
+      localizer.endContainerLocalization();
+    }
   }
 
 
@@ -878,6 +905,8 @@ public void run() {
     final Map scheduled;
     // Its a shared list between Private Localizer and dispatcher thread.
     final List pending;
+    private final AtomicBoolean killContainerLocalizer =
+        new AtomicBoolean(false);
 
     // TODO: threadsafe, use outer?
     private final RecordFactory recordFactory =
@@ -898,6 +927,10 @@ public void addResource(LocalizerResourceRequestEvent request) {
       pending.add(request);
     }
 
+    public void endContainerLocalization() {
+      killContainerLocalizer.set(true);
+    }
+
     /**
      * Find next resource to be given to a spawned localizer.
      * 
@@ -1008,8 +1041,8 @@ LocalizerHeartbeatResponse update(
             break;
         }
       }
-      if (action == LocalizerAction.DIE) {
-        response.setLocalizerAction(action);
+      if (action == LocalizerAction.DIE || killContainerLocalizer.get()) {
+        response.setLocalizerAction(LocalizerAction.DIE);
         return response;
       }
 
@@ -1037,9 +1070,6 @@ LocalizerHeartbeatResponse update(
         } catch (URISyntaxException e) {
           //TODO fail? Already translated several times...
         }
-      } else if (pending.isEmpty()) {
-        // TODO: Synchronization
-        action = LocalizerAction.DIE;
       }
 
       response.setLocalizerAction(action);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
index 5134349..4785fba 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
@@ -23,4 +23,5 @@
   CACHE_CLEANUP,
   CLEANUP_CONTAINER_RESOURCES,
   DESTROY_APPLICATION_RESOURCES,
+  CONTAINER_RESOURCES_LOCALIZED,
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index d3c3521..2edaf45 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -125,6 +125,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -975,7 +976,8 @@ public boolean matches(Object o) {
         .thenReturn(Collections.emptyList())
         .thenReturn(Collections.singletonList(rsrc1success))
         .thenReturn(Collections.singletonList(rsrc2pending))
-        .thenReturn(rsrcs4);
+        .thenReturn(rsrcs4)
+        .thenReturn(Collections.emptyList());
 
       String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
           Path.SEPARATOR + "user0" + Path.SEPARATOR +
@@ -1019,7 +1021,13 @@ public boolean matches(Object o) {
       assertTrue(localizedPath.getFile().endsWith(
         localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
 
-      // get shutdown
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+      spyService.handle(new ContainerLocalizationEvent(
+          LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));
+
+      // get shutdown after receiving the CONTAINER_RESOURCES_LOCALIZED event
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
 