diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java index dc8d19b..d076599 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java @@ -24,6 +24,7 @@ import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -40,7 +41,6 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -137,7 +137,7 @@ public Object invoke(Object proxy, final Method method, try { Map, ProxyInfo> proxyMap = new HashMap<>(); int numAttempts = 0; - executor = Executors.newFixedThreadPool(allProxies.size()); + executor = HadoopExecutors.newFixedThreadPool(allProxies.size()); completionService = new ExecutorCompletionService<>(executor); for (final ProxyInfo pInfo : allProxies.values()) { Callable c = new Callable() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 3597b31..a2efb6b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +52,7 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.Assert; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -273,7 +273,7 @@ public void testDownloadBadPublic() throws IOException, URISyntaxException, Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); int size = 512; @@ -362,7 +362,7 @@ public Boolean call() throws IOException { }); } - ExecutorService exec = Executors.newFixedThreadPool(fileCount); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(fileCount); try { List> futures = exec.invokeAll(tasks); // files should be public @@ -399,7 +399,7 @@ public void testDownload() throws IOException, URISyntaxException, Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); int[] sizes = new int[10]; @@ -468,7 +468,7 @@ private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, System.out.println("SEED: " + sharedSeed); Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator( TestFSDownload.class.getName()); @@ -619,7 +619,7 @@ public void testDirDownload() throws IOException, InterruptedException { Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); for (int i = 0; i < 5; ++i) { @@ -674,7 +674,8 @@ public void testUniqueDestinationPath() throws Exception { files.mkdir(basedir, null, true); conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); - ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor(); + ExecutorService singleThreadedExec = HadoopExecutors + .newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java index 513d7ac..7a20c24 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java @@ -36,6 +36,7 @@ import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity; import org.apache.hadoop.registry.client.types.RegistryPathStatus; import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -53,7 +54,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @@ -109,7 +109,7 @@ public RegistryAdminService(String name) { public RegistryAdminService(String name, RegistryBindingSource bindingSource) { super(name, bindingSource); - executor = Executors.newCachedThreadPool( + executor = HadoopExecutors.newCachedThreadPool( new ThreadFactory() { private AtomicInteger counter = new AtomicInteger(1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 2e0cbbf..ff6349b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; @@ -116,12 +117,12 @@ protected void serviceInit(Configuration conf) throws Exception { .setNameFormat("DeletionService #%d") .build(); if (conf != null) { - sched = new DelServiceSchedThreadPoolExecutor( + sched = new HadoopScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); } else { - sched = new DelServiceSchedThreadPoolExecutor( + sched = new HadoopScheduledThreadPoolExecutor( YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -158,34 +159,6 @@ public boolean isTerminated() { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } - private static class DelServiceSchedThreadPoolExecutor extends - ScheduledThreadPoolExecutor { - public DelServiceSchedThreadPoolExecutor(int corePoolSize, - ThreadFactory threadFactory) { - super(corePoolSize, threadFactory); - } - - @Override - protected void afterExecute(Runnable task, Throwable exception) { - if (task instanceof FutureTask) { - FutureTask futureTask = (FutureTask) task; - if (!futureTask.isCancelled()) { - try { - futureTask.get(); - } catch (ExecutionException ee) { - exception = ee.getCause(); - } catch (InterruptedException ie) { - exception = ie; - } - } - } - if (exception != null) { - LOG.error("Exception during execution of task in DeletionService", - exception); - } - } - } - public static class FileDeletionTask implements Runnable { public static final int INVALID_TASK_ID = -1; private int taskId; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 3a2649e..cbbd828 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +30,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -65,7 +65,7 @@ private LocalDirsHandlerService dirsHandler; @VisibleForTesting public ExecutorService containerLauncher = - Executors.newCachedThreadPool( + HadoopExecutors.newCachedThreadPool( new ThreadFactoryBuilder() .setNameFormat("ContainersLauncher #%d") .build()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index f82f894..927699e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -52,6 +51,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.SerializedException; @@ -187,7 +187,7 @@ public LocalizationProtocol run() { } ExecutorService createDownloadThreadPool() { - return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setNameFormat("ContainerLocalizer Downloader").build()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index c0c2e8e..b2413ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -43,11 +43,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -75,6 +73,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -194,7 +194,7 @@ public ResourceLocalizationService(Dispatcher dispatcher, this.delService = delService; this.dirsHandler = dirsHandler; - this.cacheCleanup = new ScheduledThreadPoolExecutor(1, + this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") .build()); @@ -784,7 +784,7 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { ThreadFactory tf = new ThreadFactoryBuilder() .setNameFormat("PublicLocalizer #%d") .build(); - return Executors.newFixedThreadPool(nThreads, tf); + return HadoopExecutors.newFixedThreadPool(nThreads, tf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java index cb11f99..16c36eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java @@ -22,7 +22,6 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -71,7 +71,7 @@ protected void serviceInit(Configuration conf) throws Exception { int threadCount = conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT, YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT); - uploaderPool = Executors.newFixedThreadPool(threadCount, + uploaderPool = HadoopExecutors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder(). setNameFormat("Shared cache uploader #%d"). build()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index f64685d..6411535 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -40,6 +39,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -113,7 +113,7 @@ public LogAggregationService(Dispatcher dispatcher, Context context, this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); - this.threadPool = Executors.newCachedThreadPool( + this.threadPool = HadoopExecutors.newCachedThreadPool( new ThreadFactoryBuilder() .setNameFormat("LogAggregationService #%d") .build()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 471e994..d42a4e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -203,7 +204,7 @@ ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build(); sched = - new ScheduledThreadPoolExecutor(conf.getInt( + new HadoopScheduledThreadPoolExecutor(conf.getInt( YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT, YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf); return sched; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 9e6868d..0d85057 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -62,6 +62,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -1758,7 +1759,7 @@ public void testConcurrentAccessToSystemCredentials(){ final int NUM_THREADS = 10; final CountDownLatch allDone = new CountDownLatch(NUM_THREADS); - final ExecutorService threadPool = Executors.newFixedThreadPool( + final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool( NUM_THREADS); final AtomicBoolean stop = new AtomicBoolean(false); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 7637410..7daccad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -34,9 +34,9 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -339,7 +339,7 @@ public void run() { final List exceptions = Collections.synchronizedList( new ArrayList()); - final ExecutorService threadPool = Executors.newFixedThreadPool( + final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool( testThreads); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java index 6748387..60fc3e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -37,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; @@ -80,7 +80,7 @@ protected void serviceInit(Configuration conf) throws Exception { // back-to-back runs ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build(); - scheduledExecutor = Executors.newScheduledThreadPool(2, tf); + scheduledExecutor = HadoopExecutors.newScheduledThreadPool(2, tf); super.serviceInit(conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java index d2efb6a..54d736f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -43,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -116,7 +116,7 @@ protected void serviceInit(Configuration conf) throws Exception { ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore") .build(); - scheduler = Executors.newSingleThreadScheduledExecutor(tf); + scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(tf); super.serviceInit(conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java index f934dbf..6d67ad3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java @@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker; @@ -121,7 +122,7 @@ public void testAddResourceConcurrency() throws Exception { startEmptyStore(); final String key = "key1"; int count = 5; - ExecutorService exec = Executors.newFixedThreadPool(count); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(count); List> futures = new ArrayList>(count); final CountDownLatch start = new CountDownLatch(1); for (int i = 0; i < count; i++) { @@ -197,7 +198,7 @@ public void testAddResourceRefConcurrency() throws Exception { // make concurrent addResourceRef calls (clients) int count = 5; - ExecutorService exec = Executors.newFixedThreadPool(count); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(count); List> futures = new ArrayList>(count); final CountDownLatch start = new CountDownLatch(1); for (int i = 0; i < count; i++) { @@ -235,7 +236,7 @@ public void testAddResourceRefAddResourceConcurrency() throws Exception { final String user = "user"; final ApplicationId id = createAppId(1, 1L); // add the resource and add the resource ref at the same time - ExecutorService exec = Executors.newFixedThreadPool(2); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(2); final CountDownLatch start = new CountDownLatch(1); Callable addKeyTask = new Callable() { public String call() throws Exception {