diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index f7d4658..9221e6a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -91,12 +91,10 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -396,8 +394,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (batchPool == null) { synchronized (this) { if (batchPool == null) { - this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), - conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); + this.batchPool = getThreadPool( + conf.getInt("hbase.hconnection.threads.max", 256), "-shared-"); this.cleanupPool = true; } } @@ -405,31 +403,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return this.batchPool; } - private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint, - BlockingQueue passedWorkQueue) { + private ExecutorService getThreadPool(int maxThreads, String nameHint) { // shared HTable thread executor not yet initialized if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors() * 8; } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } - long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - BlockingQueue workQueue = passedWorkQueue; - if (workQueue == null) { - workQueue = - new LinkedBlockingQueue(maxThreads * - conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); - } - ThreadPoolExecutor tpe = new ThreadPoolExecutor( - coreThreads, + + ForkJoinPool tpe = new ForkJoinPool( maxThreads, - keepAliveTime, - TimeUnit.SECONDS, - workQueue, - Threads.newDaemonThreadFactory(toString() + nameHint)); - tpe.allowCoreThreadTimeOut(true); + Threads.newForkJoinWorkerThreadFactory(true, toString() + nameHint), null, true); return tpe; } @@ -437,14 +419,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (this.metaLookupPool == null) { synchronized (this) { if (this.metaLookupPool == null) { - //Some of the threads would be used for meta replicas - //To start with, threads.max.core threads can hit the meta (including replicas). 
- //After that, requests will get queued up in the passed queue, and only after - //the queue is full, a new thread will be started this.metaLookupPool = getThreadPool( - conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), - conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10), - "-metaLookup-shared-", new LinkedBlockingQueue()); + conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), "-metaLookup-shared-"); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 46f8ec0..29a38ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -28,9 +28,8 @@ import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; /** * An implementation of {@link Table}. Used to communicate with a single HBase table. @@ -124,23 +122,18 @@ public class HTable implements HTableInterface { private RpcRetryingCallerFactory rpcCallerFactory; private RpcControllerFactory rpcControllerFactory; + // Marked Private @since 1.0 @InterfaceAudience.Private - public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { + public static ExecutorService getDefaultExecutor(Configuration conf) { int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } - long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); - - // Using the "direct handoff" approach, new threads will only be created - // if it is necessary and will grow unbounded. This could be bad but in HCM - // we only create as many Runnables as there are region servers. It means - // it also scales when new region servers are added. 
- ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, - new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); - pool.allowCoreThreadTimeOut(true); - return pool; + // ForkJoinPool rejects parallelism above 32767, so clamp the configured maximum. + maxThreads = Math.min(maxThreads, 32767); + return new ForkJoinPool( + maxThreads, Threads.newForkJoinWorkerThreadFactory(false, "htable"), null, true); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index f34fb8a..6a973f2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -25,7 +25,6 @@ import com.yammer.metrics.core.Histogram; import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.Timer; import com.yammer.metrics.reporting.JmxReporter; -import com.yammer.metrics.util.RatioGauge; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; @@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -187,27 +185,6 @@ public class MetricsConnection { public MetricsConnection(final ConnectionImplementation conn) { this.scope = conn.toString(); this.registry = new MetricsRegistry(); - final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); - final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); - - this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope, - new RatioGauge() { - @Override protected double getNumerator() { - return batchPool.getActiveCount(); - } - @Override protected double getDenominator() { - return batchPool.getMaximumPoolSize(); - } - }); - this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope, - new RatioGauge() { - @Override protected double getNumerator() { - return metaPool.getActiveCount(); - } - @Override protected double getDenominator() { - return metaPool.getMaximumPoolSize(); - } - }); this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope); this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope); this.getTracker = new CallTracker(this.registry, "Get", scope); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index c366762..aada6d8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -23,6 +23,9 @@ import java.io.PrintWriter; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -186,7 +189,7 @@ public class Threads { - * @return threadPoolExecutor the cachedThreadPool with a bounded number - * as the maximum 
thread size in the pool. + * @return an {@link ExecutorService} backed by a cached thread pool with a bounded + * maximum number of threads. */ - public static ThreadPoolExecutor getBoundedCachedThreadPool( + public static ExecutorService getBoundedCachedThreadPool( int maxCachedThread, long timeout, TimeUnit unit, ThreadFactory threadFactory) { ThreadPoolExecutor boundedCachedThreadPool = @@ -261,6 +264,27 @@ public class Threads { }; } + /** + * Creates a ForkJoinWorkerThreadFactory whose workers are named with the given + * prefix and can optionally be marked as daemon threads. + */ + public static ForkJoinPool.ForkJoinWorkerThreadFactory newForkJoinWorkerThreadFactory( + final boolean daemon, final String prefix) { + return new ForkJoinPool.ForkJoinWorkerThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { + ForkJoinWorkerThread t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + t.setName(prefix + " - " + threadNumber.getAndIncrement()); + if (daemon) { + t.setDaemon(true); + } + return t; + } + }; + } + /** Sets an UncaughtExceptionHandler for the thread which logs the * Exception stack if the thread dies. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java index 1f2ed0a..ca50b7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -106,7 +106,7 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler { LOG.info(msg); status.setStatus(msg); - ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "DisabledTableSnapshot"); + ExecutorService exec = SnapshotManifest.createExecutor(conf, "DisabledTableSnapshot"); try { ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 38aa2c0..88ecbba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -1083,7 +1083,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // setup the default procedure coordinator String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); + java.util.concurrent.ExecutorService tpool = ProcedureCoordinator.defaultPool(name, opThreads); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index b7e0c04..8d10406 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -70,7 +70,7 @@ public class ProcedureCoordinator { * @param rpcs * @param pool Used for executing procedures. */ - public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) { + public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ExecutorService pool) { this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT); } @@ -84,7 +84,7 @@ public class ProcedureCoordinator { * @param pool Used for executing procedures. * @param timeoutMillis */ - public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool, + public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ExecutorService pool, long timeoutMillis, long wakeTimeMillis) { this.timeoutMillis = timeoutMillis; this.wakeTimeMillis = wakeTimeMillis; @@ -99,7 +99,7 @@ public class ProcedureCoordinator { * @param coordName * @param opThreads the maximum number of threads to allow in the pool */ - public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) { + public static ExecutorService defaultPool(String coordName, int opThreads) { return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT); } @@ -110,7 +110,7 @@ public class ProcedureCoordinator { * @param opThreads the maximum number of threads to allow in the pool * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks */ - public static ThreadPoolExecutor defaultPool(String coordName, int opThreads, + public static ExecutorService defaultPool(String coordName, int opThreads, long keepAliveMillis) { return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS, new SynchronousQueue(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java index 485821e..9ae109f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java @@ -59,7 +59,7 @@ public class ProcedureMember implements Closeable { * @param pool thread pool to submit subprocedures * @param factory class that creates instances of a subprocedure. 
*/ - public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool, + public ProcedureMember(ProcedureMemberRpcs rpcs, ExecutorService pool, SubprocedureFactory factory) { this.pool = pool; this.rpcs = rpcs; @@ -72,7 +72,7 @@ public class ProcedureMember implements Closeable { * @param memberName * @param procThreads the maximum number of threads to allow in the pool */ - public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) { + public static ExecutorService defaultPool(String memberName, int procThreads) { return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT); } @@ -83,7 +83,7 @@ public class ProcedureMember implements Closeable { * @param procThreads the maximum number of threads to allow in the pool * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks */ - public static ThreadPoolExecutor defaultPool(String memberName, int procThreads, + public static ExecutorService defaultPool(String memberName, int procThreads, long keepAliveMillis) { return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS, new SynchronousQueue(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index e72da2a..d877880 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,7 +98,7 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { // setup the procedure coordinator String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads); + ExecutorService tpool = ProcedureCoordinator.defaultPool(name, threads); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( master.getZooKeeper(), getProcedureSignature(), name); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 1aa959c..8b89c53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -328,7 +329,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT); // create the actual flush table procedure member - ThreadPoolExecutor pool = 
ProcedureMember.defaultPool(rss.getServerName().toString(), + ExecutorService pool = ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 04adf25..be26545 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; @@ -358,7 +359,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi // We assume that most compactions are small. So, put system compactions into small // pool; we will do selection there, and move to large pool if necessary. - ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) + ExecutorService pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) ? longCompactions : shortCompactions; pool.execute(new CompactionRunner(s, r, compaction, pool, user)); if (LOG.isDebugEnabled()) { @@ -463,11 +464,11 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi private final HRegion region; private CompactionContext compaction; private int queuedPriority; - private ThreadPoolExecutor parent; + private ExecutorService parent; private User user; public CompactionRunner(Store store, Region region, - CompactionContext compaction, ThreadPoolExecutor parent, User user) { + CompactionContext compaction, ExecutorService parent, User user) { super(); this.store = store; this.region = (HRegion)region; @@ -506,7 +507,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi // Now see if we are in correct pool for the size; if not, go to the correct one. // We might end up waiting for a while, so cancel the selection. assert this.compaction.hasSelection(); - ThreadPoolExecutor pool = store.throttleCompaction( + ExecutorService pool = store.throttleCompaction( compaction.getRequest().getSize()) ? longCompactions : shortCompactions; // Long compaction pool can process small job diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b6cdd29..13c0a81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -895,7 +895,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!htableDescriptor.getFamilies().isEmpty()) { // initialize the thread pool for opening stores in parallel. 
- ThreadPoolExecutor storeOpenerThreadPool = + ExecutorService storeOpenerThreadPool = getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog()); CompletionService completionService = new ExecutorCompletionService(storeOpenerThreadPool); @@ -1440,7 +1440,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi new TreeMap>(Bytes.BYTES_COMPARATOR); if (!stores.isEmpty()) { // initialize the thread pool for closing stores in parallel. - ThreadPoolExecutor storeCloserThreadPool = + ExecutorService storeCloserThreadPool = getStoreOpenAndCloseThreadPool("StoreCloserThread-" + getRegionInfo().getRegionNameAsString()); CompletionService>> completionService = @@ -1545,7 +1545,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( + protected ExecutorService getStoreOpenAndCloseThreadPool( final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); int maxThreads = Math.min(numStores, @@ -1554,7 +1554,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); } - protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool( + protected ExecutorService getStoreFileOpenAndCloseThreadPool( final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); int maxThreads = Math.max(1, @@ -1564,7 +1564,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); } - static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads, + static ExecutorService getOpenAndCloseThreadPool(int maxThreads, final String threadNamePrefix) { return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index cfda1c6..dc944dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -37,6 +37,7 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -525,7 +526,7 @@ public class HStore implements Store { return new ArrayList(); } // initialize the thread pool for opening store files in parallel.. - ThreadPoolExecutor storeFileOpenerThreadPool = + ExecutorService storeFileOpenerThreadPool = this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" + this.getColumnFamilyName()); CompletionService completionService = @@ -851,7 +852,7 @@ public class HStore implements Store { if (!result.isEmpty()) { // initialize the thread pool for closing store files in parallel. 
- ThreadPoolExecutor storeFileCloserThreadPool = this.region + ExecutorService storeFileCloserThreadPool = this.region .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-" + this.getColumnFamilyName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java index e08cf0e..e35e150 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -402,7 +403,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager { int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT); // create the actual snapshot procedure member - ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), + ExecutorService pool = ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 4c719a9..ea5a59a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -80,7 +81,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // Handles connecting to peer region servers private ReplicationSinkManager replicationSinkMgr; private boolean peersSelected = false; - private ThreadPoolExecutor exec; + private ExecutorService exec; private int maxThreads; @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java index 81e653d..48e9ba0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; @@ -166,7 +167,7 @@ public class RestoreSnapshotHelper { * @return the set of regions touched by the restore operation */ public 
RestoreMetaChanges restoreHdfsRegions() throws IOException { - ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot"); + ExecutorService exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot"); try { return restoreHdfsRegions(exec); } finally { @@ -174,7 +175,7 @@ public class RestoreSnapshotHelper { } } - private RestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException { + private RestoreMetaChanges restoreHdfsRegions(final ExecutorService exec) throws IOException { LOG.debug("starting restore"); Map regionManifests = snapshotManifest.getRegionManifestsMap(); @@ -381,7 +382,7 @@ public class RestoreSnapshotHelper { /** * Remove specified regions from the file-system, using the archiver. */ - private void removeHdfsRegions(final ThreadPoolExecutor exec, final List regions) + private void removeHdfsRegions(final ExecutorService exec, final List regions) throws IOException { if (regions == null || regions.size() == 0) return; ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() { @@ -395,7 +396,7 @@ public class RestoreSnapshotHelper { /** * Restore specified regions by restoring content to the snapshot state. */ - private void restoreHdfsRegions(final ThreadPoolExecutor exec, + private void restoreHdfsRegions(final ExecutorService exec, final Map regionManifests, final List regions) throws IOException { if (regions == null || regions.size() == 0) return; @@ -410,7 +411,7 @@ public class RestoreSnapshotHelper { /** * Restore specified mob regions by restoring content to the snapshot state. */ - private void restoreHdfsMobRegions(final ThreadPoolExecutor exec, + private void restoreHdfsMobRegions(final ExecutorService exec, final Map regionManifests, final List regions) throws IOException { if (regions == null || regions.size() == 0) return; @@ -544,7 +545,7 @@ public class RestoreSnapshotHelper { * Clone specified regions. For each region create a new region * and create a HFileLink for each hfile. 
*/ - private HRegionInfo[] cloneHdfsRegions(final ThreadPoolExecutor exec, + private HRegionInfo[] cloneHdfsRegions(final ExecutorService exec, final Map regionManifests, final List regions) throws IOException { if (regions == null || regions.size() == 0) return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java index ca004db..caea0fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -347,7 +348,7 @@ public final class SnapshotManifest { case SnapshotManifestV1.DESCRIPTOR_VERSION: { this.htd = FSTableDescriptors.getTableDescriptorFromFs(fs, workingDir) .getHTableDescriptor(); - ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader"); + ExecutorService tpool = createExecutor("SnapshotManifestLoader"); try { this.regionManifests = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc); @@ -365,7 +366,7 @@ public final class SnapshotManifest { // Compatibility, load the v1 regions // This happens only when the snapshot is in-progress and the cache wants to refresh. List v1Regions, v2Regions; - ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader"); + ExecutorService tpool = createExecutor("SnapshotManifestLoader"); try { v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc); v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc); @@ -455,7 +456,7 @@ public final class SnapshotManifest { private void convertToV2SingleManifest() throws IOException { // Try to load v1 and v2 regions List v1Regions, v2Regions; - ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader"); + ExecutorService tpool = createExecutor("SnapshotManifestLoader"); try { v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc); v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc); @@ -526,11 +527,11 @@ public final class SnapshotManifest { } } - private ThreadPoolExecutor createExecutor(final String name) { + private ExecutorService createExecutor(final String name) { return createExecutor(conf, name); } - public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) { + public static ExecutorService createExecutor(final Configuration conf, final String name) { int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8); return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, Threads.getNamedThreadFactory(name)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 18c86c8..da83e9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import 
java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java index a936fc2..13d37aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java @@ -28,8 +28,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -101,7 +101,7 @@ public abstract class ModifyRegionUtils { final RegionFillTask task) throws IOException { if (newRegions == null) return null; int regionNumber = newRegions.length; - ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf, + ExecutorService exec = getRegionOpenAndInitThreadPool(conf, "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber); try { return createRegions(exec, conf, rootDir, hTableDescriptor, newRegions, task); @@ -122,7 +122,7 @@ public abstract class ModifyRegionUtils { * @param task {@link RegionFillTask} custom code to populate region after creation * @throws IOException */ - public static List createRegions(final ThreadPoolExecutor exec, + public static List createRegions(final ExecutorService exec, final Configuration conf, final Path rootDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions, final RegionFillTask task) throws IOException { @@ -191,7 +191,7 @@ public abstract class ModifyRegionUtils { * @param task {@link RegionFillTask} custom code to edit the region * @throws IOException */ - public static void editRegions(final ThreadPoolExecutor exec, + public static void editRegions(final ExecutorService exec, final Collection regions, final RegionEditTask task) throws IOException { final ExecutorCompletionService completionService = new ExecutorCompletionService(exec); @@ -222,11 +222,11 @@ public abstract class ModifyRegionUtils { * used by createRegions() to get the thread pool executor based on the * "hbase.hregion.open.and.init.threads.max" property. 
*/ - static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf, + static ExecutorService getRegionOpenAndInitThreadPool(final Configuration conf, final String threadNamePrefix, int regionNumber) { int maxThreads = Math.min(regionNumber, conf.getInt( "hbase.hregion.open.and.init.threads.max", 10)); - ThreadPoolExecutor regionOpenAndInitThreadPool = Threads + ExecutorService regionOpenAndInitThreadPool = Threads .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { private int count = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 3741cdf..0a48e39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -39,6 +39,7 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -1305,7 +1306,7 @@ public class WALSplitter { final List paths = new ArrayList(); final List thrown = Lists.newArrayList(); - ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, + ExecutorService closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { private int count = 1; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 63b60bb..dd340a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -28,7 +28,9 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -146,7 +148,8 @@ public class TestMultiParallel { @Test(timeout=300000) public void testActiveThreadsCount() throws Exception { try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) { - ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); + ThreadPoolExecutor executor = + new ThreadPoolExecutor(100, 100, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); try { try (Table t = connection.getTable(TEST_TABLE, executor)) { List puts = constructPutRequests(); // creates a Put for every region diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java index 62afaa9..ebce1b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,7 +63,7 @@ public class SimpleMasterProcedureManager extends MasterProcedureManager { // setup the default procedure coordinator String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); + ExecutorService tpool = ProcedureCoordinator.defaultPool(name, 1); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( master.getZooKeeper(), getProcedureSignature(), name); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java index 7620bbb..b4f0a51 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -54,7 +55,7 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager { ZooKeeperWatcher zkw = rss.getZooKeeper(); this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature()); - ThreadPoolExecutor pool = + ExecutorService pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 1); this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder()); LOG.info("Initialized: " + rss.getServerName().toString()); @@ -117,7 +118,7 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager { public class SimpleSubprocedurePool implements Closeable, Abortable { private final ExecutorCompletionService taskPool; - private final ThreadPoolExecutor executor; + private final ExecutorService executor; private volatile boolean aborted; private final List> futures = new ArrayList>(); private final String name; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java index 710e631..df4198c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java @@ -38,6 +38,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -88,7 +89,7 @@ public class TestProcedureCoordinator { } private ProcedureCoordinator buildNewCoordinator() { - 
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE); + ExecutorService pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE); return spy(new ProcedureCoordinator(controller, pool)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java index 2d7a68f..5a8dc09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -88,7 +89,7 @@ public class TestProcedureMember { */ private ProcedureMember buildCohortMember() { String name = "node"; - ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); + ExecutorService pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); return new ProcedureMember(mockMemberComms, pool, mockBuilder); } @@ -98,7 +99,7 @@ public class TestProcedureMember { private void buildCohortMemberPair() throws IOException { dispatcher = new ForeignExceptionDispatcher(); String name = "node"; - ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); + ExecutorService pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); member = new ProcedureMember(mockMemberComms, pool, mockBuilder); when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception Subprocedure subproc = new EmptySubprocedure(member, dispatcher); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java index 211e9e6..87f3cd9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java @@ -31,7 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -129,7 +129,7 @@ public class TestZKProcedure { // start running the controller ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs( coordZkw, opDescription, COORDINATOR_NODE_NAME); - ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); + ExecutorService pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) { @Override public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, @@ -147,7 +147,7 @@ public class TestZKProcedure { for (String member : members) { ZooKeeperWatcher watcher = newZooKeeperWatcher(); ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription); - ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); + ExecutorService pool2 = ProcedureMember.defaultPool(member, 1, 
KEEP_ALIVE); ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory); procMembers.add(new Pair(procMember, comms)); comms.start(member, procMember); @@ -211,7 +211,7 @@ public class TestZKProcedure { ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher(); ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs( coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME); - ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); + ExecutorService pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool)); // start a member for each node @@ -221,7 +221,7 @@ public class TestZKProcedure { for (String member : expected) { ZooKeeperWatcher watcher = newZooKeeperWatcher(); ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription); - ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); + ExecutorService pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory); members.add(new Pair(mem, controller)); controller.start(member, mem);
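
Notes:

The client pools above are built with the four-argument ForkJoinPool constructor: a parallelism level, a worker-thread factory, an UncaughtExceptionHandler (null here), and asyncMode=true, which selects FIFO scheduling — the sensible mode for plain submitted tasks that are never joined. A minimal, self-contained sketch of how the new Threads.newForkJoinWorkerThreadFactory helper is meant to be used (the pool size, name prefix, and task count are arbitrary):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.util.Threads;

public class ForkJoinPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as ConnectionImplementation#getThreadPool: daemon workers,
    // no explicit exception handler, asyncMode=true (FIFO for plain tasks).
    ExecutorService pool = new ForkJoinPool(
        4, Threads.newForkJoinWorkerThreadFactory(true, "demo-shared-"), null, true);
    for (int i = 0; i < 8; i++) {
      final int id = i;
      pool.execute(new Runnable() {
        @Override
        public void run() {
          // Worker names carry the prefix passed to the factory.
          System.out.println(Thread.currentThread().getName() + " ran task " + id);
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}
```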
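The two RatioGauge registrations deleted from MetricsConnection relied on ThreadPoolExecutor-specific accessors (getActiveCount, getMaximumPoolSize) that are unavailable once the pools are ForkJoinPools exposed as ExecutorService. ForkJoinPool offers rough equivalents, so the gauges could be restored if wanted; the following is a hypothetical sketch, not something this patch adds — the PoolGauges class and method name are illustrative only:

```java
import java.util.concurrent.ForkJoinPool;

import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.util.RatioGauge;

public class PoolGauges {
  /** Registers a saturation gauge for a ForkJoinPool-backed batch pool. */
  static void registerActiveThreadsGauge(MetricsRegistry registry, String scope,
      final ForkJoinPool batchPool) {
    registry.newGauge(PoolGauges.class, "executorPoolActiveThreads", scope,
        new RatioGauge() {
          @Override protected double getNumerator() {
            // Estimate of threads currently stealing or executing tasks.
            return batchPool.getActiveThreadCount();
          }
          @Override protected double getDenominator() {
            // Target parallelism plays the role of the old maximum pool size.
            return batchPool.getParallelism();
          }
        });
  }
}
```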
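One behavioral difference to keep in mind: ThreadPoolExecutor accepts any positive maximum pool size, whereas the ForkJoinPool constructor throws IllegalArgumentException for parallelism above 32767. That is why hbase.htable.threads.max, which defaults to Integer.MAX_VALUE, is clamped before the pool is constructed in HTable.getDefaultExecutor. A small stand-alone demonstration:

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismCapDemo {
  public static void main(String[] args) {
    try {
      // ForkJoinPool rejects parallelism outside [1, 32767].
      new ForkJoinPool(Integer.MAX_VALUE);
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected);
    }
    // Clamping keeps arbitrary configuration values safe; workers are
    // created lazily, so a large parallelism does not pre-start threads.
    int maxThreads = Math.min(Integer.MAX_VALUE, 32767);
    ForkJoinPool pool = new ForkJoinPool(maxThreads);
    System.out.println("parallelism = " + pool.getParallelism());
    pool.shutdown();
  }
}
```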
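The server-side hunks only widen declared types from ThreadPoolExecutor to ExecutorService; every call site drives its pool through submit/execute, typically behind an ExecutorCompletionService, so any ExecutorService implementation satisfies the contract. Below is a simplified sketch of that pattern, loosely modeled on ModifyRegionUtils.editRegions — the class name and task body are stand-ins, not HBase code:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EditRegionsPattern {
  // Only the ExecutorService contract is used, so the pool behind it
  // (ThreadPoolExecutor, ForkJoinPool, ...) is interchangeable.
  static void editAll(ExecutorService exec, List<String> regions) throws IOException {
    CompletionService<Void> cs = new ExecutorCompletionService<Void>(exec);
    for (final String region : regions) {
      cs.submit(new Callable<Void>() {
        @Override public Void call() {
          System.out.println("editing " + region);
          return null;
        }
      });
    }
    try {
      // Wait for every task, surfacing the first failure as an IOException.
      for (int i = 0; i < regions.size(); i++) {
        cs.take().get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }

  public static void main(String[] args) throws IOException {
    ExecutorService exec = Executors.newFixedThreadPool(2);
    editAll(exec, Arrays.asList("r1", "r2", "r3"));
    exec.shutdown();
  }
}
```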