diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index a951c0bb0fc..1a5e1f899c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -146,6 +146,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.LoadingCache; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ResourceLocalizationService extends CompositeService @@ -721,22 +722,18 @@ public LocalizerRunner getLocalizerRunner(String locId) { */ class LocalizerTracker extends AbstractService implements EventHandler { + private static final int MAX_CAPACITY = 128; + private final PublicLocalizer publicLocalizer; - private final Map privLocalizers; + private final ConcurrentMap privLocalizers; private final Map recentlyCleanedLocalizers; - private final int maxRecentlyCleaned = 128; LocalizerTracker(Configuration conf) { - this(conf, new HashMap()); - } - - LocalizerTracker(Configuration conf, - Map privLocalizers) { super(LocalizerTracker.class.getName()); this.publicLocalizer = new PublicLocalizer(conf); - this.privLocalizers = privLocalizers; - this.recentlyCleanedLocalizers = - new LRUCacheHashMap(maxRecentlyCleaned, false); + this.privLocalizers = Maps.newConcurrentMap(); + this.recentlyCleanedLocalizers = Collections + .synchronizedMap(new LRUCacheHashMap<>(MAX_CAPACITY, false)); } @Override @@ -747,19 +744,17 @@ public synchronized void serviceStart() throws Exception { public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) { String locId = status.getLocalizerId(); - synchronized (privLocalizers) { - LocalizerRunner localizer = privLocalizers.get(locId); - if (null == localizer) { - // TODO process resources anyway - LOG.info("Unknown localizer with localizerId " + locId - + " is sending heartbeat. Ordering it to DIE"); - LocalizerHeartbeatResponse response = + LocalizerRunner localizer = privLocalizers.get(locId); + if (null == localizer) { + // TODO process resources anyway + LOG.info("Unknown localizer with localizerId " + locId + + " is sending heartbeat. Ordering it to DIE"); + LocalizerHeartbeatResponse response = recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); - response.setLocalizerAction(LocalizerAction.DIE); - return response; - } - return localizer.processHeartbeat(status.getResources()); + response.setLocalizerAction(LocalizerAction.DIE); + return response; } + return localizer.processHeartbeat(status.getResources()); } @Override @@ -785,35 +780,41 @@ public void handle(LocalizerEvent event) { break; case PRIVATE: case APPLICATION: - synchronized (privLocalizers) { - LocalizerRunner localizer = privLocalizers.get(locId); - if (localizer != null && localizer.killContainerLocalizer.get()) { - // Old localizer thread has been stopped, remove it and create - // a new localizer thread. - LOG.info("New " + event.getType() + " localize request for " - + locId + ", remove old private localizer."); - privLocalizers.remove(locId); - localizer.interrupt(); - localizer = null; - } - if (null == localizer) { - // Don't create a new localizer if this one has been recently - // cleaned up - this can happen if localization requests come - // in after cleanupPrivLocalizers has been called. - if (recentlyCleanedLocalizers.containsKey(locId)) { - LOG.info( - "Skipping localization request for recently cleaned " + - "localizer " + locId + " resource:" + req.getResource()); - break; - } - LOG.info("Created localizer for " + locId); - localizer = new LocalizerRunner(req.getContext(), locId); - privLocalizers.put(locId, localizer); - localizer.start(); - } - // 1) propagate event + LocalizerRunner localizer = + privLocalizers.compute(locId, (key, value) -> { + if (value != null) { + if (!value.killContainerLocalizer.get()) { + return value; + } + // Old localizer thread has been stopped + // Interrupt it and replace with a new localizer thread + LOG.info("New {} localize request for {} remove old " + + "private localizer.", event.getType(), key); + value.interrupt(); + } + // Do not create a new localizer if this one has been recently + // cleaned up - this can happen if localization requests come + // in after cleanupPrivLocalizers has been called. + if (recentlyCleanedLocalizers.containsKey(key)) { + LOG.info( + "Skipping localization request for recently cleaned " + + "localizer {} resource: {}", + key, req.getResource()); + return null; + } + + LOG.info("Creating localizer for {}", key); + LocalizerRunner localizeRunner = + new LocalizerRunner(req.getContext(), key); + localizeRunner.start(); + return localizeRunner; + }); + + // propagate event + if (localizer != null) { localizer.addResource(req); } + break; } break; @@ -821,26 +822,18 @@ public void handle(LocalizerEvent event) { } public void cleanupPrivLocalizers(String locId) { - synchronized (privLocalizers) { - LocalizerRunner localizer = privLocalizers.get(locId); + privLocalizers.computeIfPresent(locId, (key, localizer) -> { recentlyCleanedLocalizers.put(locId, locId); - if (null == localizer) { - return; // ignore; already gone - } - privLocalizers.remove(locId); localizer.interrupt(); - } + return null; + }); } public void endContainerLocalization(String locId) { - LocalizerRunner localizer; - synchronized (privLocalizers) { - localizer = privLocalizers.get(locId); - if (null == localizer) { - return; // ignore - } + LocalizerRunner localizer = privLocalizers.get(locId); + if (null != localizer) { + localizer.endContainerLocalization(); } - localizer.endContainerLocalization(); } }