# Patch summary -- review commentary only. This text sits before the first
# "diff --git" header, so it is not part of any hunk.
#
# Purpose: make the NodeManager LogAggregationService use a fixed-size thread
# pool whose size comes from configuration, instead of a cached thread pool.
#
# 1. YarnConfiguration.java
#    - Adds NM_LOG_AGGREGATION_THREAD_POOL_SIZE
#      (NM_PREFIX + "logaggregation.threadpool-size-max") and
#      DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100.
#
# 2. LogAggregationService.java
#    - The threadPool field changes from "private final" to a public,
#      non-final field annotated @VisibleForTesting.
#    - The constructor no longer builds the pool (the
#      HadoopExecutors.newCachedThreadPool(...) call is removed).
#    - serviceInit() reads the size with conf.getInt(key, default) and builds
#      the pool with HadoopExecutors.newFixedThreadPool(size, factory) before
#      calling super.serviceInit(conf). Threads keep the
#      "LogAggregationService #%d" name format.
#
# 3. TestLogAggregationService.java
#    - Adds imports for ExecutorService and Semaphore.
#    - Adds testFixedSizeThreadPool(): saves the current pool-size setting,
#      sets it to 1, inits and starts a LogAggregationService, submits the same
#      semaphore-blocked Runnable twice, counts the threads whose name starts
#      with "LogAggregationService" and asserts the count is 1. It then
#      releases the semaphore, stops and closes the service, and writes the
#      saved pool-size value back into conf.
#
# NOTE(review): the configured size is handed to newFixedThreadPool without a
#   range check. Presumably, as with java.util.concurrent.Executors, a value
#   <= 0 is rejected with IllegalArgumentException during serviceInit() --
#   confirm, and consider falling back to the default for invalid values.
# NOTE(review): the test writes the saved pool size back as its last statement,
#   not in a finally block, so a failed assertion leaves the value 1 in conf.
# NOTE(review): the Runnable's InterruptedException handler only calls
#   e.printStackTrace(); the interrupt status is not restored.
# NOTE(review): threadPool is now a public mutable field; the test appears to
#   live in the same package, so narrower visibility may suffice -- confirm.
# NOTE(review): this copy of the patch looks like it has had its line breaks
#   collapsed, and generic type arguments may have been lost (for example
#   "ConcurrentMap appLogAggregators") -- TODO confirm against the original
#   patch before applying.
#
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3845987..57da69a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -759,6 +759,11 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; + /** The number of threads to handle log aggregation in node manager. */ + public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE = + NM_PREFIX + "logaggregation.threadpool-size-max"; + public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100; + public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION = NM_PREFIX + "resourcemanager.minimum.version"; public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 6411535..47e4f7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -102,7 +102,8 @@ private final ConcurrentMap appLogAggregators; - private final ExecutorService threadPool; + @VisibleForTesting + public ExecutorService threadPool; public LogAggregationService(Dispatcher dispatcher, Context context, DeletionService deletionService, LocalDirsHandlerService dirsHandler) { @@ -113,10 +114,6 @@ public LogAggregationService(Dispatcher dispatcher, Context context, this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); - this.threadPool = HadoopExecutors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("LogAggregationService #%d") - .build()); } protected void serviceInit(Configuration conf) throws Exception { @@ -126,7 +123,13 @@ protected void serviceInit(Configuration conf) throws Exception { this.remoteRootLogDirSuffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); - + int threadPoolSize = + conf.getInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE); + this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize, + new ThreadFactoryBuilder() + .setNameFormat("LogAggregationService #%d") + .build()); super.serviceInit(conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 101fef0..5a857f3 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -55,6 +55,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; @@ -1040,6 +1042,67 @@ private DrainDispatcher createDispatcher() { return appAcls; } + @Test + public void testFixedSizeThreadPool() throws Exception { + // store configured thread pool size temporarily for restoration + int threadPoolSize = conf.getInt(YarnConfiguration + .NM_LOG_AGGREGATION_THREAD_POOL_SIZE, YarnConfiguration + .DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE); + + conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, 1); + + DeletionService delSrvc = mock(DeletionService.class); + + LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class); + when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException()); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, delSrvc, mockedDirSvc); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + ExecutorService executorService = logAggregationService.threadPool; + + final Semaphore semaphore = new Semaphore(1); + semaphore.acquire(); + + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + finally { + semaphore.release(); + } + } + }; + executorService.submit(runnable); + 
executorService.submit(runnable); + + // count the current running LogAggregationService threads + int runningLogAggregationThread = 0; + for(Thread thread: Thread.getAllStackTraces().keySet()) { + if(thread.getName().startsWith("LogAggregationService")) { + runningLogAggregationThread++; + } + } + + assertEquals(1, runningLogAggregationThread); + + semaphore.release(); + + logAggregationService.stop(); + logAggregationService.close(); + + // restore the original configurations to avoid side effects + conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, threadPoolSize); + } + + @Test(timeout=20000) public void testStopAfterError() throws Exception { DeletionService delSrvc = mock(DeletionService.class);