diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 7cf6b15..6a0efcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; @@ -65,6 +67,7 @@ private final Dispatcher dispatcher; private final ConcurrentMap localrsrc; private Configuration conf; + private LocalDirsHandlerService dirsHandler; /* * This flag controls whether this resource tracker uses hierarchical * directories or not. For PRIVATE and PUBLIC resource trackers it @@ -96,6 +99,16 @@ public LocalResourcesTrackerImpl(String user, ApplicationId appId, useLocalCacheDirectoryManager, conf, stateStore); } + public LocalResourcesTrackerImpl(String user, ApplicationId appId, + Dispatcher dispatcher, boolean useLocalCacheDirectoryManager, + Configuration conf, NMStateStoreService stateStore, + LocalDirsHandlerService dirHandler) { + this(user, appId, dispatcher, + new ConcurrentHashMap(), + useLocalCacheDirectoryManager, conf, stateStore); + this.dirsHandler = dirHandler; + } + LocalResourcesTrackerImpl(String user, ApplicationId appId, Dispatcher dispatcher, ConcurrentMap localrsrc, @@ -115,6 +128,16 @@ public LocalResourcesTrackerImpl(String user, ApplicationId appId, this.stateStore = stateStore; } + LocalResourcesTrackerImpl(String user, ApplicationId appId, + Dispatcher dispatcher, + ConcurrentMap localrsrc, + boolean useLocalCacheDirectoryManager, Configuration conf, + NMStateStoreService stateStore, LocalDirsHandlerService dirsHandler) { + this(user, appId, dispatcher, localrsrc, useLocalCacheDirectoryManager, + conf, stateStore); + this.dirsHandler = dirsHandler; + } + /* * Synchronizing this method for avoiding races due to multiple ResourceEvent's * coming to LocalResourcesTracker from Public/Private localizer and @@ -312,11 +335,53 @@ public boolean isResourcePresent(LocalizedResource rsrc) { toString()); if (!file.exists()) { ret = false; + } else if (dirsHandler != null) { + ret = checkLocalResource(rsrc); } } return ret; } - + + /** + * Check if the rsrc is Localized on a good dir + * @param rsrc + * @return + */ + @VisibleForTesting + boolean checkLocalResource(LocalizedResource rsrc) { + List localDirs = dirsHandler.getLocalDirs(); + // Full disks are also readable + localDirs.addAll(dirsHandler.getDiskFullLocalDirs()); + boolean ret = false; + for (String dir : localDirs) { + ret |= isParent(rsrc.getLocalPath().toUri().getPath(), dir); + } + return ret; + } + + /** + * @param path + * @param parentdir + * @return true if parentdir is parent of path else false. + */ + private boolean isParent(String path, String parentdir) { + String[] pathTokens = + path.split(File.separatorChar == '\\' ? "\\\\" : File.separator); + String[] parentTokens = + parentdir.split(File.separatorChar == '\\' ? "\\\\" : File.separator); + if (pathTokens.length < parentTokens.length) { + return false; + } + for (int index = 0; index < parentTokens.length; index++) { + if (parentTokens[index].equals(pathTokens[index])) { + continue; + } else { + return false; + } + } + return true; + } + @Override public boolean remove(LocalizedResource rem, DeletionService delService) { // current synchronization guaranteed by crude RLS event for cleanup diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 54c31c2..1e51c0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -228,7 +228,7 @@ private void validateConf(Configuration conf) { public void serviceInit(Configuration conf) throws Exception { this.validateConf(conf); this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher, - true, conf, stateStore); + true, conf, stateStore, dirsHandler); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 5695254..1596dcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -18,22 +18,22 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import static org.mockito.Mockito.any; -import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.timeout; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; @@ -61,11 +62,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; public class TestLocalResourcesTrackerImpl { @@ -782,6 +786,70 @@ public void testRecoveredResourceWithDirCacheMgr() throws Exception { } } + @SuppressWarnings("unchecked") + @Test + public void testResourcePresentInGoodDir() throws IOException { + String user = "testuser"; + DrainDispatcher dispatcher = null; + try { + Configuration conf = new Configuration(); + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + LocalResourceRequest req1 = + createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC); + LocalResourceRequest req2 = + createLocalResourceRequest(user, 2, 1, LocalResourceVisibility.PUBLIC); + LocalizedResource lr1 = createLocalizedResource(req1, dispatcher); + LocalizedResource lr2 = createLocalizedResource(req2, dispatcher); + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + localrsrc.put(req1, lr1); + localrsrc.put(req2, lr2); + LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class); + List goodDirs = new ArrayList(); + // /tmp/somedir2 is bad + goodDirs.add("/tmp/somedir1"); + goodDirs.add("/tmp/somedir2"); + Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs); + LocalResourcesTrackerImpl tracker = + new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, + true , conf, new NMNullStateStoreService(), dirsHandler); + ResourceEvent req11Event = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); + ResourceEvent req21Event = + new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1); + // Localize R1 for C1 + tracker.handle(req11Event); + // Localize R2 for C1 + tracker.handle(req21Event); + dispatcher.await(); + // Localize resource1 + Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1")); + Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2")); + ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1); + tracker.handle(rle1); + ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1); + tracker.handle(rle2); + dispatcher.await(); + // Remove somedir2 from gooddirs + Assert.assertTrue(tracker.checkLocalResource(lr2)); + goodDirs.remove(1); + Assert.assertFalse(tracker.checkLocalResource(lr2)); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + private boolean createdummylocalizefile(Path path) { boolean ret = false; File file = new File(path.toUri().getRawPath().toString());