diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index f0cd87b..22304fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -78,19 +78,14 @@ // From INIT (ref == 0, awaiting req) .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) - .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED, new FetchDirectTransition()) - .addTransition(ResourceState.INIT, ResourceState.INIT, - ResourceEventType.RELEASE, new ReleaseTransition()) // From DOWNLOADING (ref > 0, may be localizing) .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!! .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED, ResourceEventType.LOCALIZED, new FetchSuccessTransition()) - .addTransition(ResourceState.DOWNLOADING, - EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), - ResourceEventType.RELEASE, new ReleasePendingTransition()) + .addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING, + ResourceEventType.RELEASE, new ReleaseTransition()) .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED, ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition()) @@ -98,8 +93,6 @@ .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.REQUEST, new LocalizedResourceTransition()) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED) - .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.RELEASE, new ReleaseTransition()) .installTopology(); @@ -230,14 +223,6 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { } } - private static class FetchDirectTransition extends FetchSuccessTransition { - @Override - public void transition(LocalizedResource rsrc, ResourceEvent event) { - LOG.warn("Resource " + rsrc + " localized without listening container"); - super.transition(rsrc, event); - } - } - /** * Resource localized, notify waiting containers. */ @@ -304,17 +289,4 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { rsrc.release(relEvent.getContainer()); } } - - private static class ReleasePendingTransition implements - MultipleArcTransition { - @Override - public ResourceState transition(LocalizedResource rsrc, - ResourceEvent event) { - ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; - rsrc.release(relEvent.getContainer()); - return rsrc.ref.isEmpty() - ? ResourceState.INIT - : ResourceState.DOWNLOADING; - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 7b9873a..2bb7404 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -31,7 +31,6 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; @@ -606,41 +605,20 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { final ExecutorService threadPool; final CompletionService queue; final Map,LocalizerResourceRequestEvent> pending; - // TODO hack to work around broken signaling - final Map> attempts; PublicLocalizer(Configuration conf) { this(conf, getLocalFileContext(conf), createLocalizerExecutor(conf), - new HashMap,LocalizerResourceRequestEvent>(), - new HashMap>()); + new HashMap,LocalizerResourceRequestEvent>()); } PublicLocalizer(Configuration conf, FileContext lfs, ExecutorService threadPool, - Map,LocalizerResourceRequestEvent> pending, - Map> attempts) { + Map,LocalizerResourceRequestEvent> pending) { super("Public Localizer"); this.lfs = lfs; this.conf = conf; this.pending = pending; - this.attempts = attempts; -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// for (int i = 0, n = localDirs.size(); i < n; ++i) { -// publicFilecache[i] = -// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString(); -// } -// conf.setStrings(PUBCACHE_CTXT, publicFilecache); - -// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf); -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// int i = 0; -// for (String localDir : localDirs) { -// publicFilecache[i++] = -// new Path(localDir, ContainerLocalizer.FILECACHE).toString(); -// } this.threadPool = threadPool; this.queue = new ExecutorCompletionService(threadPool); @@ -648,36 +626,45 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { public void addResource(LocalizerResourceRequestEvent request) { // TODO handle failures, cancellation, requests by other containers - LocalResourceRequest key = request.getResource().getRequest(); + LocalizedResource rsrc = request.getResource(); + LocalResourceRequest key = rsrc.getRequest(); LOG.info("Downloading public rsrc:" + key); - synchronized (attempts) { - List sigh = attempts.get(key); - if (null == sigh) { + /* + * Here multiple containers may request the same resource. So we need + * to start downloading only when + * 1) ResourceState == DOWNLOADING + * 2) We are able to acquire non blocking semaphore lock. + * If not we will skip this resource as either it is getting downloaded + * or it FAILED / LOCALIZED. + */ + + if (rsrc.tryAcquire()) { + if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { LocalResource resource = request.getResource().getRequest(); try { - Path publicDirDestPath = dirsHandler.getLocalPathForWrite( - "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, - ContainerLocalizer.getEstimatedSize(resource), true); + Path publicDirDestPath = + dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR + + ContainerLocalizer.FILECACHE, + ContainerLocalizer.getEstimatedSize(resource), true); Path hierarchicalPath = - publicRsrc.getPathForLocalization(key, publicDirDestPath); + publicRsrc.getPathForLocalization(key, publicDirDestPath); if (!hierarchicalPath.equals(publicDirDestPath)) { publicDirDestPath = hierarchicalPath; - DiskChecker.checkDir( - new File(publicDirDestPath.toUri().getPath())); + DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } publicDirDestPath = new Path(publicDirDestPath, Long.toString(publicRsrc .nextUniqueNumber())); - pending.put(queue.submit(new FSDownload( - lfs, null, conf, publicDirDestPath, resource)), - request); - attempts.put(key, new LinkedList()); + pending.put(queue.submit(new FSDownload(lfs, null, conf, + publicDirDestPath, resource)), request); } catch (IOException e) { + rsrc.unlock(); + // TODO Need to Fix IO Exceptions - Notifying resource LOG.error("Local path for public localization is not found. " + " May be disks failed.", e); } } else { - sigh.add(request); + rsrc.unlock(); } } } @@ -700,24 +687,14 @@ public void run() { LocalResourceRequest key = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil .getDU(new File(local.toUri())))); - synchronized (attempts) { - attempts.remove(key); - } + assoc.getResource().unlock(); } catch (ExecutionException e) { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); LocalResourceRequest req = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e .getCause())); - synchronized (attempts) { - List reqs; - reqs = attempts.get(req); - if (null == reqs) { - LOG.error("Missing pending list for " + req); - return; - } - attempts.remove(req); - } + assoc.getResource().unlock(); } catch (CancellationException e) { // ignore; shutting down } @@ -780,18 +757,29 @@ private LocalResource findNextResource() { i.remove(); continue; } + /* + * Multiple containers will try to download the same resource. So the + * resource download should start only if + * 1) We can acquire a non blocking semaphore lock on resource + * 2) Resource is still in DOWNLOADING state + */ if (nRsrc.tryAcquire()) { - LocalResourceRequest nextRsrc = nRsrc.getRequest(); - LocalResource next = - recordFactory.newRecordInstance(LocalResource.class); - next.setResource( - ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath())); - next.setTimestamp(nextRsrc.getTimestamp()); - next.setType(nextRsrc.getType()); - next.setVisibility(evt.getVisibility()); - next.setPattern(evt.getPattern()); - scheduled.put(nextRsrc, evt); - return next; + if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc + .getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + next.setPattern(evt.getPattern()); + scheduled.put(nextRsrc, evt); + return next; + } else { + // Need to release acquired lock + nRsrc.unlock(); + } } } return null; @@ -863,6 +851,12 @@ LocalizerHeartbeatResponse update( new ResourceLocalizedEvent(req, ConverterUtils .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize())); } catch (URISyntaxException e) { } + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + if (pending.isEmpty()) { // TODO: Synchronization response.setLocalizerAction(LocalizerAction.DIE); @@ -889,11 +883,16 @@ LocalizerHeartbeatResponse update( break; case FETCH_FAILURE: LOG.info("DEBUG: FAILED " + req, stat.getException()); - assoc.getResource().unlock(); response.setLocalizerAction(LocalizerAction.DIE); getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle( new ResourceFailedLocalizationEvent(req, stat.getException())); + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + break; default: LOG.info("Unknown status: " + stat.getStatus()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java index 07d8df1..730f74a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java @@ -176,24 +176,6 @@ public void testNotification() throws Exception { } } - @Test - public void testDirectLocalization() throws Exception { - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(new Configuration()); - try { - dispatcher.start(); - LocalResource apiRsrc = createMockResource(); - LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc); - LocalizedResource local = new LocalizedResource(rsrcA, dispatcher); - Path p = new Path("file:///cache/rsrcA"); - local.handle(new ResourceLocalizedEvent(rsrcA, p, 10)); - dispatcher.await(); - assertEquals(ResourceState.LOCALIZED, local.getState()); - } finally { - dispatcher.stop(); - } - } - static LocalResource createMockResource() { // mock rsrc location org.apache.hadoop.yarn.api.records.URL uriA =