From 2ea2f29686dc6ca2a272a8edab9192ef7dacb56e Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 3 Dec 2015 10:40:40 -0800 Subject: [PATCH] HBASE-14922 Delayed flush doesn't work causing flush storms. --- .../java/org/apache/hadoop/hbase/ChoreService.java | 20 ++++- .../JitterScheduledThreadPoolExecutorImpl.java | 92 ++++++++++++++++++++++ .../org/apache/hadoop/hbase/TestChoreService.java | 8 +- .../hadoop/hbase/regionserver/HRegionServer.java | 6 +- 4 files changed, 115 insertions(+), 11 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java index 2519f8f..6a7f8f2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -91,7 +91,14 @@ public class ChoreService implements ChoreServicer { * spawned by this service */ public ChoreService(final String coreThreadPoolPrefix) { - this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE); + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false); + } + + /** + * @param jitter Should the + */ + public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) { + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter); } /** @@ -101,11 +108,16 @@ public class ChoreService implements ChoreServicer { * to during initialization. The default size is 1, but specifying a larger size may be * beneficial if you know that 1 thread will not be enough. */ - public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) { + public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) { this.coreThreadPoolPrefix = coreThreadPoolPrefix; if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE; - final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); - scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); + if (jitter) { + scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1); + } else { + scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } + scheduler.setRemoveOnCancelPolicy(true); scheduledChores = new HashMap>(); choresMissingStartTime = new HashMap(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java new file mode 100644 index 0000000..9aa84b2 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java @@ -0,0 +1,92 @@ +package org.apache.hadoop.hbase; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor { + private final double spread; + + public JitterScheduledThreadPoolExecutorImpl(int corePoolSize, + ThreadFactory threadFactory, + double spread) { + super(corePoolSize, threadFactory); + this.spread = spread; + } + + protected java.util.concurrent.RunnableScheduledFuture decorateTask( + Runnable runnable, java.util.concurrent.RunnableScheduledFuture task) { + return new JitteredRunnableScheduledFuture<>(task); + } + + + protected java.util.concurrent.RunnableScheduledFuture decorateTask( + Callable callable, java.util.concurrent.RunnableScheduledFuture task) { + return new JitteredRunnableScheduledFuture<>(task); + } + + protected class JitteredRunnableScheduledFuture implements RunnableScheduledFuture { + + private final RunnableScheduledFuture wrapped; + + JitteredRunnableScheduledFuture(RunnableScheduledFuture wrapped) { + this.wrapped = wrapped; + } + + @Override + public boolean isPeriodic() { + return wrapped.isPeriodic(); + } + + @Override + public long getDelay(TimeUnit unit) { + long delay = wrapped.getDelay(unit); + long spreadTime = (long) (delay * spread); + delay += ThreadLocalRandom.current().nextLong(-spreadTime,spreadTime); + return delay; + } + + @Override + public int compareTo(Delayed o) { + return wrapped.compareTo(o); + } + + @Override + public void run() { + wrapped.run(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return wrapped.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return wrapped.isCancelled(); + } + + @Override + public boolean isDone() { + return wrapped.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return wrapped.get(); + } + + @Override + public V get(long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return wrapped.get(timeout, unit); + } + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java index b113174..c818310 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java @@ -315,7 +315,7 @@ public class TestChoreService { final int corePoolSize = 10; final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; - ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize); + ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false); try { assertEquals(corePoolSize, customInit.getCorePoolSize()); } finally { @@ -329,7 +329,7 @@ public class TestChoreService { shutdownService(defaultInit); } - ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10); + ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false); try { assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize()); } finally { @@ -403,7 +403,7 @@ public class TestChoreService { @Test (timeout=20000) public void testCorePoolIncrease() throws InterruptedException { final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize); + ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false); try { assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, @@ -443,7 +443,7 @@ public class TestChoreService { @Test(timeout = 30000) public void testCorePoolDecrease() throws InterruptedException { final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize); + ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false); final int chorePeriod = 100; try { // Slow chores always miss their start time and thus the core pool size should be at least as diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5be240d..a702f6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -605,7 +605,7 @@ public class HRegionServer extends HasThread implements rpcServices.start(); putUpWebUI(); this.walRoller = new LogRoller(this, this); - this.choreService = new ChoreService(getServerName().toString()); + this.choreService = new ChoreService(getServerName().toString(), true); if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { @@ -1574,8 +1574,8 @@ public class HRegionServer extends HasThread implements static class PeriodicMemstoreFlusher extends ScheduledChore { final HRegionServer server; - final static int RANGE_OF_DELAY = 20000; //millisec - final static int MIN_DELAY_TIME = 3000; //millisec + final static int RANGE_OF_DELAY = 200000; // millisec + final static int MIN_DELAY_TIME = 0; // millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; -- 2.6.3