diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index f0cd87b..b1c1063 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import java.util.EnumSet; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Semaphore; @@ -42,7 +41,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; -import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -78,19 +76,14 @@ // From INIT (ref == 0, awaiting req) .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) - .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED, new FetchDirectTransition()) - .addTransition(ResourceState.INIT, ResourceState.INIT, - ResourceEventType.RELEASE, new ReleaseTransition()) // From DOWNLOADING (ref > 0, may be localizing) .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, - ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!! + ResourceEventType.REQUEST, new FetchDuplicateDownloadTransition()) .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED, ResourceEventType.LOCALIZED, new FetchSuccessTransition()) - .addTransition(ResourceState.DOWNLOADING, - EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), - ResourceEventType.RELEASE, new ReleasePendingTransition()) + .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, + ResourceEventType.RELEASE, new ReleaseTransition()) .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED, ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition()) @@ -98,8 +91,6 @@ .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.REQUEST, new LocalizedResourceTransition()) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED) - .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.RELEASE, new ReleaseTransition()) .installTopology(); @@ -211,7 +202,7 @@ public void handle(ResourceEvent event) { // typedef } - /** + /* * Transition from INIT to DOWNLOADING. * Sends a {@link LocalizerResourceRequestEvent} to the * {@link ResourceLocalizationService}. @@ -230,11 +221,16 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { } } - private static class FetchDirectTransition extends FetchSuccessTransition { + /** + * Transition from DOWNLOADING to DOWNLOADING. Add container to resource + * waiting queue (ref) + */ + private static class FetchDuplicateDownloadTransition extends + ResourceTransition { @Override - public void transition(LocalizedResource rsrc, ResourceEvent event) { - LOG.warn("Resource " + rsrc + " localized without listening container"); - super.transition(rsrc, event); + public void transition(LocalizedResource resource, ResourceEvent event) { + ResourceRequestEvent req = (ResourceRequestEvent) event; + resource.ref.add(req.getContext().getContainerId()); } } @@ -304,17 +300,4 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { rsrc.release(relEvent.getContainer()); } } - - private static class ReleasePendingTransition implements - MultipleArcTransition { - @Override - public ResourceState transition(LocalizedResource rsrc, - ResourceEvent event) { - ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; - rsrc.release(relEvent.getContainer()); - return rsrc.ref.isEmpty() - ? ResourceState.INIT - : ResourceState.DOWNLOADING; - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 7b9873a..eac6322 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -381,7 +381,6 @@ private void handleCacheCleanup(LocalizationEvent event) { //TODO Check if appRsrcs should also be added to the retention set. } - @SuppressWarnings("unchecked") private void handleCleanupContainerResources( ContainerLocalizationCleanupEvent rsrcCleanup) { @@ -606,42 +605,20 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { final ExecutorService threadPool; final CompletionService queue; final Map,LocalizerResourceRequestEvent> pending; - // TODO hack to work around broken signaling - final Map> attempts; PublicLocalizer(Configuration conf) { this(conf, getLocalFileContext(conf), createLocalizerExecutor(conf), - new HashMap,LocalizerResourceRequestEvent>(), - new HashMap>()); + new HashMap,LocalizerResourceRequestEvent>()); } PublicLocalizer(Configuration conf, FileContext lfs, ExecutorService threadPool, - Map,LocalizerResourceRequestEvent> pending, - Map> attempts) { + Map,LocalizerResourceRequestEvent> pending) { super("Public Localizer"); this.lfs = lfs; this.conf = conf; this.pending = pending; - this.attempts = attempts; -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// for (int i = 0, n = localDirs.size(); i < n; ++i) { -// publicFilecache[i] = -// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString(); -// } -// conf.setStrings(PUBCACHE_CTXT, publicFilecache); - -// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf); -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// int i = 0; -// for (String localDir : localDirs) { -// publicFilecache[i++] = -// new Path(localDir, ContainerLocalizer.FILECACHE).toString(); -// } - this.threadPool = threadPool; this.queue = new ExecutorCompletionService(threadPool); } @@ -650,35 +627,26 @@ public void addResource(LocalizerResourceRequestEvent request) { // TODO handle failures, cancellation, requests by other containers LocalResourceRequest key = request.getResource().getRequest(); LOG.info("Downloading public rsrc:" + key); - synchronized (attempts) { - List sigh = attempts.get(key); - if (null == sigh) { - LocalResource resource = request.getResource().getRequest(); - try { - Path publicDirDestPath = dirsHandler.getLocalPathForWrite( - "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, - ContainerLocalizer.getEstimatedSize(resource), true); - Path hierarchicalPath = - publicRsrc.getPathForLocalization(key, publicDirDestPath); - if (!hierarchicalPath.equals(publicDirDestPath)) { - publicDirDestPath = hierarchicalPath; - DiskChecker.checkDir( - new File(publicDirDestPath.toUri().getPath())); - } - publicDirDestPath = - new Path(publicDirDestPath, Long.toString(publicRsrc - .nextUniqueNumber())); - pending.put(queue.submit(new FSDownload( - lfs, null, conf, publicDirDestPath, resource)), - request); - attempts.put(key, new LinkedList()); - } catch (IOException e) { - LOG.error("Local path for public localization is not found. " - + " May be disks failed.", e); - } - } else { - sigh.add(request); + LocalResource resource = request.getResource().getRequest(); + try { + Path publicDirDestPath = + dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR + + ContainerLocalizer.FILECACHE, + ContainerLocalizer.getEstimatedSize(resource), true); + Path hierarchicalPath = + publicRsrc.getPathForLocalization(key, publicDirDestPath); + if (!hierarchicalPath.equals(publicDirDestPath)) { + publicDirDestPath = hierarchicalPath; + DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } + publicDirDestPath = + new Path(publicDirDestPath, Long.toString(publicRsrc + .nextUniqueNumber())); + pending.put(queue.submit(new FSDownload(lfs, null, conf, + publicDirDestPath, resource)), request); + } catch (IOException e) { + LOG.error("Local path for public localization is not found. " + + " May be disks failed.", e); } } @@ -700,24 +668,12 @@ public void run() { LocalResourceRequest key = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil .getDU(new File(local.toUri())))); - synchronized (attempts) { - attempts.remove(key); - } } catch (ExecutionException e) { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); LocalResourceRequest req = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e .getCause())); - synchronized (attempts) { - List reqs; - reqs = attempts.get(req); - if (null == reqs) { - LOG.error("Missing pending list for " + req); - return; - } - attempts.remove(req); - } } catch (CancellationException e) { // ignore; shutting down } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index b2caba0..edc5a01 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -116,7 +116,11 @@ public void test() { tracker.handle(req21Event); dispatcher.await(); - verify(localizerEventHandler, times(3)).handle( + // Now only 2 unique resources are requested. + // Resource1 by container-1 and container-2 + // Resource2 by container-1 + // So there will be only 2 LocalizerResourceRequestEvent + verify(localizerEventHandler, times(2)).handle( any(LocalizerResourceRequestEvent.class)); // Verify refCount for R1 is 2 Assert.assertEquals(2, lr1.getRefCount()); @@ -129,15 +133,21 @@ public void test() { dispatcher.await(); verifyTrackedResourceCount(tracker, 2); - // Verify resources in state INIT with ref-count=0 is removed. - Assert.assertTrue(tracker.remove(lr2, mockDelService)); - verifyTrackedResourceCount(tracker, 1); + // Even if requesting container releases on the requested resource + // the resource will still remain in cache in DOWNLOADING state with + // ref count of 0. This is required because downloading of that resource + // has already been requested. So can't move the resource back to INIT + // state. + Assert.assertEquals(ResourceState.DOWNLOADING, localrsrc.get(req2) + .getState()); + Assert.assertEquals(0, localrsrc.get(req2).getRefCount()); - // Verify resource with non zero ref count is not removed. - Assert.assertEquals(2, lr1.getRefCount()); + // Trying to remove both the resources in DOWNLADING state one with zero + // ref count and another with > 0 ref count. Both should return false. Assert.assertFalse(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 1); - + Assert.assertFalse(tracker.remove(lr2, mockDelService)); + verifyTrackedResourceCount(tracker, 2); + // Localize resource1 ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path("file:///tmp/r1"), 1); @@ -151,7 +161,13 @@ public void test() { // Verify resources in state LOCALIZED with ref-count=0 is removed. Assert.assertTrue(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 0); + // Now only one resource should be present in DOWNLOADING state with + // ref count of 0. + verifyTrackedResourceCount(tracker, 1); + // resource-2 is still present in cache + Assert.assertTrue(localrsrc.containsKey(req2)); + // resource-1 is removed from cache + Assert.assertFalse(localrsrc.containsKey(req1)); } finally { if (dispatcher != null) { dispatcher.stop(); @@ -277,7 +293,9 @@ public void testLocalResourceCache() { Assert.assertTrue(localrsrc.get(lr).ref.contains(cId1)); Assert.assertEquals(ResourceState.DOWNLOADING, localrsrc.get(lr) .getState()); - + verify(localizerEventHandler, times(1)).handle( + isA(LocalizerResourceRequestEvent.class)); + // Container 2 requesting the resource ContainerId cId2 = BuilderUtils.newContainerId(1, 1, 1, 2); LocalizerContext lc2 = new LocalizerContext(user, cId2, null); @@ -285,6 +303,11 @@ public void testLocalResourceCache() { new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2); tracker.handle(reqEvent2); + // As resource localization has already been requested so this should not + // request it again. No addition in method calls. (only once-times(1)) + verify(localizerEventHandler, times(1)).handle( + isA(LocalizerResourceRequestEvent.class)); + // Container 2 should have been added to the waiting queue of the local // resource Assert.assertEquals(2, localrsrc.get(lr).getRefCount());