diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 940c599..167ea79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -19,6 +19,9 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.BasicFileAttributes; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -146,7 +149,7 @@ public synchronized void handle(ResourceEvent event) { } break; case REQUEST: - if (rsrc != null && (!isResourcePresent(rsrc))) { + if (rsrc != null && (!isResourcePresent(rsrc, req))) { LOG.info("Resource " + rsrc.getLocalPath() + " is missing, localizing it again"); removeResource(req); @@ -338,12 +341,26 @@ private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req, * @param rsrc * @return true/false based on resource is present or not */ - public boolean isResourcePresent(LocalizedResource rsrc) { + boolean isResourcePresent(LocalizedResource rsrc, LocalResourceRequest req) { boolean ret = true; if (rsrc.getState() == ResourceState.LOCALIZED) { - File file = new File(rsrc.getLocalPath().toUri().getRawPath(). - toString()); - if (!file.exists()) { + BasicFileAttributes attributes = null; + File file = new File(rsrc.getLocalPath().toUri().getRawPath()); + try { + attributes = + Files.readAttributes(file.toPath(), BasicFileAttributes.class); + } catch (NoSuchFileException nsfe) { + LOG.info("Resource "+ rsrc.getLocalPath() + " was not found!"); + return false; + } catch (IOException e) { + LOG.info("Encountered IOException while checking if the resource " + + rsrc.toString() + " is present. ", e); + return false; + } + if (!attributes.isDirectory() && attributes.size() != req.getSize()) { + LOG.info("Resource "+ rsrc.getLocalPath() + + " encountered a size mismatch, expected size=" + req.getSize() + + " actual size=" + attributes.size()); ret = false; } else if (dirsHandler != null) { ret = checkLocalResource(rsrc); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 8d9705b..788ecaf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.records.URL; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; @@ -28,7 +33,10 @@ import java.io.File; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -900,6 +908,113 @@ public void testResourcePresentInGoodDir() throws IOException { } @Test + public void testIsResourcePresentWithSizeMatch() + throws IOException, URISyntaxException { + String user = "testuser"; + DrainDispatcher dispatcher = null; + FileContext lfs = FileContext.getLocalFSFileContext(); + Path localPath1 = null; + Path localPath2 = null; + try { + Configuration conf = new Configuration(); + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + LocalResourceRequest req1 = + createLocalResourceRequestWithSize(user, 1, 1, + LocalResourceVisibility.PUBLIC); + LocalResourceRequest req2 = + createLocalResourceRequestWithSize(user, 2, 1, + LocalResourceVisibility.PUBLIC); + LocalizedResource lr1 = createLocalizedResource(req1, dispatcher); + LocalizedResource lr2 = createLocalizedResource(req2, dispatcher); + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + localrsrc.put(req1, lr1); + localrsrc.put(req2, lr2); + LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class); + List goodDirs = new ArrayList(); + goodDirs.add("/tmp/somedir1/"); + goodDirs.add("/tmp/somedir2"); + Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs); + Mockito.when(dirsHandler.getLocalDirsForRead()).thenReturn(goodDirs); + LocalResourcesTrackerImpl tracker = + new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, + true, conf, new NMNullStateStoreService(), dirsHandler); + ResourceEvent req11Event = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); + ResourceEvent req21Event = + new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1); + // Localize R1 for C1 + tracker.handle(req11Event); + // Localize R2 for C1 + tracker.handle(req21Event); + dispatcher.await(); + // Localize resource1 + Path p1 = tracker.getPathForLocalization(req1, + new Path("/tmp/somedir1"), null); + Path p2 = tracker.getPathForLocalization(req2, + new Path("/tmp/somedir2"), null); + ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1); + tracker.handle(rle1); + ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1); + tracker.handle(rle2); + localPath1 = rle1.getLocation(); + localPath2 = rle2.getLocation(); + //first resource is a file initially of size 0. + FSDataOutputStream outputStream = lfs.create(localPath1, + EnumSet.of(CreateFlag.CREATE), Options + .CreateOpts.createParent()); + //second resource is a directory with no contents + lfs.mkdir(localPath2, FsPermission.getDefault(), true); + dispatcher.await(); + //file size zero and should be 3B, must return false + boolean isPresent = tracker.isResourcePresent(lr1, req1); + Assert.assertFalse( + "file size zero and should be 3, must return false", isPresent); + + outputStream.write(new byte[]{1, 2, 3}); + outputStream.hflush(); + outputStream.close(); + //file size is 3B and should be 3B, must return false + isPresent = tracker.isResourcePresent(lr1, req1); + Assert.assertTrue( + "file size is 3B and should be 3B, must return false", isPresent); + + lfs.delete(localPath1, false); + //file does not exist, must return false + isPresent = tracker.isResourcePresent(lr1, req1); + Assert.assertFalse("file does not exist, should return false", isPresent); + + //directory is present and size may not match, must return true + isPresent = tracker.isResourcePresent(lr2, req2); + Assert.assertTrue(isPresent); + + lfs.delete(localPath2, false); + //directory does not exist and must return false + isPresent = tracker.isResourcePresent(lr2, req2); + Assert.assertFalse(isPresent); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + if (localPath1 != null) { + lfs.delete(localPath1.getParent(), true); + } + if (localPath2 != null) { + lfs.delete(localPath2.getParent(), true); + } + } + } + + @Test @SuppressWarnings("unchecked") public void testReleaseWhileDownloading() throws Exception { String user = "testuser"; @@ -980,6 +1095,19 @@ private LocalResourceRequest createLocalResourceRequest(String user, int i, return req; } + private LocalResourceRequest createLocalResourceRequestWithSize( + String user, int i, long ts, LocalResourceVisibility vis) + throws MalformedURLException, URISyntaxException { + LocalResourceRequest mockReq = Mockito.mock(LocalResourceRequest.class); + Path p = new Path("file:///tmp/" + user + "/rsrc" + i); + Mockito.when(mockReq.getResource()).thenReturn(URL.fromPath(p)); + Mockito.when(mockReq.getSize()).thenReturn(3L); + Mockito.when(mockReq.getPath()).thenReturn(p); + Mockito.when(mockReq.getTimestamp()).thenReturn(ts); + Mockito.when(mockReq.getVisibility()).thenReturn(vis); + return mockReq; + } + private LocalizedResource createLocalizedResource(LocalResourceRequest req, Dispatcher dispatcher) { LocalizedResource lr = new LocalizedResource(req, dispatcher);