From 1885db76c88625b54bb5ba6ad3860f3f859b7cf0 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 3 Dec 2015 10:40:40 -0800 Subject: [PATCH] HBASE-14922 Delayed flush doesn't work causing flush storms. --- .../java/org/apache/hadoop/hbase/ChoreService.java | 20 +++- .../JitterScheduledThreadPoolExecutorImpl.java | 123 +++++++++++++++++++++ .../org/apache/hadoop/hbase/TestChoreService.java | 8 +- .../hadoop/hbase/regionserver/HRegionServer.java | 6 +- 4 files changed, 146 insertions(+), 11 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java index 2519f8f..6a7f8f2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -91,7 +91,14 @@ public class ChoreService implements ChoreServicer { * spawned by this service */ public ChoreService(final String coreThreadPoolPrefix) { - this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE); + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false); + } + + /** + * @param jitter Should the + */ + public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) { + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter); } /** @@ -101,11 +108,16 @@ public class ChoreService implements ChoreServicer { * to during initialization. The default size is 1, but specifying a larger size may be * beneficial if you know that 1 thread will not be enough. */ - public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) { + public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) { this.coreThreadPoolPrefix = coreThreadPoolPrefix; if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE; - final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); - scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); + if (jitter) { + scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1); + } else { + scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } + scheduler.setRemoveOnCancelPolicy(true); scheduledChores = new HashMap>(); choresMissingStartTime = new HashMap(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java new file mode 100644 index 0000000..95efa5a --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/JitterScheduledThreadPoolExecutorImpl.java @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay. + * + * This will spread out things on a distributed cluster. + */ +public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor { + private final double spread; + + /** + * Main constructor. + * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered. + */ + public JitterScheduledThreadPoolExecutorImpl(int corePoolSize, + ThreadFactory threadFactory, + double spread) { + super(corePoolSize, threadFactory); + this.spread = spread; + } + + protected java.util.concurrent.RunnableScheduledFuture decorateTask( + Runnable runnable, java.util.concurrent.RunnableScheduledFuture task) { + return new JitteredRunnableScheduledFuture<>(task); + } + + + protected java.util.concurrent.RunnableScheduledFuture decorateTask( + Callable callable, java.util.concurrent.RunnableScheduledFuture task) { + return new JitteredRunnableScheduledFuture<>(task); + } + + /** + * Class that basically just defers to the wrapped future. + * The only exception is getDelay + */ + protected class JitteredRunnableScheduledFuture implements RunnableScheduledFuture { + private final RunnableScheduledFuture wrapped; + JitteredRunnableScheduledFuture(RunnableScheduledFuture wrapped) { + this.wrapped = wrapped; + } + + @Override + public boolean isPeriodic() { + return wrapped.isPeriodic(); + } + + @Override + public long getDelay(TimeUnit unit) { + long baseDelay = wrapped.getDelay(unit); + long spreadTime = (long) (baseDelay * spread); + long delay = baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime); + // Ensure that we don't roll over for nanoseconds. + return (delay < 0) ? baseDelay : delay; + } + + @Override + public int compareTo(Delayed o) { + return wrapped.compareTo(o); + } + + @Override + public void run() { + wrapped.run(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return wrapped.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return wrapped.isCancelled(); + } + + @Override + public boolean isDone() { + return wrapped.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return wrapped.get(); + } + + @Override + public V get(long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return wrapped.get(timeout, unit); + } + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java index b113174..c818310 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java @@ -315,7 +315,7 @@ public class TestChoreService { final int corePoolSize = 10; final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; - ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize); + ChoreService customInit = new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false); try { assertEquals(corePoolSize, customInit.getCorePoolSize()); } finally { @@ -329,7 +329,7 @@ public class TestChoreService { shutdownService(defaultInit); } - ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10); + ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false); try { assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize()); } finally { @@ -403,7 +403,7 @@ public class TestChoreService { @Test (timeout=20000) public void testCorePoolIncrease() throws InterruptedException { final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize); + ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false); try { assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, @@ -443,7 +443,7 @@ public class TestChoreService { @Test(timeout = 30000) public void testCorePoolDecrease() throws InterruptedException { final int initialCorePoolSize = 3; - ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize); + ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false); final int chorePeriod = 100; try { // Slow chores always miss their start time and thus the core pool size should be at least as diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5be240d..2bc5eaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -605,7 +605,7 @@ public class HRegionServer extends HasThread implements rpcServices.start(); putUpWebUI(); this.walRoller = new LogRoller(this, this); - this.choreService = new ChoreService(getServerName().toString()); + this.choreService = new ChoreService(getServerName().toString(), true); if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { @@ -1574,8 +1574,8 @@ public class HRegionServer extends HasThread implements static class PeriodicMemstoreFlusher extends ScheduledChore { final HRegionServer server; - final static int RANGE_OF_DELAY = 20000; //millisec - final static int MIN_DELAY_TIME = 3000; //millisec + final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds + final static int MIN_DELAY_TIME = 0; // millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; -- 2.6.3