From a149b9020cb382d6524ce7fc483b6ce04185ebf1 Mon Sep 17 00:00:00 2001 From: Changgeng Li Date: Thu, 28 May 2015 22:57:07 -0700 Subject: [PATCH] large compaction thread pool will steal jobs from small compaction pool when idle --- .../hbase/regionserver/CompactSplitThread.java | 6 +- .../hadoop/hbase/regionserver/StealJobQueue.java | 102 ++++++++++ .../hbase/regionserver/TestStealJobQueue.java | 211 +++++++++++++++++++++ 3 files changed, 317 insertions(+), 2 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StealJobQueue.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStealJobQueue.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 1c88eb0..0972e5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -117,8 +117,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi final String n = Thread.currentThread().getName(); + StealJobQueue stealJobQueue = new StealJobQueue<>(); this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, - 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), + 60, TimeUnit.SECONDS, stealJobQueue, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -128,8 +129,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } }); this.longCompactions.setRejectedExecutionHandler(new Rejection()); + this.longCompactions.prestartAllCoreThreads(); this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, - 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), + 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StealJobQueue.java new file mode 100644 index 0000000..098b597 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StealJobQueue.java @@ -0,0 +1,102 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * + * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor. + * This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the steal-from + * ThreadPoolExecutor. The behavior of this queue is the same as a normal PriorityBlockingQueue except + * the take/pool(long,TimeUnit) methods would also check whether there are jobs in the steal-from queue if this queue is + * empty. + * + * Note the workers in ThreadPoolExecutor must be pre-started so that they can steal job from the other queue, + * otherwise the worker will only be started after there are jobs submitted to main queue. + */ +public class StealJobQueue extends PriorityBlockingQueue { + + private BlockingQueue stealFromQueue; + + private final Lock lock = new ReentrantLock(); + private final Condition notEmpty = lock.newCondition(); + + public StealJobQueue() { + this.stealFromQueue = new PriorityBlockingQueue() { + @Override + public boolean offer(T t) { + lock.lock(); + try { + notEmpty.signal(); + return super.offer(t); + } finally { + lock.unlock(); + } + } + }; + } + + public BlockingQueue getStealFromQueue() { + return stealFromQueue; + } + + @Override + public boolean offer(T t) { + lock.lock(); + try { + notEmpty.signal(); + return super.offer(t); + } finally { + lock.unlock(); + } + } + + + @Override + public T take() throws InterruptedException { + lock.lockInterruptibly(); + try { + while (true) { + T retVal = this.poll(); + if (retVal == null) { + retVal = stealFromQueue.poll(); + } + if (retVal == null) { + notEmpty.await(); + } else { + return retVal; + } + } + } finally { + lock.unlock(); + } + } + + @Override + public T poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (true) { + T retVal = this.poll(); + if (retVal == null) { + retVal = stealFromQueue.poll(); + } + if (retVal == null) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } else { + return retVal; + } + } + } finally { + lock.unlock(); + } + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStealJobQueue.java new file mode 100644 index 0000000..2b46ff1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStealJobQueue.java @@ -0,0 +1,211 @@ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +/** + * Created by changgeng on 5/28/15. + */ + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestStealJobQueue { + + StealJobQueue stealJobQueue; + BlockingQueue stealFromQueue; + + @Before + public void setup() { + stealJobQueue = new StealJobQueue<>(); + stealFromQueue = stealJobQueue.getStealFromQueue(); + + } + + + @Test + public void testTake() throws InterruptedException { + stealJobQueue.offer(3); + stealFromQueue.offer(10); + stealJobQueue.offer(15); + stealJobQueue.offer(4); + assertEquals(3, stealJobQueue.take().intValue()); + assertEquals(4, stealJobQueue.take().intValue()); + assertEquals("always take from the main queue before trying to steal", 15, stealJobQueue.take().intValue()); + assertEquals(10, stealJobQueue.take().intValue()); + assertTrue(stealFromQueue.isEmpty()); + assertTrue(stealJobQueue.isEmpty()); + } + + @Test + public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.take(); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealFromQueue.offer(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + } + + + @Test + public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.take(); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealJobQueue.offer(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + } + + + @Test + public void testPoll() throws InterruptedException { + stealJobQueue.offer(3); + stealFromQueue.offer(10); + stealJobQueue.offer(15); + stealJobQueue.offer(4); + assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); + assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); + assertEquals("always take from the main queue before trying to steal", 15, stealJobQueue.poll(1, TimeUnit.SECONDS) + .intValue()); + assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); + assertTrue(stealFromQueue.isEmpty()); + assertTrue(stealJobQueue.isEmpty()); + assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS)); + } + + @Test + public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealFromQueue.put(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + + } + + + @Test + public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException { + final AtomicInteger taken = new AtomicInteger(); + Thread consumer = new Thread() { + @Override + public void run() { + try { + Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); + taken.set(n); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + consumer.start(); + stealJobQueue.add(3); + consumer.join(1000); + assertEquals(3, taken.get()); + consumer.interrupt(); //Ensure the consumer thread will stop. + } + + + @Test + public void testInteractWithThreadPool() throws InterruptedException { + StealJobQueue stealTasksQueue = new StealJobQueue<>(); + final CountDownLatch stealJobCountDown = new CountDownLatch(3); + final CountDownLatch stealFromCountDown = new CountDownLatch(3); + ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + stealJobCountDown.countDown(); + } + + }; + + //This is necessary otherwise no worker will be running and stealing job + stealPool.prestartAllCoreThreads(); + + ThreadPoolExecutor stealFromPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue.getStealFromQueue()) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + stealFromCountDown.countDown(); + } + }; + + for (int i = 0; i < 4; i++) { + TestTask task = new TestTask(); + stealFromPool.execute(task); + } + + for(int i = 0;i<2;i++){ + TestTask task = new TestTask(); + stealPool.execute(task); + } + + stealJobCountDown.await(1, TimeUnit.SECONDS); + stealFromCountDown.await(1, TimeUnit.SECONDS); + assertEquals(0, stealFromCountDown.getCount()); + assertEquals(0, stealJobCountDown.getCount()); + } + + class TestTask extends Thread implements Comparable { + @Override + public int compareTo(TestTask o) { + return 0; + } + + @Override + public void run() { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + +} \ No newline at end of file -- 1.9.5