diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 5440980..cf9e9db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -763,7 +763,7 @@ public void addResource(LocalizerResourceRequestEvent request) { */ if (rsrc.tryAcquire()) { - if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { + if (rsrc.getState() == ResourceState.DOWNLOADING) { LocalResource resource = request.getResource().getRequest(); try { Path publicRootPath = @@ -895,7 +895,7 @@ private LocalResource findNextResource() { LocalizedResource nRsrc = evt.getResource(); // Resource download should take place ONLY if resource is in // Downloading state - if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { + if (nRsrc.getState() != ResourceState.DOWNLOADING) { i.remove(); continue; } @@ -906,7 +906,7 @@ private LocalResource findNextResource() { * 2) Resource is still in DOWNLOADING state */ if (nRsrc.tryAcquire()) { - if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + if (nRsrc.getState() == ResourceState.DOWNLOADING) { LocalResourceRequest nextRsrc = nRsrc.getRequest(); LocalResource next = recordFactory.newRecordInstance(LocalResource.class); @@ -936,41 +936,9 @@ LocalizerHeartbeatResponse update( String user = context.getUser(); ApplicationId applicationId = context.getContainerId().getApplicationAttemptId().getApplicationId(); - // The localizer has just spawned. Start giving it resources for - // remote-fetching. - if (remoteResourceStatuses.isEmpty()) { - LocalResource next = findNextResource(); - if (next != null) { - response.setLocalizerAction(LocalizerAction.LIVE); - try { - ArrayList rsrcs = - new ArrayList(); - ResourceLocalizationSpec rsrc = - NodeManagerBuilderUtils.newResourceLocalizationSpec(next, - getPathForLocalization(next)); - rsrcs.add(rsrc); - response.setResourceSpecs(rsrcs); - } catch (IOException e) { - LOG.error("local path for PRIVATE localization could not be found." - + "Disks might have failed.", e); - } catch (URISyntaxException e) { - // TODO fail? Already translated several times... - } - } else if (pending.isEmpty()) { - // TODO: Synchronization - response.setLocalizerAction(LocalizerAction.DIE); - } else { - response.setLocalizerAction(LocalizerAction.LIVE); - } - return response; - } - ArrayList rsrcs = - new ArrayList(); - /* - * TODO : It doesn't support multiple downloads per ContainerLocalizer - * at the same time. We need to think whether we should support this. - */ + LocalizerAction action = LocalizerAction.LIVE; + // Update resource statuses. for (LocalResourceStatus stat : remoteResourceStatuses) { LocalResource rsrc = stat.getResource(); LocalResourceRequest req = null; @@ -999,30 +967,8 @@ LocalizerHeartbeatResponse update( // list assoc.getResource().unlock(); scheduled.remove(req); - - if (pending.isEmpty()) { - // TODO: Synchronization - response.setLocalizerAction(LocalizerAction.DIE); - break; - } - response.setLocalizerAction(LocalizerAction.LIVE); - LocalResource next = findNextResource(); - if (next != null) { - try { - ResourceLocalizationSpec resource = - NodeManagerBuilderUtils.newResourceLocalizationSpec(next, - getPathForLocalization(next)); - rsrcs.add(resource); - } catch (IOException e) { - LOG.error("local path for PRIVATE localization could not be " + - "found. Disks might have failed.", e); - } catch (URISyntaxException e) { - //TODO fail? Already translated several times... - } - } break; case FETCH_PENDING: - response.setLocalizerAction(LocalizerAction.LIVE); break; case FETCH_FAILURE: final String diagnostics = stat.getException().toString(); @@ -1036,17 +982,48 @@ LocalizerHeartbeatResponse update( // list assoc.getResource().unlock(); scheduled.remove(req); - break; default: LOG.info("Unknown status: " + stat.getStatus()); - response.setLocalizerAction(LocalizerAction.DIE); + action = LocalizerAction.DIE; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, stat.getException().getMessage())); break; } } + if (action == LocalizerAction.DIE) { + response.setLocalizerAction(action); + return response; + } + + // Give the localizer resources for remote-fetching. + List rsrcs = + new ArrayList(); + + /* + * TODO : It doesn't support multiple downloads per ContainerLocalizer + * at the same time. We need to think whether we should support this. + */ + LocalResource next = findNextResource(); + if (next != null) { + try { + ResourceLocalizationSpec resource = + NodeManagerBuilderUtils.newResourceLocalizationSpec(next, + getPathForLocalization(next)); + rsrcs.add(resource); + } catch (IOException e) { + LOG.error("local path for PRIVATE localization could not be " + + "found. Disks might have failed.", e); + } catch (URISyntaxException e) { + //TODO fail? Already translated several times... + } + } else if (pending.isEmpty()) { + // TODO: Synchronization + action = LocalizerAction.DIE; + } + + response.setLocalizerAction(action); response.setResourceSpecs(rsrcs); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index f968bb9..1cdaf3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -902,11 +902,6 @@ public boolean matches(Object o) { assertTrue(localizedPath.getFile().endsWith( localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11")); - // empty rsrc - response = spyService.heartbeat(stat); - assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); - assertEquals(0, response.getResourceSpecs().size()); - // get shutdown response = spyService.heartbeat(stat); assertEquals(LocalizerAction.DIE, response.getLocalizerAction());