diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index 85cc8a5..6e6039d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -32,4 +32,11 @@ public interface FlushRequester { * @param region the HRegion requesting the cache flush */ void requestFlush(HRegion region); + /** + * Tell the listener the cache needs to be flushed after a delay + * + * @param region the HRegion requesting the cache flush + * @param delay after how much time should the flush happen + */ + void requestDelayedFlush(HRegion region, long delay); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 474bcfb..46b69df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -362,6 +362,7 @@ public class HRegion implements HeapSize { // , Writable{ final RegionServerServices rsServices; private RegionServerAccounting rsAccounting; private List> recentFlushes = new ArrayList>(); + private long flushCheckInterval; private long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes @@ -442,6 +443,8 @@ public class HRegion implements HeapSize { // , Writable{ .add(confParam) .addStringMap(htd.getConfiguration()) .addWritableMap(htd.getValues()); + this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, + DEFAULT_CACHE_FLUSH_INTERVAL); this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); @@ -948,6 +951,16 @@ public class HRegion implements HeapSize { // , Writable{ private final Object closeLock = new Object(); + // The minimum time difference between the current time and an edit that we want + // to maintain before we consider a store as a candidate for another flush + // If a store is getting a stream of edits then it will eventually flush ... + private static long MIN_TIME_DIFF_LAST_EDIT = 60000; + /** Conf key for the periodic flush interval */ + public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = + "hbase.regionserver.optionalcacheflushinterval"; + /** Default interval for the memstore flush */ + public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000; + /** * Close down this HRegion. Flush the cache unless abort parameter is true, * Shut down each HStore, don't service any more calls. @@ -1469,6 +1482,42 @@ public class HRegion implements HeapSize { // , Writable{ } /** + * Should the memstore be flushed now + */ + boolean shouldFlush() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + //if we flushed in the recent past, we don't need to do again now + if ((now - getLastFlushTime() < flushCheckInterval)) { + return false; + } + //since we didn't flush in the recent past, flush now if certain conditions + //are met. Return true on first such memstore hit. + for (Store s : this.getStores().values()) { + if (s.timeOfLastEdit() > getLastFlushTime()) { + if (getRegionInfo().isMetaTable()) { //don't do any more checks for meta. + //return true whenever we have an edit. Time bounds an edit's life in + //the memstore to one flush cycle + return true; + } else { + //return true when, (1) the last edit is older than MIN_TIME_DIFF_LAST_EDIT, + //or, (2) we haven't flushed at all in the last two flush cycles. + //Doing it this way lets us achieve two things: + //(1) it lets the system take care of itself (and avoid unneeded flushes) + //to a certain extent - if the writes are coming actively, then eventually, + //a flush will anyway happen + //(2) guarantees that edits are flushed within two flush cycles (time bounds + //an edit's life in the memstore) + if (now - s.timeOfLastEdit() > MIN_TIME_DIFF_LAST_EDIT || + now - getLastFlushTime() > flushCheckInterval*2) { + return true; + } + } + } + } + return false; + } + + /** * Flush the memstore. * * Flushing the memstore is a little tricky. We have a lot of updates in the @@ -4966,7 +5015,7 @@ public class HRegion implements HeapSize { // , Writable{ ClassSize.OBJECT + ClassSize.ARRAY + 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + - (10 * Bytes.SIZEOF_LONG) + + (11 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c8ea3bd..503ae15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -228,7 +228,7 @@ public class HRegionServer implements ClientProtocol, public static final Log LOG = LogFactory.getLog(HRegionServer.class); - private final Random rand = new Random(); + private Random rand = new Random(); /* * Strings to be used in forming the exception message for @@ -351,6 +351,11 @@ public class HRegionServer implements ClientProtocol, */ Chore compactionChecker; + /* + * Check for flushes + */ + Chore periodicFlusher; + // HLog and HLog roller. log is protected rather than private to avoid // eclipse warning when accessed by inner classes protected volatile HLog hlog; @@ -485,6 +490,7 @@ public class HRegionServer implements ClientProtocol, throw new IllegalArgumentException("Failed resolve of " + initialIsa); } + this.rand = new Random(initialIsa.hashCode()); this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this, new Class[]{ClientProtocol.class, AdminProtocol.class, HBaseRPCErrorHandler.class, @@ -803,6 +809,7 @@ public class HRegionServer implements ClientProtocol, ".multiplier", 1000); this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency * multiplier, this); + this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this); // Health checker thread. int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); @@ -1371,6 +1378,38 @@ public class HRegionServer implements ClientProtocol, } } + class PeriodicMemstoreFlusher extends Chore { + final HRegionServer server; + final static int RANGE_OF_DELAY = 20000; //millisec + final static int MIN_DELAY_TIME = 3000; //millisec + public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { + super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server); + this.server = server; + } + + @Override + protected void chore() { + for (HRegion r : this.server.onlineRegions.values()) { + if (r == null) + continue; + if (r.shouldFlush()) { + FlushRequester requester = server.getFlushRequester(); + if (requester != null) { + long s; + long randomDelay = + (s = rand.nextInt(RANGE_OF_DELAY + 1)) < MIN_DELAY_TIME ? MIN_DELAY_TIME : s; + LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() + + " after a delay of " + randomDelay); + //Throttle the flushes by putting a delay. If we don't throttle, and there + //is a balanced write-load on the regions in a table, we might end up + //overwhelming the filesystem with too many flushes at once. + requester.requestDelayedFlush(r, randomDelay); + } + } + } + } + } + /** * Report the status of the server. A server is online once all the startup is * completed (setting up filesystem, starting service threads, etc.). This @@ -1516,6 +1555,8 @@ public class HRegionServer implements ClientProtocol, this.cacheFlusher.start(uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", uncaughtExceptionHandler); + Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n + + ".periodicFlusher", uncaughtExceptionHandler); if (this.healthCheckChore != null) { Threads .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", @@ -1596,7 +1637,8 @@ public class HRegionServer implements ClientProtocol, // Verify that all threads are alive if (!(leases.isAlive() && cacheFlusher.isAlive() && hlogRoller.isAlive() - && this.compactionChecker.isAlive())) { + && this.compactionChecker.isAlive()) + && this.periodicFlusher.isAlive()) { stop("One or more threads are no longer alive -- stop"); return false; } @@ -1770,6 +1812,7 @@ public class HRegionServer implements ClientProtocol, */ protected void join() { Threads.shutdown(this.compactionChecker.getThread()); + Threads.shutdown(this.periodicFlusher.getThread()); this.cacheFlusher.join(); if (this.healthCheckChore != null) { Threads.shutdown(this.healthCheckChore.getThread()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 31a470f..5a14ab5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -504,6 +504,11 @@ public class HStore implements Store, StoreConfiguration { } } + @Override + public long timeOfLastEdit() { + return memstore.timeOfLastEdit(); + } + /** * Adds a value to the memstore * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 64eb206..2bc16ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * The MemStore holds in-memory modifications to the Store. Modifications @@ -88,6 +89,9 @@ public class MemStore implements HeapSize { // Used to track own heapSize final AtomicLong size; + // Used to track when to flush + volatile long timeLastAddCalled; + TimeRangeTracker timeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker; @@ -218,6 +222,26 @@ public class MemStore implements HeapSize { } } + long timeOfLastEdit() { + return timeLastAddCalled; + } + + private boolean addToKVSet(KeyValue e) { + boolean b = this.kvset.add(e); + setLastEditTimeToNow(); + return b; + } + + private boolean removeFromKVSet(KeyValue e) { + boolean b = this.kvset.remove(e); + setLastEditTimeToNow(); + return b; + } + + void setLastEditTimeToNow() { + timeLastAddCalled = EnvironmentEdgeManager.currentTimeMillis(); + } + /** * Internal version of add() that doesn't clone KVs with the * allocator, and doesn't take the lock. @@ -225,7 +249,7 @@ public class MemStore implements HeapSize { * Callers should ensure they already have the read lock taken */ private long internalAdd(final KeyValue toAdd) { - long s = heapSizeChange(toAdd, this.kvset.add(toAdd)); + long s = heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); this.size.addAndGet(s); return s; @@ -273,7 +297,7 @@ public class MemStore implements HeapSize { // If the key is in the memstore, delete it. Update this.size. found = this.kvset.get(kv); if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) { - this.kvset.remove(kv); + removeFromKVSet(kv); long s = heapSizeChange(kv, true); this.size.addAndGet(-s); } @@ -292,7 +316,7 @@ public class MemStore implements HeapSize { this.lock.readLock().lock(); try { KeyValue toAdd = maybeCloneWithAllocator(delete); - s += heapSizeChange(toAdd, this.kvset.add(toAdd)); + s += heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); } finally { this.lock.readLock().unlock(); @@ -588,6 +612,7 @@ public class MemStore implements HeapSize { // false means there was a change, so give us the size. addedSize -= heapSizeChange(cur, true); it.remove(); + setLastEditTimeToNow(); } else { versionsVisible++; } @@ -902,7 +927,7 @@ public class MemStore implements HeapSize { } public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (11 * ClassSize.REFERENCE)); + ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 283ef3a..138f8e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -329,6 +329,18 @@ class MemStoreFlusher implements FlushRequester { } } + public void requestDelayedFlush(HRegion r, long delay) { + synchronized (regionsInQueue) { + if (!regionsInQueue.containsKey(r)) { + // This entry has some delay + FlushRegionEntry fqe = new FlushRegionEntry(r); + fqe.requeue(delay); + this.regionsInQueue.put(r, fqe); + this.flushQueue.add(fqe); + } + } + } + public int getFlushQueueSize() { return flushQueue.size(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 32d581f..2228b8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -96,6 +96,11 @@ public interface Store extends HeapSize { public long add(KeyValue kv); /** + * When was the last edit done in the memstore + */ + long timeOfLastEdit(); + + /** * Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the * key & memstoreTS value of the kv parameter. * @param kv diff --git a/hbase-server/src/main/resources/hbase-default.xml b/hbase-server/src/main/resources/hbase-default.xml index ab152be..75bf25c 100644 --- a/hbase-server/src/main/resources/hbase-default.xml +++ b/hbase-server/src/main/resources/hbase-default.xml @@ -343,6 +343,14 @@ + hbase.regionserver.optionalcacheflushinterval + 3600000 + + Amount of time to wait since the last time a region was flushed before + invoking an optional cache flush. Default 1 hour. + + + hbase.hregion.memstore.flush.size 134217728 diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index d285b65..f2b402d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -722,6 +722,12 @@ public class TestWALReplay { throw new RuntimeException("Exception flushing", e); } } + + @Override + public void requestDelayedFlush(HRegion region, long when) { + // TODO Auto-generated method stub + + } } private void addWALEdits (final byte [] tableName, final HRegionInfo hri,