diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 1fdb082..5ce8984 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -635,7 +635,7 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { PublicLocalizer(Configuration conf) { this(conf, getLocalFileContext(conf), createLocalizerExecutor(conf), - new HashMap,LocalizerResourceRequestEvent>()); + new ConcurrentHashMap,LocalizerResourceRequestEvent>()); } PublicLocalizer(Configuration conf, FileContext lfs, @@ -764,8 +764,9 @@ public void run() { } public void addResource(LocalizerResourceRequestEvent request) { - // TDOO: Synchronization - pending.add(request); + synchronized (pending) { + pending.add(request); + } } /** @@ -774,43 +775,44 @@ public void addResource(LocalizerResourceRequestEvent request) { * @return */ private LocalResource findNextResource() { - // TODO: Synchronization - for (Iterator i = pending.iterator(); - i.hasNext();) { - LocalizerResourceRequestEvent evt = i.next(); - LocalizedResource nRsrc = evt.getResource(); - // Resource download should take place ONLY if resource is in - // Downloading state - if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { - i.remove(); - continue; - } - /* - * Multiple containers will try to download the same resource. So the - * resource download should start only if - * 1) We can acquire a non blocking semaphore lock on resource - * 2) Resource is still in DOWNLOADING state - */ - if (nRsrc.tryAcquire()) { - if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { - LocalResourceRequest nextRsrc = nRsrc.getRequest(); - LocalResource next = - recordFactory.newRecordInstance(LocalResource.class); - next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc - .getPath())); - next.setTimestamp(nextRsrc.getTimestamp()); - next.setType(nextRsrc.getType()); - next.setVisibility(evt.getVisibility()); - next.setPattern(evt.getPattern()); - scheduled.put(nextRsrc, evt); - return next; - } else { - // Need to release acquired lock - nRsrc.unlock(); - } - } + synchronized (pending) { + for (Iterator i = pending.iterator(); + i.hasNext();) { + LocalizerResourceRequestEvent evt = i.next(); + LocalizedResource nRsrc = evt.getResource(); + // Resource download should take place ONLY if resource is in + // Downloading state + if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { + i.remove(); + continue; + } + /* + * Multiple containers will try to download the same resource. So the + * resource download should start only if + * 1) We can acquire a non blocking semaphore lock on resource + * 2) Resource is still in DOWNLOADING state + */ + if (nRsrc.tryAcquire()) { + if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc + .getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + next.setPattern(evt.getPattern()); + scheduled.put(nextRsrc, evt); + return next; + } else { + // Need to release acquired lock + nRsrc.unlock(); + } + } + } + return null; } - return null; } LocalizerHeartbeatResponse update(