diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 5440980..2f4fa5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -763,7 +763,7 @@ public void addResource(LocalizerResourceRequestEvent request) { */ if (rsrc.tryAcquire()) { - if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { + if (rsrc.getState() == ResourceState.DOWNLOADING) { LocalResource resource = request.getResource().getRequest(); try { Path publicRootPath = @@ -895,7 +895,7 @@ private LocalResource findNextResource() { LocalizedResource nRsrc = evt.getResource(); // Resource download should take place ONLY if resource is in // Downloading state - if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { + if (nRsrc.getState() != ResourceState.DOWNLOADING) { i.remove(); continue; } @@ -906,7 +906,7 @@ private LocalResource findNextResource() { * 2) Resource is still in DOWNLOADING state */ if (nRsrc.tryAcquire()) { - if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + if (nRsrc.getState() == ResourceState.DOWNLOADING) { LocalResourceRequest nextRsrc = nRsrc.getRequest(); LocalResource next = recordFactory.newRecordInstance(LocalResource.class); @@ -936,41 +936,9 @@ LocalizerHeartbeatResponse update( String user = context.getUser(); ApplicationId applicationId = context.getContainerId().getApplicationAttemptId().getApplicationId(); - // The localizer has just spawned. Start giving it resources for - // remote-fetching. - if (remoteResourceStatuses.isEmpty()) { - LocalResource next = findNextResource(); - if (next != null) { - response.setLocalizerAction(LocalizerAction.LIVE); - try { - ArrayList rsrcs = - new ArrayList(); - ResourceLocalizationSpec rsrc = - NodeManagerBuilderUtils.newResourceLocalizationSpec(next, - getPathForLocalization(next)); - rsrcs.add(rsrc); - response.setResourceSpecs(rsrcs); - } catch (IOException e) { - LOG.error("local path for PRIVATE localization could not be found." - + "Disks might have failed.", e); - } catch (URISyntaxException e) { - // TODO fail? Already translated several times... - } - } else if (pending.isEmpty()) { - // TODO: Synchronization - response.setLocalizerAction(LocalizerAction.DIE); - } else { - response.setLocalizerAction(LocalizerAction.LIVE); - } - return response; - } - ArrayList rsrcs = - new ArrayList(); - /* - * TODO : It doesn't support multiple downloads per ContainerLocalizer - * at the same time. We need to think whether we should support this. - */ + LocalizerAction action = LocalizerAction.LIVE; + // Update resource statuses. for (LocalResourceStatus stat : remoteResourceStatuses) { LocalResource rsrc = stat.getResource(); LocalResourceRequest req = null; @@ -999,30 +967,8 @@ LocalizerHeartbeatResponse update( // list assoc.getResource().unlock(); scheduled.remove(req); - - if (pending.isEmpty()) { - // TODO: Synchronization - response.setLocalizerAction(LocalizerAction.DIE); - break; - } - response.setLocalizerAction(LocalizerAction.LIVE); - LocalResource next = findNextResource(); - if (next != null) { - try { - ResourceLocalizationSpec resource = - NodeManagerBuilderUtils.newResourceLocalizationSpec(next, - getPathForLocalization(next)); - rsrcs.add(resource); - } catch (IOException e) { - LOG.error("local path for PRIVATE localization could not be " + - "found. Disks might have failed.", e); - } catch (URISyntaxException e) { - //TODO fail? Already translated several times... - } - } break; case FETCH_PENDING: - response.setLocalizerAction(LocalizerAction.LIVE); break; case FETCH_FAILURE: final String diagnostics = stat.getException().toString(); @@ -1036,17 +982,48 @@ LocalizerHeartbeatResponse update( // list assoc.getResource().unlock(); scheduled.remove(req); - break; default: LOG.info("Unknown status: " + stat.getStatus()); - response.setLocalizerAction(LocalizerAction.DIE); + action = LocalizerAction.DIE; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, stat.getException().getMessage())); break; } } + if (action == LocalizerAction.DIE) { + response.setLocalizerAction(action); + return response; + } + + // Give the localizer resources for remote-fetching. + List rsrcs = + new ArrayList(); + + /* + * TODO : It doesn't support multiple downloads per ContainerLocalizer + * at the same time. We need to think whether we should support this. + */ + LocalResource next = findNextResource(); + if (next != null) { + try { + ResourceLocalizationSpec resource = + NodeManagerBuilderUtils.newResourceLocalizationSpec(next, + getPathForLocalization(next)); + rsrcs.add(resource); + } catch (IOException e) { + LOG.error("local path for PRIVATE localization could not be " + + "found. Disks might have failed.", e); + } catch (URISyntaxException e) { + //TODO fail? Already translated several times... + } + } else if (pending.isEmpty()) { + // TODO: Synchronization + action = LocalizerAction.DIE; + } + + response.setLocalizerAction(action); response.setResourceSpecs(rsrcs); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index f968bb9..9ed18dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -827,10 +827,16 @@ public boolean matches(Object o) { do { resource2 = getPrivateMockedResource(r); } while (resource2 == null || resource2.equals(resource1)); + LocalResource resource3 = null; + do { + resource3 = getPrivateMockedResource(r); + } while (resource3 == null || resource3.equals(resource1) + || resource3.equals(resource2)); // above call to make sure we don't get identical resources. final LocalResourceRequest req1 = new LocalResourceRequest(resource1); final LocalResourceRequest req2 = new LocalResourceRequest(resource2); + final LocalResourceRequest req3 = new LocalResourceRequest(resource3); Map> rsrcs = new HashMap>(); @@ -838,6 +844,7 @@ public boolean matches(Object o) { new ArrayList(); privateResourceList.add(req1); privateResourceList.add(req2); + privateResourceList.add(req3); rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); // Sigh. Thread init of private localizer not accessible @@ -852,30 +859,47 @@ public boolean matches(Object o) { Path localizationTokenPath = tokenPathCaptor.getValue(); // heartbeat from localizer - LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class); - LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class); + LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class); + LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class); + LocalResourceStatus rsrc2success = mock(LocalResourceStatus.class); + LocalResourceStatus rsrc3success = mock(LocalResourceStatus.class); LocalizerStatus stat = mock(LocalizerStatus.class); when(stat.getLocalizerId()).thenReturn(ctnrStr); - when(rsrcStat1.getResource()).thenReturn(resource1); - when(rsrcStat2.getResource()).thenReturn(resource2); - when(rsrcStat1.getLocalSize()).thenReturn(4344L); - when(rsrcStat2.getLocalSize()).thenReturn(2342L); + when(rsrc1success.getResource()).thenReturn(resource1); + when(rsrc2pending.getResource()).thenReturn(resource2); + when(rsrc2success.getResource()).thenReturn(resource2); + when(rsrc3success.getResource()).thenReturn(resource3); + when(rsrc1success.getLocalSize()).thenReturn(4344L); + when(rsrc2success.getLocalSize()).thenReturn(2342L); + when(rsrc3success.getLocalSize()).thenReturn(5345L); URL locPath = getPath("/cache/private/blah"); - when(rsrcStat1.getLocalPath()).thenReturn(locPath); - when(rsrcStat2.getLocalPath()).thenReturn(locPath); - when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); - when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(rsrc1success.getLocalPath()).thenReturn(locPath); + when(rsrc2success.getLocalPath()).thenReturn(locPath); + when(rsrc3success.getLocalPath()).thenReturn(locPath); + when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING); + when(rsrc2success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(rsrc3success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + + // Four heartbeats with sending: + // 1 - empty + // 2 - resource1 FETCH_SUCCESS + // 3 - resource2 FETCH_PENDING + // 4 - resource2 FETCH_SUCCESS, resource3 FETCH_SUCCESS + List rsrcs4 = new ArrayList(); + rsrcs4.add(rsrc2success); + rsrcs4.add(rsrc3success); when(stat.getResources()) .thenReturn(Collections.emptyList()) - .thenReturn(Collections.singletonList(rsrcStat1)) - .thenReturn(Collections.singletonList(rsrcStat2)) - .thenReturn(Collections.emptyList()); + .thenReturn(Collections.singletonList(rsrc1success)) + .thenReturn(Collections.singletonList(rsrc2pending)) + .thenReturn(rsrcs4); String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR + "user0" + Path.SEPARATOR + ContainerLocalizer.FILECACHE; - - // get first resource + + // First heartbeat LocalizerHeartbeatResponse response = spyService.heartbeat(stat); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); assertEquals(1, response.getResourceSpecs().size()); @@ -888,7 +912,7 @@ public boolean matches(Object o) { assertTrue(localizedPath.getFile().endsWith( localPath + Path.SEPARATOR + "10")); - // get second resource + // Second heartbeat response = spyService.heartbeat(stat); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); assertEquals(1, response.getResourceSpecs().size()); @@ -902,16 +926,21 @@ public boolean matches(Object o) { assertTrue(localizedPath.getFile().endsWith( localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11")); - // empty rsrc + // Third heartbeat response = spyService.heartbeat(stat); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); - assertEquals(0, response.getResourceSpecs().size()); + assertEquals(1, response.getResourceSpecs().size()); + assertEquals(req3, new LocalResourceRequest(response.getResourceSpecs() + .get(0).getResource())); + localizedPath = + response.getResourceSpecs().get(0).getDestinationDirectory(); + assertTrue(localizedPath.getFile().endsWith( + localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12")); // get shutdown response = spyService.heartbeat(stat); assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); - dispatcher.await(); // verify container notification ArgumentMatcher matchesContainerLoc = @@ -923,8 +952,8 @@ public boolean matches(Object o) { && c.getContainerId() == evt.getContainerID(); } }; - // total 2 resource localzation calls. one for each resource. - verify(containerBus, times(2)).handle(argThat(matchesContainerLoc)); + // total 3 resource localzation calls. one for each resource. + verify(containerBus, times(3)).handle(argThat(matchesContainerLoc)); // Verify deletion of localization token. verify(delService).delete((String)isNull(), eq(localizationTokenPath));