diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3845987..57da69a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -759,6 +759,11 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
+ /** The number of threads to handle log aggregation in node manager. */
+ public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE =
+ NM_PREFIX + "logaggregation.threadpool-size-max";
+ public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100;
+
public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
NM_PREFIX + "resourcemanager.minimum.version";
public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1d410f1..f5e3dcf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1227,6 +1227,14 @@
+
+ Thread pool size for LogAggregationService in Node Manager.
+
+ yarn.nodemanager.logaggregation.threadpool-size-max
+ 100
+
+
+
Percentage of CPU that can be allocated
for containers. This setting allows users to limit the amount of
CPU that YARN containers use. Currently functional only
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 6411535..c49b462 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -102,7 +102,8 @@
private final ConcurrentMap appLogAggregators;
- private final ExecutorService threadPool;
+ @VisibleForTesting
+ ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@@ -113,10 +114,6 @@ public LogAggregationService(Dispatcher dispatcher, Context context,
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap();
- this.threadPool = HadoopExecutors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setNameFormat("LogAggregationService #%d")
- .build());
}
protected void serviceInit(Configuration conf) throws Exception {
@@ -126,7 +123,13 @@ protected void serviceInit(Configuration conf) throws Exception {
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-
+ int threadPoolSize =
+ conf.getInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+ this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
+ new ThreadFactoryBuilder()
+ .setNameFormat("LogAggregationService #%d")
+ .build());
super.serviceInit(conf);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 101fef0..c3673f2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -55,6 +55,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
@@ -1040,6 +1042,71 @@ private DrainDispatcher createDispatcher() {
return appAcls;
}
+ @Test
+ public void testFixedSizeThreadPool() throws Exception {
+ // store configured thread pool size temporarily for restoration
+ int threadPoolSize = conf.getInt(YarnConfiguration
+ .NM_LOG_AGGREGATION_THREAD_POOL_SIZE, YarnConfiguration
+ .DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+ conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, 1);
+
+ DeletionService delSrvc = mock(DeletionService.class);
+
+ LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+ when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, this.context, delSrvc, mockedDirSvc);
+
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ExecutorService executorService = logAggregationService.threadPool;
+
+ // used to block threads in the thread pool because main thread always
+ // acquires it first. Because the thread pool size is set as 1, there
+ // is always only one thread blocking in the thread with the other runnable
+ // waiting in the queue.
+ final Semaphore semaphore = new Semaphore(1);
+ semaphore.acquire();
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ finally {
+ semaphore.release();
+ }
+ }
+ };
+ executorService.submit(runnable);
+ executorService.submit(runnable);
+
+ // count the current running LogAggregationService threads
+ int runningLogAggregationThread = 0;
+ for(Thread thread: Thread.getAllStackTraces().keySet()) {
+ if(thread.getName().startsWith("LogAggregationService")) {
+ runningLogAggregationThread++;
+ }
+ }
+
+ assertEquals(1, runningLogAggregationThread);
+
+ semaphore.release();
+
+ logAggregationService.stop();
+ logAggregationService.close();
+
+ // restore the original configurations to avoid side effects
+ conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, threadPoolSize);
+ }
+
+
@Test(timeout=20000)
public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);