diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index 14ec911..7b6bc8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import java.util.List; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; @@ -38,4 +40,6 @@ String getUser(); LocalizedResource getLocalizedResource(LocalResourceRequest request); + + void checkLocalizedResources(List localDirs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 7cf6b15..dfc8342 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -19,7 +19,11 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -445,4 +449,50 @@ LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) { } return mgr; } + + /** + * Check localized resources and remove them if they are not present in good + * dirs. + * + * @return void + */ + public void checkLocalizedResources(List dirs) { + Collection rList = new HashSet<>(localrsrc.values()); + Iterator it = rList.iterator(); + while (it.hasNext()) { + LocalizedResource localizedResource = it.next(); + boolean isGoodResource = false; + for (String dir : dirs) { + isGoodResource |= + isParent(localizedResource.getLocalPath().toUri().getPath(), dir); + } + if (!isGoodResource) { + removeResource(localizedResource.getRequest()); + LOG.info("Removed " + localizedResource.getLocalPath() + + " from localized cache"); + } + } + } + + /** + * @param path + * @param parentdir + * @return true if parentdir is parent of path else false. + */ + private boolean isParent(String path, String parentdir) { + String pathTokens[] = path.split(File.separator); + String parentTokens[] = parentdir.split(File.separator); + if (pathTokens.length < parentTokens.length) { + return false; + } + boolean isParent = true; + for (int index = 0; index < parentTokens.length; index++) { + if (parentTokens[index].equals(pathTokens[index])) { + continue; + } else { + isParent = false; + } + } + return isParent; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 603e795..647285f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -261,6 +261,10 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void onDirsChanged() { checkAndInitializeLocalDirs(); + publicRsrc.checkLocalizedResources(dirsHandler.getLocalDirs()); + for (LocalResourcesTracker tracker : privateRsrc.values()) { + tracker.checkLocalizedResources(dirsHandler.getLocalDirs()); + } } }; logDirsChangeListener = new DirsChangeListener() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 5695254..b6d821a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -28,12 +28,13 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -61,8 +62,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -782,6 +785,80 @@ public void testRecoveredResourceWithDirCacheMgr() throws Exception { } } + @SuppressWarnings("unchecked") + @Test + public void testRemoveResouces() throws IOException { + String user = "testuser"; + DrainDispatcher dispatcher = null; + try { + Configuration conf = new Configuration(); + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + LocalResourceRequest req1 = + createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC); + LocalResourceRequest req2 = + createLocalResourceRequest(user, 2, 1, LocalResourceVisibility.PUBLIC); + LocalizedResource lr1 = createLocalizedResource(req1, dispatcher); + LocalizedResource lr2 = createLocalizedResource(req2, dispatcher); + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + localrsrc.put(req1, lr1); + localrsrc.put(req2, lr2); + NMStateStoreService nmState = new NMMemoryStateStoreService(); + nmState.init(conf); + LocalResourcesTracker tracker = + new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, + false, conf, nmState); + ResourceEvent req11Event = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); + ResourceEvent req21Event = + new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1); + // Localize R1 for C1 + tracker.handle(req11Event); + // Localize R2 for C1 + tracker.handle(req21Event); + dispatcher.await(); + // Localize resource1 + Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1")); + Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2")); + ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1); + tracker.handle(rle1); + ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1); + tracker.handle(rle2); + dispatcher.await(); + Assert.assertEquals(2, localrsrc.size()); + List goodDirs = new ArrayList(); + // /tmp/somedir2 is bad + goodDirs.add("/tmp/somedir1"); + goodDirs.add("/tmp/somedir3"); + tracker.checkLocalizedResources(goodDirs); + // check local cache + Assert.assertEquals(1, localrsrc.size()); + Assert.assertEquals(p1.toUri().getPath(), localrsrc.get(req1) + .getLocalPath().toUri().getPath()); + RecoveredLocalizationState localState = nmState.loadLocalizationState(); + List protoList = + localState.getUserResources().get("testuser") + .getPrivateTrackerState().getLocalizedResources(); + // check state store + Assert.assertEquals(1, protoList.size()); + Assert + .assertEquals(p1.toUri().getPath(), protoList.get(0).getLocalPath()); + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + private boolean createdummylocalizefile(Path path) { boolean ret = false; File file = new File(path.toUri().getRawPath().toString());