diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 4236392..58c74c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -746,6 +746,7 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { final CompletionService queue; // Its shared between public localizer and dispatcher thread. final Map,LocalizerResourceRequestEvent> pending; + private String prevLocId; PublicLocalizer(Configuration conf) { super("Public Localizer"); @@ -762,6 +763,20 @@ public void addResource(LocalizerResourceRequestEvent request) { LocalizedResource rsrc = request.getResource(); LocalResourceRequest key = rsrc.getRequest(); LOG.info("Downloading public rsrc:" + key); + + String locId = request.getLocalizerId(); + // Don't need synchronize on prevLocId, because + // addResource is always running in Dispatcher thread. + // Initialized local/log dirs per container instead of + // per resource. + if (!locId.equals(prevLocId)) { + prevLocId = locId; + // In case this is not a newly initialized nm state, ensure + // initialized local/log dirs similar to LocalizerRunner + getInitializedLocalDirs(); + getInitializedLogDirs(); + } + /* * Here multiple containers may request the same resource. So we need * to start downloading only when @@ -785,11 +800,6 @@ public void addResource(LocalizerResourceRequestEvent request) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } - // In case this is not a newly initialized nm state, ensure - // initialized local/log dirs similar to LocalizerRunner - getInitializedLocalDirs(); - getInitializedLogDirs(); - // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) {