Index: src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java (revision 0)
@@ -0,0 +1,152 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
+import org.junit.Test;
+
+/**
+ * Exercises {@link ExecutorService}: pool growth up to the configured thread
+ * count, completion of blocked handlers, and pool size after the keep-alive
+ * period has elapsed.
+ */
+public class TestExecutorService {
+
+  /**
+   * Starts a MASTER_SERVER_OPERATIONS pool, blocks one handler per thread on a
+   * shared lock, releases them, then submits a second batch and checks the
+   * pool size once twice the keep-alive time has passed.
+   */
+  @Test
+  public void testExecutorService() throws Exception {
+    int maxThreads = 5;
+    int maxTries = 10;
+    int sleepInterval = 10;
+
+    // Start an executor service pool with max 5 threads
+    ExecutorService executorService = new ExecutorService("unit_test");
+    executorService.startExecutorService(
+      ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
+
+    Executor executor =
+      executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
+    ThreadPoolExecutor pool = executor.threadPoolExecutor;
+
+    // No threads are expected before the first submission.
+    assertEquals(0, pool.getPoolSize());
+
+    // While true, every handler parks inside process() after its first tick.
+    AtomicBoolean lock = new AtomicBoolean(true);
+    // Each handler ticks once on entry and once on exit.
+    AtomicInteger counter = new AtomicInteger(0);
+
+    for (int i = 0; i < maxThreads; i++) {
+      executorService.submit(
+        new TestEventHandler(EventType.M_SERVER_SHUTDOWN, lock, counter));
+    }
+
+    waitForCount(counter, maxThreads, maxTries, sleepInterval,
+      "Waiting for all event handlers to start...");
+
+    assertEquals(maxThreads, counter.get());
+    assertEquals(maxThreads, pool.getPoolSize());
+
+    // Release the parked handlers.
+    synchronized (lock) {
+      lock.set(false);
+      lock.notifyAll();
+    }
+
+    // Uses its own retry budget, independent of the start-up wait above.
+    waitForCount(counter, maxThreads * 2, maxTries, sleepInterval,
+      "Waiting for all event handlers to finish...");
+
+    assertEquals(maxThreads * 2, counter.get());
+    assertEquals(maxThreads, pool.getPoolSize());
+
+    // Submit a second batch. The lock is already released, so these handlers
+    // do not park; the submissions must not throw RejectedExecutionException.
+    for (int i = 0; i < maxThreads; i++) {
+      executorService.submit(
+        new TestEventHandler(EventType.M_SERVER_SHUTDOWN, lock, counter));
+    }
+
+    // Wait past the keep-alive window, then check the pool against the
+    // configured thread count.
+    Thread.sleep(executor.keepAliveTimeInMillis * 2);
+
+    assertEquals(maxThreads, pool.getPoolSize());
+  }
+
+  /**
+   * Polls {@code counter} until it reaches {@code target} or {@code maxTries}
+   * sleeps of {@code sleepInterval} ms have elapsed, printing {@code message}
+   * before each sleep. Returns silently on timeout; callers assert afterwards.
+   */
+  private static void waitForCount(AtomicInteger counter, int target,
+      int maxTries, long sleepInterval, String message)
+      throws InterruptedException {
+    for (int tries = 0; counter.get() < target && tries < maxTries; tries++) {
+      System.out.println(message);
+      Thread.sleep(sleepInterval);
+    }
+  }
+
+  /**
+   * Handler that bumps a shared counter on entry, waits on the shared lock
+   * while its flag is true, and bumps the counter again before returning.
+   */
+  public static class TestEventHandler extends EventHandler {
+    private final AtomicBoolean lock;
+    private final AtomicInteger counter;
+
+    public TestEventHandler(EventType eventType, AtomicBoolean lock,
+        AtomicInteger counter) {
+      super(null, eventType);
+      this.lock = lock;
+      this.counter = counter;
+    }
+
+    @Override
+    public void process() throws IOException {
+      int num = counter.incrementAndGet();
+      System.out.println("Running process #" + num);
+      synchronized (lock) {
+        while (lock.get()) {
+          try {
+            lock.wait();
+          } catch (InterruptedException ignored) {
+            // Deliberately ignored: keep waiting until the test clears the flag.
+          }
+        }
+      }
+      counter.incrementAndGet();
+    }
+  }
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1026490)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -1131,13 +1131,13 @@
// Start executor services
this.service = new ExecutorService(getServerName());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
- conf.getInt("hbase.regionserver.executor.openregion.threads", 5));
+ conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
- conf.getInt("hbase.regionserver.executor.closeregion.threads", 5));
+ conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1026490)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -497,13 +497,13 @@
try {
// Start the executor service pools
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
- conf.getInt("hbase.master.executor.openregion.threads", 10));
+ conf.getInt("hbase.master.executor.openregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
- conf.getInt("hbase.master.executor.closeregion.threads", 10));
+ conf.getInt("hbase.master.executor.closeregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
- conf.getInt("hbase.master.executor.serverops.threads", 5));
+ conf.getInt("hbase.master.executor.serverops.threads", 3));
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS,
- conf.getInt("hbase.master.executor.tableops.threads", 5));
+ conf.getInt("hbase.master.executor.tableops.threads", 3));
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
Index: src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java (revision 1026490)
+++ src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java (working copy)
@@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -41,7 +40,7 @@
* threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
* and a Runnable that handles the object that is added to the queue.
*
- * <p>In order to create a new service, create an instance of this class and
+ * <p>In order to create a new service, create an instance of this class and
* then do: instance.startExecutorService("myService");. When done
* call {@link #shutdown()}.
*
@@ -243,15 +242,14 @@
/**
* Executor instance.
*/
- private static class Executor {
- // default number of threads in the pool
- private int corePoolSize = 1;
+ static class Executor {
// how long to retain excess threads
- private long keepAliveTimeInMillis = 1000;
+ final long keepAliveTimeInMillis = 1000;
// the thread pool executor that services the requests
- private final ThreadPoolExecutor threadPoolExecutor;
+ final ThreadPoolExecutor threadPoolExecutor;
// work queue to use - unbounded queue
- BlockingQueue