diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4c34fe0..eeb9f80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2024,6 +2024,7 @@ public class HRegionServer extends HasThread implements // Registering the compactSplitThread object with the ConfigurationManager. configurationManager.registerObserver(this.compactSplitThread); configurationManager.registerObserver(this.rpcServices); + configurationManager.registerObserver(this.cacheFlusher); configurationManager.registerObserver(this); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index a314848..904f82b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -43,6 +43,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -66,9 +68,15 @@ import org.apache.yetus.audience.InterfaceAudience; * @see FlushRequester */ @InterfaceAudience.Private -class MemStoreFlusher implements FlushRequester { +class MemStoreFlusher implements FlushRequester, PropagatingConfigurationObserver { private static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); + // Configuration keys for flush threads + public final static String FLUSHER_THREADS = "hbase.hstore.flusher.count"; + public final static int FLUSHER_THREADS_DEFAULT = 2; + public final static String FLUSHER_THREADS_MAX = "hbase.hstore.flusher.count.max"; + public final static int FLUSHER_THREADS_MAX_DEFAULT = 8; + private Configuration conf; // These two data members go together. Any entry in the one must have // a corresponding entry in the other. @@ -84,6 +92,8 @@ class MemStoreFlusher implements FlushRequester { private long blockingWaitTime; private final LongAdder updatesBlockedMsHighWater = new LongAdder(); + private final int maxHandlerCount; + private volatile int handlerCount = FLUSHER_THREADS_DEFAULT; private final FlushHandler[] flushHandlers; private List flushRequestListeners = new ArrayList<>(1); @@ -100,7 +110,8 @@ class MemStoreFlusher implements FlushRequester { conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); - int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); + this.maxHandlerCount = conf.getInt(FLUSHER_THREADS_MAX, FLUSHER_THREADS_MAX_DEFAULT); + this.handlerCount = conf.getInt(FLUSHER_THREADS, FLUSHER_THREADS_DEFAULT); this.flushHandlers = new FlushHandler[handlerCount]; LOG.info("globalMemStoreLimit=" + TraditionalBinaryPrefix @@ -213,8 +224,11 @@ class MemStoreFlusher implements FlushRequester { private class FlushHandler extends HasThread { - private FlushHandler(String name) { + int id; + + private FlushHandler(String name, int id) { super(name); + this.id = id; } @Override @@ -222,6 +236,10 @@ class MemStoreFlusher implements FlushRequester { while (!server.isStopped()) { FlushQueueEntry fqe = null; try { + if (id >= handlerCount) { + Thread.sleep(1000); + continue; + } wakeupPending.set(false); // allow someone to wake us up again fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (fqe == null || fqe instanceof WakeupFlushThread) { @@ -398,7 +416,7 @@ class MemStoreFlusher implements FlushRequester { ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory( server.getServerName().toShortString() + "-MemStoreFlusher", eh); for (int i = 0; i < flushHandlers.length; i++) { - flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i); + flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i, i); flusherThreadFactory.newThread(flushHandlers[i]); flushHandlers[i].start(); } @@ -697,6 +715,51 @@ class MemStoreFlusher implements FlushRequester { reclaimMemStoreMemory(); } + protected int getFlusherThreadNum() { + return handlerCount; + } + + /** + * {@inheritDoc} + */ + @Override + public void onConfigurationChange(Configuration newConf) { + int newHandlerCount = newConf.getInt(FLUSHER_THREADS, FLUSHER_THREADS_DEFAULT); + if (newHandlerCount < 1) { + LOG.warn("Min flush threads is 1, newHandlerCount=" + newHandlerCount + " adjust to 1"); + newHandlerCount = 1; + } + if (newHandlerCount > maxHandlerCount) { + LOG.warn("Max flush threads is " + maxHandlerCount + ", newHandlerCount=" + + newHandlerCount + " adjust to " + maxHandlerCount); + newHandlerCount = maxHandlerCount; + } + if (newHandlerCount != handlerCount) { + LOG.info("Changing the value of " + FLUSHER_THREADS + " from " + handlerCount + " to " + + newHandlerCount); + handlerCount = newHandlerCount; + } + // We change this atomically here instead of reloading the config in order that upstream + // would be the only one with the flexibility to reload the config. + this.conf.reloadConfiguration(); + } + + /** + * {@inheritDoc} + */ + @Override + public void registerChildren(ConfigurationManager manager) { + // No children to register. + } + + /** + * {@inheritDoc} + */ + @Override + public void deregisterChildren(ConfigurationManager manager) { + // No children to register + } + interface FlushQueueEntry extends Delayed { }