diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 12a307930fc..5a5a8d782db 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -377,6 +377,7 @@ private LocalResource createApplicationResource(FileContext fs, Path p, } rsrc.setResource(URL.fromURI(uriWithFragment)); rsrc.setSize(rsrcStat.getLen()); + rsrc.setDownloadSize(rsrcStat.getLen()); rsrc.setTimestamp(rsrcStat.getModificationTime()); rsrc.setType(type); rsrc.setVisibility(viz); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java index e243e9051fb..b21f27567a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java @@ -44,6 +44,8 @@ @Stable public abstract class LocalResource { + private long downloadSize; + @Public @Stable public static LocalResource newInstance(URL url, LocalResourceType type, @@ -62,6 +64,7 @@ public static LocalResource newInstance(URL url, LocalResourceType type, resource.setType(type); resource.setVisibility(visibility); resource.setSize(size); + resource.setDownloadSize(size); resource.setTimestamp(timestamp); resource.setPattern(pattern); resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); @@ -124,7 +127,12 @@ public static LocalResource newInstance(URL url, LocalResourceType type, @Public @Stable public abstract long getTimestamp(); - + + @Public + @Unstable + public long getDownloadSize() { + return downloadSize; + } /** * Set the timestamp of the resource to be localized, used * for verification. @@ -211,4 +219,10 @@ public static LocalResource newInstance(URL url, LocalResourceType type, @Unstable public abstract void setShouldBeUploadedToSharedCache( boolean shouldBeUploadedToSharedCache); + + @Public + @Unstable + public void setDownloadSize(long actualSize) { + this.downloadSize = actualSize; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index fdbe2d45bf5..e4dd62802b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -230,6 +230,7 @@ message LocalResourceProto { optional LocalResourceVisibilityProto visibility = 5; optional string pattern = 6; optional bool should_be_uploaded_to_shared_cache = 7; + optional int64 downloadSize = 8 [default = -1]; } message StringLongMapProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java index 560b081c016..b273055341f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java @@ -212,6 +212,18 @@ public synchronized void setShouldBeUploadedToSharedCache( builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); } + @Override + public synchronized long getDownloadSize() { + LocalResourceProtoOrBuilder p = viaProto ? proto : builder; + return (p.getDownloadSize()); + } + + @Override + public synchronized void setDownloadSize(long size) { + maybeInitBuilder(); + builder.setDownloadSize(size); + } + private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) { return ProtoUtils.convertToProtoFormat(e); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 83f912f9138..109c6343f79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -111,6 +111,7 @@ public static LocalResource newLocalResource(URL url, LocalResourceType type, resource.setType(type); resource.setVisibility(visibility); resource.setSize(size); + resource.setDownloadSize(size); resource.setTimestamp(timestamp); resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); return resource; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java index d2e8e22d459..f624a8d2b4e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java @@ -35,6 +35,7 @@ private final LocalResourceType type; private final LocalResourceVisibility visibility; private final String pattern; + private long downloadSize; /** * Wrap API resource to match against cache of localized resources. @@ -48,6 +49,7 @@ public LocalResourceRequest(LocalResource resource) resource.getType(), resource.getVisibility(), resource.getPattern()); + this.downloadSize = resource.getDownloadSize(); } LocalResourceRequest(Path loc, long timestamp, LocalResourceType type, @@ -142,6 +144,11 @@ public long getSize() { } @Override + public long getDownloadSize() { + return downloadSize; + } + + @Override public LocalResourceVisibility getVisibility() { return visibility; } @@ -173,6 +180,11 @@ public void setSize(long size) { } @Override + public void setDownloadSize(long size) { + this.downloadSize = size; + } + + @Override public void setTimestamp(long timestamp) { throw new UnsupportedOperationException(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index dd315431a13..515b0ff21cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -19,6 +19,9 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.BasicFileAttributes; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -328,16 +331,32 @@ private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req, /** * This module checks if the resource which was localized is already present * or not - * * @param rsrc * @return true/false based on resource is present or not */ - public boolean isResourcePresent(LocalizedResource rsrc) { + boolean isResourcePresent(LocalizedResource rsrc) { boolean ret = true; + BasicFileAttributes attributes = null; if (rsrc.getState() == ResourceState.LOCALIZED) { - File file = new File(rsrc.getLocalPath().toUri().getRawPath(). - toString()); - if (!file.exists()) { + File file = new File(rsrc.getLocalPath().toString()); + try { + attributes = + Files.readAttributes(file.toPath(), BasicFileAttributes.class); + } catch (NoSuchFileException nsfe) { + LOG.info("Resource "+ rsrc.getLocalPath() + " was not found!"); + return false; + } catch (IOException e) { + ret = false; + LOG.info("Encountered IOException while checking if the resource " + + rsrc.toString() + " is present. ", e); + return ret; + } + if (!attributes.isSymbolicLink() && !attributes.isDirectory() + && rsrc.getDownloadSize() != -1 + && attributes.size() != rsrc.getDownloadSize()) { + LOG.info("Resource "+ rsrc.getLocalPath() + + " encountered a size mismatch, expected size=" + + rsrc.getDownloadSize() + " actual size=" + attributes.size()); ret = false; } else if (dirsHandler != null) { ret = checkLocalResource(rsrc); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index bd9602e4ddc..ef63662f88a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -58,6 +58,7 @@ volatile Path localPath; volatile long size = -1; + private volatile long downloadSize = -1; final LocalResourceRequest rsrc; final Dispatcher dispatcher; final StateMachine @@ -173,10 +174,17 @@ public long getSize() { return size; } + public long getDownloadSize() { + return downloadSize; + } public int getRefCount() { return ref.size(); } + public void setDownloadSize(long downloadSize) { + this.downloadSize = downloadSize; + } + public boolean tryAcquire() { return sem.tryAcquire(); } @@ -231,6 +239,7 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { LocalizerContext ctxt = req.getContext(); ContainerId container = ctxt.getContainerId(); rsrc.ref.add(container); + rsrc.setDownloadSize(rsrc.getRequest().getDownloadSize()); rsrc.dispatcher.getEventHandler().handle( new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt, req.getLocalResourceRequest().getPattern())); @@ -248,6 +257,7 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { rsrc.localPath = Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation()); rsrc.size = locEvent.getSize(); + rsrc.setDownloadSize(rsrc.getRequest().getDownloadSize()); for (ContainerId container : rsrc.ref) { rsrc.dispatcher.getEventHandler().handle( new ContainerResourceLocalizedEvent( @@ -310,6 +320,7 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { ResourceRecoveredEvent recoveredEvent = (ResourceRecoveredEvent) event; rsrc.localPath = recoveredEvent.getLocalPath(); rsrc.size = recoveredEvent.getSize(); + rsrc.setDownloadSize(rsrc.getRequest().getDownloadSize()); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 17aa7d9f62a..c7a53634459 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1048,6 +1048,7 @@ private LocalResource findNextResource() { next.setType(nextRsrc.getType()); next.setVisibility(evt.getVisibility()); next.setPattern(evt.getPattern()); + next.setDownloadSize(evt.getResource().getSize()); scheduled.put(nextRsrc, evt); return next; } else { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 6cab5934e89..424dc79b713 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.permission.FsPermission; import static org.mockito.Matchers.any; import static org.mockito.Mockito.argThat; import static org.mockito.Matchers.eq; @@ -29,7 +33,9 @@ import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -902,6 +908,121 @@ public void testResourcePresentInGoodDir() throws IOException { } } + @Test (timeout = 120000) + public void testIsResourcePresentWithSizeMatch() + throws IOException, URISyntaxException { + String user = "testuser"; + DrainDispatcher dispatcher = null; + FileContext lfs = FileContext.getLocalFSFileContext(); + Path localPath1 = null; + Path localPath2 = null; + LocalResourceRequest req1 = null; + LocalResourceRequest req2 = null; + try { + Configuration conf = new Configuration(); + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + req1 = + createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.PUBLIC); + req2 = + createLocalResourceRequest(user, 2, 1, + LocalResourceVisibility.PUBLIC); + LocalizedResource lr1 = createLocalizedResource(req1, dispatcher); + LocalizedResource lr2 = createLocalizedResource(req2, dispatcher); + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + localrsrc.put(req1, lr1); + localrsrc.put(req2, lr2); + LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class); + List goodDirs = new ArrayList(); + goodDirs.add("/tmp/somedir1/"); + goodDirs.add("/tmp/somedir2"); + Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs); + Mockito.when(dirsHandler.getLocalDirsForRead()).thenReturn(goodDirs); + LocalResourcesTrackerImpl tracker = + new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, + true, conf, new NMNullStateStoreService(), dirsHandler); + ResourceEvent req11Event = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); + ResourceEvent req21Event = + new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1); + // Localize R1 for C1 + tracker.handle(req11Event); + // Localize R2 for C1 + tracker.handle(req21Event); + dispatcher.await(); + // Localize resource1 + Path p1 = tracker.getPathForLocalization(req1, + new Path("/tmp/somedir1"), null); + Path p2 = tracker.getPathForLocalization(req2, + new Path("/tmp/somedir2"), null); + FSDataOutputStream outputStream = lfs.create(p1, + EnumSet.of(CreateFlag.CREATE), Options + .CreateOpts.createParent()); + outputStream.write(new byte[]{1, 2, 3}); + outputStream.hflush(); + outputStream.close(); + ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 3); + tracker.handle(rle1); + ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1); + tracker.handle(rle2); + localPath1 = rle1.getLocation(); + localPath2 = rle2.getLocation(); + //second resource is a directory with no contents + lfs.mkdir(localPath2, FsPermission.getDefault(), true); + dispatcher.await(); + //file size zero and should be 3B, must return false + lr1.setDownloadSize(4); + boolean isPresent = tracker.isResourcePresent(lr1); + Assert.assertFalse( + "expected size is 4,actual size is 3, must return false", isPresent); + //file size is 3B and should be 3B, must return true + lr1.setDownloadSize(3); + isPresent = tracker.isResourcePresent(lr1); + Assert.assertTrue( + "file size is 3B and should be 3B, must return true", isPresent); + + lfs.delete(localPath1, false); + //file does not exist, must return false + isPresent = tracker.isResourcePresent(lr1); + Assert.assertFalse("file does not exist, should return false", isPresent); + + //directory is present and size may not match, must return true + isPresent = tracker.isResourcePresent(lr2); + Assert.assertTrue(isPresent); + + lfs.delete(localPath2, false); + //directory does not exist and must return false + isPresent = tracker.isResourcePresent(lr2); + Assert.assertFalse(isPresent); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + if (localPath1 != null) { + lfs.delete(localPath1.getParent(), true); + } + if (localPath2 != null) { + lfs.delete(localPath2.getParent(), true); + } + if (req1 != null) { + lfs.delete(req1.getPath(), true); + } + if (req2 != null) { + lfs.delete(req2.getPath(), true); + } + } + } + @Test @SuppressWarnings("unchecked") public void testReleaseWhileDownloading() throws Exception {