diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index bb7db12..2fe134f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -45,6 +45,8 @@ private TimelineWriter writer; + volatile boolean initialized = false; + public TimelineCollector(String name) { super(name); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 953d9b7..7bb305a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -32,9 +32,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Class that manages adding and removing collectors and their lifecycle. It @@ -61,9 +60,8 @@ public void serviceInit(Configuration conf) throws Exception { // access to this map is synchronized with the map itself - private final Map collectors = - Collections.synchronizedMap( - new HashMap()); + private final ConcurrentMap collectors = + new ConcurrentHashMap<>(); protected TimelineCollectorManager(String name) { super(name); @@ -79,26 +77,36 @@ protected TimelineCollectorManager(String name) { */ public TimelineCollector putIfAbsent(ApplicationId appId, TimelineCollector collector) { - TimelineCollector collectorInTable = null; - synchronized (collectors) { - collectorInTable = collectors.get(appId); - if (collectorInTable == null) { - try { - // initialize, start, and add it to the collection so it can be - // cleaned up when the parent shuts down - collector.init(getConfig()); - collector.setWriter(writer); - collector.start(); - collectors.put(appId, collector); - LOG.info("the collector for " + appId + " was added"); + TimelineCollector collectorInTable; + collectorInTable = collectors.get(appId); + if (collectorInTable == null) { + try { + // initialize and start a collector speculatively. + collector.init(getConfig()); + collector.setWriter(writer); + collector.start(); + // try add the new collector to the concurrent hash map atomically. + TimelineCollector prevCollectorInTable = + collectors.putIfAbsent(appId, collector); + // if a previous (as in synchronization order) collector exists, we + // should shut down the newly created collector since it will not be + // published. + if (prevCollectorInTable != null) { + collector.stop(); + collectorInTable = prevCollectorInTable; + initializationBarrier(collectorInTable); + } else { + // successful put collectorInTable = collector; + LOG.info("the collector for " + appId + " was added"); postPut(appId, collectorInTable); - } catch (Exception e) { - throw new YarnRuntimeException(e); + collectorInTable.initialized = true; } - } else { - LOG.info("the collector for " + appId + " already exists!"); + } catch (Exception e) { + throw new YarnRuntimeException(e); } + } else { + LOG.info("the collector for " + appId + " already exists!"); } return collectorInTable; } @@ -136,7 +144,9 @@ protected void postRemove(ApplicationId appId, TimelineCollector collector) { * @return the collector or null if it does not exist */ public TimelineCollector get(ApplicationId appId) { - return collectors.get(appId); + TimelineCollector collector = collectors.get(appId); + initializationBarrier(collector); + return collector; } /** @@ -147,4 +157,8 @@ public boolean containsTimelineCollector(ApplicationId appId) { return collectors.containsKey(appId); } + private void initializationBarrier(TimelineCollector collector) { + while (!collector.initialized); + } + }