diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 12a307930fc..5a5a8d782db 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -377,6 +377,7 @@ private LocalResource createApplicationResource(FileContext fs, Path p,
}
rsrc.setResource(URL.fromURI(uriWithFragment));
rsrc.setSize(rsrcStat.getLen());
+ rsrc.setDownloadSize(rsrcStat.getLen());
rsrc.setTimestamp(rsrcStat.getModificationTime());
rsrc.setType(type);
rsrc.setVisibility(viz);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
index e243e9051fb..f5bb75b24e3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
@@ -44,6 +44,8 @@
@Stable
public abstract class LocalResource {
+ long downloadSize;
+
@Public
@Stable
public static LocalResource newInstance(URL url, LocalResourceType type,
@@ -62,6 +64,7 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
resource.setType(type);
resource.setVisibility(visibility);
resource.setSize(size);
+ resource.setDownloadSize(size);
resource.setTimestamp(timestamp);
resource.setPattern(pattern);
resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
@@ -124,7 +127,12 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
@Public
@Stable
public abstract long getTimestamp();
-
+
+ @Public
+ @Unstable
+ public long getDownloadSize() {
+ return downloadSize;
+ }
/**
* Set the timestamp of the resource to be localized, used
* for verification.
@@ -211,4 +219,10 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
@Unstable
public abstract void setShouldBeUploadedToSharedCache(
boolean shouldBeUploadedToSharedCache);
+
+ @Public
+ @Unstable
+ public void setDownloadSize(long actualSize) {
+ this.downloadSize = actualSize;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index fdbe2d45bf5..e4dd62802b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -230,6 +230,7 @@ message LocalResourceProto {
optional LocalResourceVisibilityProto visibility = 5;
optional string pattern = 6;
optional bool should_be_uploaded_to_shared_cache = 7;
+ optional int64 downloadSize = 8 [default = -1];
}
message StringLongMapProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
index 560b081c016..b273055341f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
@@ -212,6 +212,18 @@ public synchronized void setShouldBeUploadedToSharedCache(
builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
}
+ @Override
+ public synchronized long getDownloadSize() {
+ LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getDownloadSize());
+ }
+
+ @Override
+ public synchronized void setDownloadSize(long size) {
+ maybeInitBuilder();
+ builder.setDownloadSize(size);
+ }
+
private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
return ProtoUtils.convertToProtoFormat(e);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 83f912f9138..109c6343f79 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -111,6 +111,7 @@ public static LocalResource newLocalResource(URL url, LocalResourceType type,
resource.setType(type);
resource.setVisibility(visibility);
resource.setSize(size);
+ resource.setDownloadSize(size);
resource.setTimestamp(timestamp);
resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
index d2e8e22d459..f624a8d2b4e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
@@ -35,6 +35,7 @@
private final LocalResourceType type;
private final LocalResourceVisibility visibility;
private final String pattern;
+ private long downloadSize;
/**
* Wrap API resource to match against cache of localized resources.
@@ -48,6 +49,7 @@ public LocalResourceRequest(LocalResource resource)
resource.getType(),
resource.getVisibility(),
resource.getPattern());
+ this.downloadSize = resource.getDownloadSize();
}
LocalResourceRequest(Path loc, long timestamp, LocalResourceType type,
@@ -142,6 +144,11 @@ public long getSize() {
}
@Override
+ public long getDownloadSize() {
+ return downloadSize;
+ }
+
+ @Override
public LocalResourceVisibility getVisibility() {
return visibility;
}
@@ -173,6 +180,11 @@ public void setSize(long size) {
}
@Override
+ public void setDownloadSize(long size) {
+ this.downloadSize = size;
+ }
+
+ @Override
public void setTimestamp(long timestamp) {
throw new UnsupportedOperationException();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index dd315431a13..515b0ff21cb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -19,6 +19,9 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -328,16 +331,32 @@ private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
/**
* This module checks if the resource which was localized is already present
* or not
- *
* @param rsrc
* @return true/false based on resource is present or not
*/
- public boolean isResourcePresent(LocalizedResource rsrc) {
+ boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true;
+ BasicFileAttributes attributes = null;
if (rsrc.getState() == ResourceState.LOCALIZED) {
- File file = new File(rsrc.getLocalPath().toUri().getRawPath().
- toString());
- if (!file.exists()) {
+ File file = new File(rsrc.getLocalPath().toString());
+ try {
+ attributes =
+ Files.readAttributes(file.toPath(), BasicFileAttributes.class);
+ } catch (NoSuchFileException nsfe) {
+ LOG.info("Resource "+ rsrc.getLocalPath() + " was not found!");
+ return false;
+ } catch (IOException e) {
+ ret = false;
+ LOG.info("Encountered IOException while checking if the resource "
+ + rsrc.toString() + " is present. ", e);
+ return ret;
+ }
+ if (!attributes.isSymbolicLink() && !attributes.isDirectory()
+ && rsrc.getDownloadSize() != -1
+ && attributes.size() != rsrc.getDownloadSize()) {
+ LOG.info("Resource "+ rsrc.getLocalPath()
+ + " encountered a size mismatch, expected size="
+ + rsrc.getDownloadSize() + " actual size=" + attributes.size());
ret = false;
} else if (dirsHandler != null) {
ret = checkLocalResource(rsrc);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
index bd9602e4ddc..53cf8fc6e26 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
@@ -58,6 +58,7 @@
volatile Path localPath;
volatile long size = -1;
+ volatile long downloadSize = -1;
final LocalResourceRequest rsrc;
final Dispatcher dispatcher;
final StateMachine
@@ -173,6 +174,9 @@ public long getSize() {
return size;
}
+ public long getDownloadSize() {
+ return downloadSize;
+ }
public int getRefCount() {
return ref.size();
}
@@ -231,6 +235,7 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) {
LocalizerContext ctxt = req.getContext();
ContainerId container = ctxt.getContainerId();
rsrc.ref.add(container);
+ rsrc.downloadSize = rsrc.getRequest().getDownloadSize();
rsrc.dispatcher.getEventHandler().handle(
new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt,
req.getLocalResourceRequest().getPattern()));
@@ -248,6 +253,7 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) {
rsrc.localPath =
Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
rsrc.size = locEvent.getSize();
+ rsrc.downloadSize = rsrc.getRequest().getDownloadSize();
for (ContainerId container : rsrc.ref) {
rsrc.dispatcher.getEventHandler().handle(
new ContainerResourceLocalizedEvent(
@@ -310,6 +316,7 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) {
ResourceRecoveredEvent recoveredEvent = (ResourceRecoveredEvent) event;
rsrc.localPath = recoveredEvent.getLocalPath();
rsrc.size = recoveredEvent.getSize();
+ rsrc.downloadSize = rsrc.getRequest().getDownloadSize();
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 17aa7d9f62a..c7a53634459 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -1048,6 +1048,7 @@ private LocalResource findNextResource() {
next.setType(nextRsrc.getType());
next.setVisibility(evt.getVisibility());
next.setPattern(evt.getPattern());
+ next.setDownloadSize(evt.getResource().getSize());
scheduled.put(nextRsrc, evt);
return next;
} else {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 6cab5934e89..a80d2d88829 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.permission.FsPermission;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.argThat;
import static org.mockito.Matchers.eq;
@@ -29,7 +33,9 @@
import java.io.File;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -902,6 +908,121 @@ public void testResourcePresentInGoodDir() throws IOException {
}
}
+ @Test (timeout = 120000)
+ public void testIsResourcePresentWithSizeMatch()
+ throws IOException, URISyntaxException {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ Path localPath1 = null;
+ Path localPath2 = null;
+ LocalResourceRequest req1 = null;
+ LocalResourceRequest req2 = null;
+ try {
+ Configuration conf = new Configuration();
+ dispatcher = createDispatcher(conf);
+ EventHandler localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler containerEventHandler =
+ mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+ req1 =
+ createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ req2 =
+ createLocalResourceRequest(user, 2, 1,
+ LocalResourceVisibility.PUBLIC);
+ LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+ LocalizedResource lr2 = createLocalizedResource(req2, dispatcher);
+ ConcurrentMap localrsrc =
+ new ConcurrentHashMap();
+ localrsrc.put(req1, lr1);
+ localrsrc.put(req2, lr2);
+ LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class);
+ List goodDirs = new ArrayList();
+ goodDirs.add("/tmp/somedir1/");
+ goodDirs.add("/tmp/somedir2");
+ Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs);
+ Mockito.when(dirsHandler.getLocalDirsForRead()).thenReturn(goodDirs);
+ LocalResourcesTrackerImpl tracker =
+ new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+ true, conf, new NMNullStateStoreService(), dirsHandler);
+ ResourceEvent req11Event =
+ new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
+ ResourceEvent req21Event =
+ new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1);
+ // Localize R1 for C1
+ tracker.handle(req11Event);
+ // Localize R2 for C1
+ tracker.handle(req21Event);
+ dispatcher.await();
+ // Localize resource1
+ Path p1 = tracker.getPathForLocalization(req1,
+ new Path("/tmp/somedir1"), null);
+ Path p2 = tracker.getPathForLocalization(req2,
+ new Path("/tmp/somedir2"), null);
+ FSDataOutputStream outputStream = lfs.create(p1,
+ EnumSet.of(CreateFlag.CREATE), Options
+ .CreateOpts.createParent());
+ outputStream.write(new byte[]{1, 2, 3});
+ outputStream.hflush();
+ outputStream.close();
+ ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 3);
+ tracker.handle(rle1);
+ ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1);
+ tracker.handle(rle2);
+ localPath1 = rle1.getLocation();
+ localPath2 = rle2.getLocation();
+ //second resource is a directory with no contents
+ lfs.mkdir(localPath2, FsPermission.getDefault(), true);
+ dispatcher.await();
+ //file size zero and should be 3B, must return false
+ lr1.downloadSize = 4;
+ boolean isPresent = tracker.isResourcePresent(lr1);
+ Assert.assertFalse(
+ "expected file size is 4 and actual size is 3, must return false", isPresent);
+ //file size is 3B and should be 3B, must return true
+ lr1.downloadSize = 3;
+ isPresent = tracker.isResourcePresent(lr1);
+ Assert.assertTrue(
+ "file size is 3B and should be 3B, must return true", isPresent);
+
+ lfs.delete(localPath1, false);
+ //file does not exist, must return false
+ isPresent = tracker.isResourcePresent(lr1);
+ Assert.assertFalse("file does not exist, should return false", isPresent);
+
+ //directory is present and size may not match, must return true
+ isPresent = tracker.isResourcePresent(lr2);
+ Assert.assertTrue(isPresent);
+
+ lfs.delete(localPath2, false);
+ //directory does not exist and must return false
+ isPresent = tracker.isResourcePresent(lr2);
+ Assert.assertFalse(isPresent);
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ if (localPath1 != null) {
+ lfs.delete(localPath1.getParent(), true);
+ }
+ if (localPath2 != null) {
+ lfs.delete(localPath2.getParent(), true);
+ }
+ if (req1 != null) {
+ lfs.delete(req1.getPath(), true);
+ }
+ if (req2 != null) {
+ lfs.delete(req2.getPath(), true);
+ }
+ }
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testReleaseWhileDownloading() throws Exception {