diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 0c77304..f579e60 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -278,12 +278,19 @@ public final class HConstants { public static final String HREGION_MEMSTORE_FLUSH_SIZE = "hbase.hregion.memstore.flush.size"; + /** Conf key for the periodic flush interval */ + public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = + "hbase.regionserver.optionalcacheflushinterval"; + public static final String HREGION_EDITS_REPLAY_SKIP_ERRORS = "hbase.hregion.edits.replay.skip.errors"; public static final boolean DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS = false; + /** Default interval for the memstore flush */ + public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000; + /** Default size of a reservation block */ public static final int DEFAULT_SIZE_RESERVATION_BLOCK = 1024 * 1024 * 5; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5e5476d..890635e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -358,6 +358,7 @@ public class HRegion implements HeapSize { // , Writable{ final RegionServerServices rsServices; private RegionServerAccounting rsAccounting; private List> recentFlushes = new ArrayList>(); + private long flushCheckInterval; private long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes @@ -437,6 +438,9 @@ public class HRegion implements HeapSize { // , Writable{ this.conf = new CompoundConfiguration() .add(confParam) .addWritableMap(htd.getValues()); + this.flushCheckInterval = conf.getInt( + HConstants.MEMSTORE_PERIODIC_FLUSH_INTERVAL, + HConstants.DEFAULT_CACHE_FLUSH_INTERVAL); this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); @@ -1464,6 +1468,26 @@ public class HRegion implements HeapSize { // , Writable{ } /** + * Should the memstore be flushed now + */ + boolean shouldFlush() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + //if we flushed in the recent past, we don't need to do again now + if (!(now - getLastFlushTime() > HConstants.DEFAULT_CACHE_FLUSH_INTERVAL)) { + return false; + } + //if the last flush happened quite sometime back or never, flush now if there are edits + //and no flush happened after the edits. Return true on first such find. + for (Store s : this.getStores().values()) { + if ((now - s.timeOfLastEdit()) > flushCheckInterval && + (now - getLastFlushTime()) > (now - s.timeOfLastEdit())) { + return true; + } + } + return false; + } + + /** * Flush the memstore. * * Flushing the memstore is a little tricky. We have a lot of updates in the diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 9948682..fe8d776 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1389,6 +1389,32 @@ public class HRegionServer implements ClientProtocol, } } + class PeriodicMemstoreFlusher extends Chore { + HRegionServer server; + public PeriodicMemstoreFlusher(final Configuration conf, + final HRegionServer server) { + super(server.getServerName() + "-MemstoreFlusherChore", + server.getConfiguration().getInt(HConstants.MEMSTORE_PERIODIC_FLUSH_INTERVAL, + HConstants.DEFAULT_CACHE_FLUSH_INTERVAL), server); + this.server = server; + } + + @Override + protected void chore() { + for (HRegion r : this.server.onlineRegions.values()) { + if (r == null) + continue; + if (r.shouldFlush()) { + FlushRequester requester = server.getFlushRequester(); + if (requester != null) { + LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString()); + requester.requestFlush(r); + } + } + } + } + } + /** * Report the status of the server. A server is online once all the startup is * completed (setting up filesystem, starting service threads, etc.). This diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index bace031..1c24128 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -492,6 +492,11 @@ public class HStore implements Store, StoreConfiguration { } } + @Override + public long timeOfLastEdit() { + return memstore.timeOfLastEdit(); + } + /** * Adds a value to the memstore * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 64eb206..5d06294 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * The MemStore holds in-memory modifications to the Store. Modifications @@ -88,6 +89,9 @@ public class MemStore implements HeapSize { // Used to track own heapSize final AtomicLong size; + // Used to track when to flush + volatile long timeLastAddCalled; + TimeRangeTracker timeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker; @@ -218,6 +222,26 @@ public class MemStore implements HeapSize { } } + long timeOfLastEdit() { + return timeLastAddCalled; + } + + private boolean addToKVSet(KeyValue e) { + boolean b = this.kvset.add(e); + setLastEditTimeToNow(); + return b; + } + + private boolean removeFromKVSet(KeyValue e) { + boolean b = this.kvset.remove(e); + setLastEditTimeToNow(); + return b; + } + + void setLastEditTimeToNow() { + timeLastAddCalled = EnvironmentEdgeManager.currentTimeMillis(); + } + /** * Internal version of add() that doesn't clone KVs with the * allocator, and doesn't take the lock. @@ -225,7 +249,7 @@ public class MemStore implements HeapSize { * Callers should ensure they already have the read lock taken */ private long internalAdd(final KeyValue toAdd) { - long s = heapSizeChange(toAdd, this.kvset.add(toAdd)); + long s = heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); this.size.addAndGet(s); return s; @@ -273,7 +297,7 @@ public class MemStore implements HeapSize { // If the key is in the memstore, delete it. Update this.size. found = this.kvset.get(kv); if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) { - this.kvset.remove(kv); + removeFromKVSet(kv); long s = heapSizeChange(kv, true); this.size.addAndGet(-s); } @@ -292,7 +316,7 @@ public class MemStore implements HeapSize { this.lock.readLock().lock(); try { KeyValue toAdd = maybeCloneWithAllocator(delete); - s += heapSizeChange(toAdd, this.kvset.add(toAdd)); + s += heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); } finally { this.lock.readLock().unlock(); @@ -588,6 +612,7 @@ public class MemStore implements HeapSize { // false means there was a change, so give us the size. addedSize -= heapSizeChange(cur, true); it.remove(); + setLastEditTimeToNow(); } else { versionsVisible++; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 32d581f..2228b8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -96,6 +96,11 @@ public interface Store extends HeapSize { public long add(KeyValue kv); /** + * When was the last edit done in the memstore + */ + long timeOfLastEdit(); + + /** * Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the * key & memstoreTS value of the kv parameter. * @param kv diff --git a/hbase-server/src/main/resources/hbase-default.xml b/hbase-server/src/main/resources/hbase-default.xml index b62d308..52c990c 100644 --- a/hbase-server/src/main/resources/hbase-default.xml +++ b/hbase-server/src/main/resources/hbase-default.xml @@ -349,6 +349,14 @@ + hbase.regionserver.optionalcacheflushinterval + 3600000 + + Amount of time to wait since the last time a region was flushed before + invoking an optional cache flush. Default 1 hour. + + + hbase.hregion.memstore.flush.size 134217728