diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 18024d9..c704ef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -359,6 +360,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private volatile Optional<ConfigurationManager> configurationManager;
 
+  /** Random delay, in ms, added to the periodic flush interval; picked once at region open. */
+  private long flushJitterTime;
+
+  @VisibleForTesting
+  long getFlushJitterTime() {
+    return flushJitterTime;
+  }
+
   /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included in every
@@ -853,6 +862,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.lastStoreFlushTimeMap.put(store, lastFlushTime);
     }
 
+    if (this.flushCheckInterval > 0) {
+      // Pick the flush jitter once, at region open, so it stays fixed for this region. Periodic
+      // flushes then stay spread out across regions, even after a table-wide flush.
+      long jitterRange = conf.getLong(MEMSTORE_PERIODIC_FLUSH_JITTER_RANGE,
+          DEFAULT_PERIODIC_MEMSTORE_FLUSH_JITTER_RANGE);
+      // nextLong(bound) throws for bound <= 0, so a non-positive range just disables the jitter.
+      flushJitterTime = jitterRange > 0 ? ThreadLocalRandom.current().nextLong(jitterRange) : 0;
+    }
+
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId;
@@ -1316,8 +1334,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /** Conf key for the periodic flush interval */
   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
       "hbase.regionserver.optionalcacheflushinterval";
+  /** Conf key for the exclusive upper bound, in ms, of the random periodic flush jitter */
+  public static final String MEMSTORE_PERIODIC_FLUSH_JITTER_RANGE =
+      "hbase.regionserver.optionalcacheflush.jitter.range";
   /** Default interval for the memstore flush */
   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
+
+  /** Default jitter range, in ms, for the periodic memstore flush */
+  public static final int DEFAULT_PERIODIC_MEMSTORE_FLUSH_JITTER_RANGE = 3600000;
+
   /** Default interval for System tables memstore flush */
   public static final int SYSTEM_CACHE_FLUSH_INTERVAL = 300000; // 5 minutes
 
@@ -1974,7 +1999,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Should the store be flushed because it is old enough.
    * <p>
    * Every FlushPolicy should call this to determine whether a store is old enough to flush(except
-   * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
+   * that you always flush all stores). Otherwise {@link #shouldFlush(StringBuffer)} will always
    * returns true which will make a lot of flush requests.
    */
   boolean shouldFlushStore(Store store) {
@@ -1992,6 +2017,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return false;
     }
     long now = EnvironmentEdgeManager.currentTime();
+    // The flush interval is extended by this region's fixed random flushJitterTime, so regions
+    // are flushed at different times even when they all share the same last flush time.
+    if (now - this.lastStoreFlushTimeMap.get(store) < flushCheckInterval + flushJitterTime) {
+      return false;
+    }
     if (store.timeOfOldestEdit() < now - this.flushCheckInterval) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " +
@@ -2015,16 +2045,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return true;
     }
     long modifiedFlushCheckInterval = flushCheckInterval;
+    long modifiedJitter = this.flushJitterTime;
     if (getRegionInfo().isSystemTable() &&
         getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
       modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL;
+      modifiedJitter = 0; // no jitter on top of the system-table interval
     }
     if (modifiedFlushCheckInterval <= 0) { //disabled
       return false;
     }
     long now = EnvironmentEdgeManager.currentTime();
     //if we flushed in the recent past, we don't need to do again now
-    if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
+    if (now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval + modifiedJitter) {
       return false;
     }
     //since we didn't flush in the recent past, flush now if certain conditions
@@ -2110,6 +2142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new IOException("Aborting flush because server is aborted...");
     }
     final long startTime = EnvironmentEdgeManager.currentTime();
+
     // If nothing to flush, return, but we need to safely update the region sequence id
     if (this.memstoreSize.get() <= 0) {
       // Take an update lock because am about to change the sequence id and we want the sequence id
@@ -7659,7 +7692,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
-      (14 * Bytes.SIZEOF_LONG) +
+      (15 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index c8d9ef4..0033d2f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -915,6 +915,7 @@ public class TestDefaultMemStore extends TestCase {
   public void testShouldFlush() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
+    conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_JITTER_RANGE, 10000);
     checkShouldFlush(conf, true);
     // test disable flush
     conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
@@ -937,7 +938,11 @@ public class TestDefaultMemStore extends TestCase {
       edge.setCurrentTimeMillis(1234 + 100);
       StringBuffer sb = new StringBuffer();
       assertTrue(region.shouldFlush(sb) == false);
-      edge.setCurrentTimeMillis(1234 + 10000);
+
+      // Step 1 ms past interval + jitter so the outcome does not depend on landing exactly on
+      // the boundary (for example when the random jitter happens to be 0).
+      edge.setCurrentTimeMillis(1234 + conf.getInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000)
+          + region.getFlushJitterTime() + 1);
       assertTrue(region.shouldFlush(sb) == expected);
     } finally {
       EnvironmentEdgeManager.reset();