Index: security/src/test/resources/hbase-site.xml =================================================================== --- security/src/test/resources/hbase-site.xml (revision 1482161) +++ security/src/test/resources/hbase-site.xml (working copy) @@ -97,14 +97,6 @@ - hbase.regionserver.optionalcacheflushinterval - 1000 - - Amount of time to wait since the last time a region was flushed before - invoking an optional cache flush. Default 60,000. - - - hbase.regionserver.safemode false Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1482161) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy) @@ -762,6 +762,12 @@ throw new RuntimeException("Exception flushing", e); } } + + @Override + public void requestDelayedFlush(HRegion region, long when) { + // TODO Auto-generated method stub + + } } private void addWALEdits (final byte [] tableName, final HRegionInfo hri, Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1482161) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -39,6 +40,8 @@ import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.base.Joiner; import 
com.google.common.collect.Iterables; @@ -870,6 +873,99 @@ assertEquals(newSize, this.memstore.size.get()); } + //////////////////////////////////// + // Test for periodic memstore flushes + // based on time of oldest edit + //////////////////////////////////// + + /** + * Tests that the timeOfOldestEdit is updated correctly for the + * various edit operations in memstore. + * @throws Exception + */ + public void testUpdateToTimeOfOldestEdit() throws Exception { + try { + EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + EnvironmentEdgeManager.injectEdge(edge); + MemStore memstore = new MemStore(); + long t = memstore.timeOfOldestEdit(); + assertEquals(t, Long.MAX_VALUE); + + // test the case that the timeOfOldestEdit is updated after a KV add + memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v")); + t = memstore.timeOfOldestEdit(); + assertTrue(t == 1234); + // snapshot() will reset timeOfOldestEdit. The method will also assert the + // value is reset to Long.MAX_VALUE + t = runSnapshot(memstore); + + // test the case that the timeOfOldestEdit is updated after a KV delete + memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v")); + t = memstore.timeOfOldestEdit(); + assertTrue(t == 1234); + t = runSnapshot(memstore); + + // test the case that the timeOfOldestEdit is updated after a KV upsert + List l = new ArrayList(); + KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); + l.add(kv1); + memstore.upsert(l); + t = memstore.timeOfOldestEdit(); + assertTrue(t == 1234); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + /** + * Tests the HRegion.shouldFlush method - adds an edit in the memstore + * and checks that shouldFlush returns true, and another where it disables + * the periodic flush functionality and tests whether shouldFlush returns + * false. 
+ * @throws Exception + */ + public void testShouldFlush() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); + checkShouldFlush(conf, true); + // test disable flush + conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0); + checkShouldFlush(conf, false); + } + + private void checkShouldFlush(Configuration conf, boolean expected) throws Exception { + try { + EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + EnvironmentEdgeManager.injectEdge(edge); + HBaseTestingUtility hbaseUtility = new HBaseTestingUtility(conf); + HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor("foo")); + + Map stores = region.getStores(); + assertTrue(stores.size() == 1); + + Store s = stores.entrySet().iterator().next().getValue(); + edge.setCurrentTimeMillis(1234); + s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v")); + edge.setCurrentTimeMillis(1234 + 100); + assertTrue(region.shouldFlush() == false); + edge.setCurrentTimeMillis(1234 + 10000); + assertTrue(region.shouldFlush() == expected); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { + long t = 1234; + @Override + public long currentTimeMillis() { + return t; + } + public void setCurrentTimeMillis(long t) { + this.t = t; + } + } + /** * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. * @return How many rows we added. @@ -898,14 +994,17 @@ return ROW_COUNT; } - private void runSnapshot(final MemStore hmc) throws UnexpectedException { + private long runSnapshot(final MemStore hmc) throws UnexpectedException { // Save off old state. int oldHistorySize = hmc.getSnapshot().size(); hmc.snapshot(); KeyValueSkipListSet ss = hmc.getSnapshot(); // Make some assertions about what just happened. 
assertTrue("History size has not increased", oldHistorySize < ss.size()); + long t = hmc.timeOfOldestEdit(); + assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); hmc.clearSnapshot(ss); + return t; } private void isExpectedRowWithoutTimestamps(final int rowIndex, Index: src/test/resources/hbase-site.xml =================================================================== --- src/test/resources/hbase-site.xml (revision 1482161) +++ src/test/resources/hbase-site.xml (working copy) @@ -97,14 +97,6 @@ - hbase.regionserver.optionalcacheflushinterval - 1000 - - Amount of time to wait since the last time a region was flushed before - invoking an optional cache flush. Default 60,000. - - - hbase.regionserver.safemode false Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1482161) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -322,6 +322,18 @@ } } + public void requestDelayedFlush(HRegion r, long delay) { + synchronized (regionsInQueue) { + if (!regionsInQueue.containsKey(r)) { + // This entry has some delay + FlushRegionEntry fqe = new FlushRegionEntry(r); + fqe.requeue(delay); + this.regionsInQueue.put(r, fqe); + this.flushQueue.add(fqe); + } + } + } + public int getFlushQueueSize() { return flushQueue.size(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1482161) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -345,6 +345,7 @@ final RegionServerServices rsServices; private RegionServerAccounting rsAccounting; private List> recentFlushes = new ArrayList>(); + private long flushCheckInterval; private long
blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes @@ -452,6 +453,8 @@ else { this.conf = new CompoundConfiguration().add(confParam); } + this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, + DEFAULT_CACHE_FLUSH_INTERVAL); this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); @@ -953,6 +956,12 @@ private final Object closeLock = new Object(); + /** Conf key for the periodic flush interval */ + public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = + "hbase.regionserver.optionalcacheflushinterval"; + /** Default interval for the memstore flush */ + public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000; + /** * Close down this HRegion. Flush the cache unless abort parameter is true, * Shut down each HStore, don't service any more calls. @@ -1475,6 +1484,29 @@ } /** + * Should the memstore be flushed now + */ + boolean shouldFlush() { + if (flushCheckInterval <= 0) { //disabled + return false; + } + long now = EnvironmentEdgeManager.currentTimeMillis(); + //if we flushed in the recent past, we don't need to do again now + if ((now - getLastFlushTime() < flushCheckInterval)) { + return false; + } + //since we didn't flush in the recent past, flush now if certain conditions + //are met. Return true on first such memstore hit. + for (Store s : this.getStores().values()) { + if (s.timeOfOldestEdit() < now - flushCheckInterval) { + // we have an old enough edit in the memstore, flush + return true; + } + } + return false; + } + + /** * Flush the memstore. * * Flushing the memstore is a little tricky. 
We have a lot of updates in the @@ -5421,7 +5453,7 @@ ClassSize.OBJECT + ClassSize.ARRAY + 36 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + - (7 * Bytes.SIZEOF_LONG) + + (8 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1482161) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -513,6 +513,13 @@ } /** + * When was the oldest edit done in the memstore + */ + public long timeOfOldestEdit() { + return memstore.timeOfOldestEdit(); + } + + /** * Adds a value to the memstore * * @param kv Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1482161) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -31,7 +31,6 @@ import java.net.BindException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -210,7 +209,7 @@ private HFileSystem fs; private boolean useHBaseChecksum; // verify hbase checksums? private Path rootDir; - private final Random rand = new Random(); + private final Random rand; //RegionName vs current action in progress //true - if open region action in progress @@ -291,6 +290,11 @@ */ Chore compactionChecker; + /* + * Check for flushes + */ + Chore periodicFlusher; + // HLog and HLog roller. 
log is protected rather than private to avoid // eclipse warning when accessed by inner classes protected volatile HLog hlog; @@ -443,6 +447,8 @@ if (initialIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of " + initialIsa); } + + this.rand = new Random(initialIsa.hashCode()); this.rpcServer = HBaseRPC.getServer(this, new Class[]{HRegionInterface.class, HBaseRPCErrorHandler.class, OnlineRegions.class}, @@ -697,6 +703,8 @@ this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency * multiplier, this); + this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this); + // Health checker thread. int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); @@ -1348,6 +1356,36 @@ } } + class PeriodicMemstoreFlusher extends Chore { + final HRegionServer server; + final static int RANGE_OF_DELAY = 20000; //millisec + final static int MIN_DELAY_TIME = 3000; //millisec + public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { + super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server); + this.server = server; + } + + @Override + protected void chore() { + for (HRegion r : this.server.onlineRegions.values()) { + if (r == null) + continue; + if (r.shouldFlush()) { + FlushRequester requester = server.getFlushRequester(); + if (requester != null) { + long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME; + LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() + + " after a delay of " + randomDelay); + //Throttle the flushes by putting a delay. If we don't throttle, and there + //is a balanced write-load on the regions in a table, we might end up + //overwhelming the filesystem with too many flushes at once. + requester.requestDelayedFlush(r, randomDelay); + } + } + } + } + } + /** * Report the status of the server. 
A server is online once all the startup is * completed (setting up filesystem, starting service threads, etc.). This @@ -1659,6 +1697,8 @@ uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", uncaughtExceptionHandler); + Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n + + ".periodicFlusher", uncaughtExceptionHandler); if (this.healthCheckChore != null) { Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", uncaughtExceptionHandler); @@ -1739,7 +1779,8 @@ // Verify that all threads are alive if (!(leases.isAlive() && cacheFlusher.isAlive() && hlogRoller.isAlive() - && this.compactionChecker.isAlive())) { + && this.compactionChecker.isAlive() + && this.periodicFlusher.isAlive())) { stop("One or more threads are no longer alive -- stop"); return false; } @@ -1916,6 +1957,7 @@ */ protected void join() { Threads.shutdown(this.compactionChecker.getThread()); + Threads.shutdown(this.periodicFlusher.getThread()); Threads.shutdown(this.cacheFlusher.getThread()); if (this.healthCheckChore != null) { Threads.shutdown(this.healthCheckChore.getThread()); Index: src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (revision 1482161) +++ src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (working copy) @@ -30,4 +30,12 @@ * @param region the HRegion requesting the cache flush */ void requestFlush(HRegion region); + + /** + * Tell the listener the cache needs to be flushed after a delay + * + * @param region the HRegion requesting the cache flush + * @param delay after how much time should the flush happen + */ + void requestDelayedFlush(HRegion region, long delay); } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
=================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1482161) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * The MemStore holds in-memory modifications to the Store. Modifications @@ -87,6 +88,9 @@ // Used to track own heapSize final AtomicLong size; + // Used to track when to flush + volatile long timeOfOldestEdit = Long.MAX_VALUE; + TimeRangeTracker timeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker; @@ -158,6 +162,7 @@ if (allocator != null) { this.allocator = new MemStoreLAB(conf); } + timeOfOldestEdit = Long.MAX_VALUE; } } } finally { @@ -217,6 +222,28 @@ } } + long timeOfOldestEdit() { + return timeOfOldestEdit; + } + + private boolean addToKVSet(KeyValue e) { + boolean b = this.kvset.add(e); + setOldestEditTimeToNow(); + return b; + } + + private boolean removeFromKVSet(KeyValue e) { + boolean b = this.kvset.remove(e); + setOldestEditTimeToNow(); + return b; + } + + void setOldestEditTimeToNow() { + if (timeOfOldestEdit == Long.MAX_VALUE) { + timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis(); + } + } + /** * Internal version of add() that doesn't clone KVs with the * allocator, and doesn't take the lock. @@ -224,7 +251,7 @@ * Callers should ensure they already have the read lock taken */ private long internalAdd(final KeyValue toAdd) { - long s = heapSizeChange(toAdd, this.kvset.add(toAdd)); + long s = heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); this.size.addAndGet(s); return s; @@ -272,7 +299,7 @@ // If the key is in the memstore, delete it. Update this.size. 
found = this.kvset.get(kv); if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) { - this.kvset.remove(kv); + removeFromKVSet(kv); long s = heapSizeChange(kv, true); this.size.addAndGet(-s); } @@ -291,7 +318,7 @@ this.lock.readLock().lock(); try { KeyValue toAdd = maybeCloneWithAllocator(delete); - s += heapSizeChange(toAdd, this.kvset.add(toAdd)); + s += heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); } finally { this.lock.readLock().unlock(); @@ -588,6 +615,7 @@ addedSize -= delta; this.size.addAndGet(-delta); it.remove(); + setOldestEditTimeToNow(); } } else { // past the column, done @@ -899,7 +927,7 @@ } public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (11 * ClassSize.REFERENCE)); + ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + Index: src/main/resources/hbase-default.xml =================================================================== --- src/main/resources/hbase-default.xml (revision 1482161) +++ src/main/resources/hbase-default.xml (working copy) @@ -353,6 +353,14 @@ + hbase.regionserver.optionalcacheflushinterval + 3600000 + + Maximum amount of time an edit lives in memory before being automatically flushed. + Default 1 hour. Set it to 0 to disable automatic flushing. + + + hbase.hregion.memstore.flush.size 134217728