diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3845987..57da69a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -759,6 +759,11 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
   public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
 
+  /** The number of threads to handle log aggregation in node manager. */
+  public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE =
+      NM_PREFIX + "logaggregation.threadpool-size-max";
+  public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100;
+
   public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
       NM_PREFIX + "resourcemanager.minimum.version";
   public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1d410f1..f5e3dcf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1227,6 +1227,14 @@
   </property>
 
   <property>
+    <description>
+    Thread pool size for LogAggregationService in Node Manager.
+    </description>
+    <name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
+    <value>100</value>
+  </property>
+
+  <property>
     <description>Percentage of CPU that can be allocated
     for containers. This setting allows users to limit the amount of
     CPU that YARN containers use. Currently functional only
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 6411535..db08109 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -102,7 +102,8 @@
 
   private final ConcurrentMap appLogAggregators;
 
-  private final ExecutorService threadPool;
+  @VisibleForTesting
+  ExecutorService threadPool;
 
   public LogAggregationService(Dispatcher dispatcher, Context context,
       DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@@ -113,10 +114,6 @@ public LogAggregationService(Dispatcher dispatcher, Context context,
     this.dirsHandler = dirsHandler;
     this.appLogAggregators =
         new ConcurrentHashMap();
-    this.threadPool = HadoopExecutors.newCachedThreadPool(
-        new ThreadFactoryBuilder()
-          .setNameFormat("LogAggregationService #%d")
-          .build());
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -126,7 +123,11 @@ protected void serviceInit(Configuration conf) throws Exception {
     this.remoteRootLogDirSuffix =
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-
+    int threadPoolSize = getAggregatorThreadPoolSize(conf);
+    this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
+        new ThreadFactoryBuilder()
+            .setNameFormat("LogAggregationService #%d")
+            .build());
     super.serviceInit(conf);
   }
 
@@ -487,4 +488,34 @@ public void handle(LogHandlerEvent event) {
   public NodeId getNodeId() {
     return this.nodeId;
   }
+
+  /**
+   * Reads the size of the log aggregation thread pool from the configuration.
+   * A value that cannot be parsed as an integer, or that is not positive, is
+   * replaced by
+   * {@link YarnConfiguration#DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE}.
+   *
+   * @param conf configuration holding
+   *     {@link YarnConfiguration#NM_LOG_AGGREGATION_THREAD_POOL_SIZE}
+   * @return the thread pool size to use
+   */
+  private int getAggregatorThreadPoolSize(Configuration conf) {
+    int threadPoolSize;
+    try {
+      threadPoolSize = conf.getInt(
+          YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+          YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+    } catch (NumberFormatException ex) {
+      LOG.warn("Invalid thread pool size. Setting it to the default value " +
+          "in YarnConfiguration");
+      threadPoolSize =
+          YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+    }
+    if (threadPoolSize <= 0) {
+      LOG.warn("Invalid thread pool size. Setting it to the default value " +
+          "in YarnConfiguration");
+      threadPoolSize =
+          YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+    }
+    return threadPoolSize;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 101fef0..2ac1853 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -55,6 +55,10 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
@@ -1040,6 +1044,86 @@ private DrainDispatcher createDispatcher() {
     return appAcls;
   }
 
+  @Test (timeout = 30000)
+  public void testFixedSizeThreadPool() throws Exception {
+    // store configured thread pool size temporarily for restoration
+    int initThreadPoolSize = conf.getInt(YarnConfiguration
+        .NM_LOG_AGGREGATION_THREAD_POOL_SIZE, YarnConfiguration
+        .DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+    int threadPoolSize = 3;
+    conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        threadPoolSize);
+
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+    when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, delSrvc,
+            mockedDirSvc);
+
+    // used to block threads in the thread pool because main thread always
+    // acquires it first.
+    final Semaphore semaphore = new Semaphore(1);
+    semaphore.acquire();
+
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        boolean acquired = false;
+        try {
+          // the pool thread running this will be blocked and occupied
+          acquired = semaphore.tryAcquire(35000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          // preserve the interrupt for the pool instead of swallowing it
+          Thread.currentThread().interrupt();
+        } finally {
+          // hand the permit to the next blocked task, but only if this task
+          // actually obtained it
+          if (acquired) {
+            semaphore.release();
+          }
+        }
+      }
+    };
+
+    try {
+      logAggregationService.init(this.conf);
+      logAggregationService.start();
+      try {
+        ExecutorService executorService = logAggregationService.threadPool;
+
+        // submit $(threadPoolSize + 1) runnables to the thread pool. If the
+        // thread pool size is set properly, only $(threadPoolSize) threads
+        // will be created in the thread pool, each of which is blocked on the
+        // semaphore.
+        for (int i = 0; i < threadPoolSize + 1; i++) {
+          executorService.submit(runnable);
+        }
+
+        // count the threads created by the pool. Workers are added to the
+        // pool before submit() returns, while getActiveCount() only reports
+        // workers that have already started running their task and may
+        // therefore still lag behind at this point.
+        ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
+        assertEquals(threadPoolSize, pool.getPoolSize());
+      } finally {
+        // unblock the workers even if an assertion failed, so they do not
+        // stay parked on the semaphore
+        semaphore.release();
+
+        logAggregationService.stop();
+        logAggregationService.close();
+      }
+    } finally {
+      // restore the original configurations to avoid side effects
+      conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+          initThreadPoolSize);
+    }
+  }
+
   @Test(timeout=20000)
   public void testStopAfterError() throws Exception {
     DeletionService delSrvc = mock(DeletionService.class);