diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 58b6ef9..7a57a92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -840,6 +840,11 @@ public class YarnConfiguration extends Configuration { public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; + /** The number of threads to handle log aggregation in node manager. */ + public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE = + NM_PREFIX + "logaggregation.threadpool-size-max"; + public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100; + public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION = NM_PREFIX + "resourcemanager.minimum.version"; public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 9924db6..c645a19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -97,7 +97,7 @@ public class LogAggregationService extends AbstractService implements private final ConcurrentMap appLogAggregators; - private final ExecutorService threadPool; + ExecutorService threadPool; public LogAggregationService(Dispatcher dispatcher, Context context, DeletionService deletionService, LocalDirsHandlerService dirsHandler) { @@ -108,13 +108,14 @@ public class LogAggregationService extends AbstractService implements this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); - this.threadPool = Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("LogAggregationService #%d") - .build()); } protected void serviceInit(Configuration conf) throws Exception { + int threadPoolSize = getAggregatorThreadPoolSize(conf); + this.threadPool = Executors.newFixedThreadPool(threadPoolSize, + new ThreadFactoryBuilder() + .setNameFormat("LogAggregationService #%d") + .build()); this.remoteRootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); @@ -472,4 +473,25 @@ public class LogAggregationService extends AbstractService implements public NodeId getNodeId() { return this.nodeId; } + + private int getAggregatorThreadPoolSize(Configuration conf) { + int threadPoolSize; + try { + threadPoolSize = conf.getInt(YarnConfiguration + .NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE); + } catch (NumberFormatException ex) { + LOG.warn("Invalid thread pool size. Setting it to the default value " + + "in YarnConfiguration"); + threadPoolSize = YarnConfiguration. + DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE; + } + if(threadPoolSize <= 0) { + LOG.warn("Invalid thread pool size. Setting it to the default value " + + "in YarnConfiguration"); + threadPoolSize = YarnConfiguration. + DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE; + } + return threadPoolSize; + } }