Index: security/src/test/resources/hbase-site.xml
===================================================================
--- security/src/test/resources/hbase-site.xml (revision 1482161)
+++ security/src/test/resources/hbase-site.xml (working copy)
@@ -97,14 +97,6 @@
- <name>hbase.regionserver.optionalcacheflushinterval</name>
- <value>1000</value>
- <description>
- Amount of time to wait since the last time a region was flushed before
- invoking an optional cache flush. Default 60,000.
- </description>
- </property>
- <property>
<name>hbase.regionserver.safemode</name>
<value>false</value>
Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1482161)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy)
@@ -762,6 +762,12 @@
throw new RuntimeException("Exception flushing", e);
}
}
+
+ @Override
+ public void requestDelayedFlush(HRegion region, long when) {
+ // Intentionally a no-op: this test flush requester ignores delayed flush requests.
+
+ }
}
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1482161)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy)
@@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@@ -39,6 +40,8 @@
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
@@ -870,6 +873,99 @@
assertEquals(newSize, this.memstore.size.get());
}
+ ////////////////////////////////////
+ // Test for periodic memstore flushes
+ // based on time of oldest edit
+ ////////////////////////////////////
+
+ /**
+ * Tests that the timeOfOldestEdit is updated correctly for the
+ * various edit operations in memstore (add, delete and upsert), and that
+ * it reads Long.MAX_VALUE on a fresh memstore.
+ * @throws Exception if any memstore operation fails
+ */
+ public void testUpdateToTimeOfOldestEdit() throws Exception {
+ try {
+ // The injected edge reports a fixed time (1234) until told otherwise, so
+ // the expected timeOfOldestEdit below is deterministic.
+ EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
+ EnvironmentEdgeManager.injectEdge(edge);
+ MemStore memstore = new MemStore();
+ // expected value goes first so a failure message reads correctly
+ assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
+
+ // test the case that the timeOfOldestEdit is updated after a KV add
+ memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+ assertEquals(1234L, memstore.timeOfOldestEdit());
+ // snapshot() will reset timeOfOldestEdit. The method will also assert the
+ // value is reset to Long.MAX_VALUE
+ runSnapshot(memstore);
+
+ // test the case that the timeOfOldestEdit is updated after a KV delete
+ memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+ assertEquals(1234L, memstore.timeOfOldestEdit());
+ runSnapshot(memstore);
+
+ // test the case that the timeOfOldestEdit is updated after a KV upsert
+ List<KeyValue> l = new ArrayList<KeyValue>();
+ KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
+ l.add(kv1);
+ memstore.upsert(l);
+ assertEquals(1234L, memstore.timeOfOldestEdit());
+ } finally {
+ // always restore the default clock so later tests are unaffected
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ /**
+ * Exercises HRegion.shouldFlush in two configurations: with a periodic
+ * flush interval of 1000 an aged edit is expected to make shouldFlush
+ * return true, and with the interval set to 0 (periodic flushing
+ * disabled) shouldFlush is expected to return false.
+ * @throws Exception
+ */
+ public void testShouldFlush() throws Exception {
+ final Configuration flushConf = new Configuration();
+
+ // periodic flush enabled
+ flushConf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
+ checkShouldFlush(flushConf, true);
+
+ // periodic flush disabled
+ flushConf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
+ checkShouldFlush(flushConf, false);
+ }
+
+ /**
+ * Creates a single-family test region under a controllable clock, adds one
+ * edit at time 1234, then checks shouldFlush() 100 ms later (must be false)
+ * and 10000 ms later (must equal <code>expected</code>).
+ * @param conf configuration used to build the test region
+ * @param expected value shouldFlush() must return once the edit has aged
+ * @throws Exception if the region cannot be created or the edit fails
+ */
+ private void checkShouldFlush(Configuration conf, boolean expected) throws Exception {
+ try {
+ EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
+ EnvironmentEdgeManager.injectEdge(edge);
+ HBaseTestingUtility hbaseUtility = new HBaseTestingUtility(conf);
+ HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor("foo"));
+
+ // typed map: a raw Map would not allow reading a Store out of it
+ Map<byte[], Store> stores = region.getStores();
+ assertEquals(1, stores.size());
+
+ Store s = stores.values().iterator().next();
+ edge.setCurrentTimeMillis(1234);
+ s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+ // shortly after the edit: never due for a flush
+ edge.setCurrentTimeMillis(1234 + 100);
+ assertFalse(region.shouldFlush());
+ // long after the edit: due only when periodic flushing is enabled
+ edge.setCurrentTimeMillis(1234 + 10000);
+ assertEquals(expected, region.shouldFlush());
+ } finally {
+ // always restore the default clock so later tests are unaffected
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ /**
+ * EnvironmentEdge whose clock only moves when a test sets it explicitly.
+ * The initial time is 1234. Declared static because it uses no state of
+ * the enclosing test instance.
+ */
+ private static class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
+ private long t = 1234;
+ @Override
+ public long currentTimeMillis() {
+ return t;
+ }
+ public void setCurrentTimeMillis(long t) {
+ this.t = t;
+ }
+ }
+
/** * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
* @param hmc Instance to add rows to.
* @return How many rows we added.
@@ -898,14 +994,17 @@
return ROW_COUNT;
}
- private void runSnapshot(final MemStore hmc) throws UnexpectedException {
+ /**
+ * Snapshots the given memstore, asserts the snapshot grew and that the
+ * memstore's timeOfOldestEdit was reset, then clears the snapshot.
+ * @param hmc memstore to snapshot
+ * @return the timeOfOldestEdit read from <code>hmc</code> right after the snapshot
+ */
+ private long runSnapshot(final MemStore hmc) throws UnexpectedException {
// Save off old state.
int oldHistorySize = hmc.getSnapshot().size();
hmc.snapshot();
KeyValueSkipListSet ss = hmc.getSnapshot();
// Make some assertions about what just happened.
assertTrue("History size has not increased", oldHistorySize < ss.size());
+ // Inspect the memstore that was actually snapshotted (hmc), not the fixture field.
+ long t = hmc.timeOfOldestEdit();
+ assertEquals("Time of oldest edit is not Long.MAX_VALUE", Long.MAX_VALUE, t);
hmc.clearSnapshot(ss);
+ return t;
}
private void isExpectedRowWithoutTimestamps(final int rowIndex,
Index: src/test/resources/hbase-site.xml
===================================================================
--- src/test/resources/hbase-site.xml (revision 1482161)
+++ src/test/resources/hbase-site.xml (working copy)
@@ -97,14 +97,6 @@
- <name>hbase.regionserver.optionalcacheflushinterval</name>
- <value>1000</value>
- <description>
- Amount of time to wait since the last time a region was flushed before
- invoking an optional cache flush. Default 60,000.
- </description>
- </property>
- <property>
<name>hbase.regionserver.safemode</name>
<value>false</value>
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1482161)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy)
@@ -322,6 +322,18 @@
}
}
+ /**
+ * Queues a flush of the given region that carries a delay. Does nothing
+ * when the region already has an entry in the queue.
+ * @param r region to flush
+ * @param delay delay handed to the new queue entry via requeue()
+ */
+ public void requestDelayedFlush(HRegion r, long delay) {
+ synchronized (regionsInQueue) {
+ if (regionsInQueue.containsKey(r)) {
+ return;
+ }
+ // Not queued yet: build an entry that carries the requested delay.
+ FlushRegionEntry delayedEntry = new FlushRegionEntry(r);
+ delayedEntry.requeue(delay);
+ this.regionsInQueue.put(r, delayedEntry);
+ this.flushQueue.add(delayedEntry);
+ }
+ }
+
public int getFlushQueueSize() {
return flushQueue.size();
}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1482161)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -345,6 +345,7 @@
final RegionServerServices rsServices;
private RegionServerAccounting rsAccounting;
private List> recentFlushes = new ArrayList>();
+ private long flushCheckInterval;
private long blockingMemStoreSize;
final long threadWakeFrequency;
// Used to guard closes
@@ -452,6 +453,8 @@
else {
this.conf = new CompoundConfiguration().add(confParam);
}
+ this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+ DEFAULT_CACHE_FLUSH_INTERVAL);
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
@@ -953,6 +956,12 @@
private final Object closeLock = new Object();
+ /**
+ * Conf key for the periodic flush interval. The configured value is compared
+ * against millisecond clock readings; zero or a negative value disables
+ * periodic flushing.
+ */
+ public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
+ "hbase.regionserver.optionalcacheflushinterval";
+ /** Default interval for the memstore flush: 3600000 ms (one hour) */
+ public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
+
/**
* Close down this HRegion. Flush the cache unless abort parameter is true,
* Shut down each HStore, don't service any more calls.
@@ -1475,6 +1484,29 @@
}
/**
+ * Decides whether a periodic (time-based) flush of this region is due.
+ * Returns false when periodic flushing is disabled (interval zero or
+ * negative) or when the region was flushed less than one interval ago;
+ * otherwise returns true as soon as one store reports an oldest edit that
+ * is more than one interval old.
+ * @return true if the memstore should be flushed now
+ */
+ boolean shouldFlush() {
+ if (flushCheckInterval <= 0) { //disabled
+ return false;
+ }
+ final long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ final long sinceLastFlush = currentTime - getLastFlushTime();
+ if (sinceLastFlush < flushCheckInterval) {
+ // flushed recently enough, nothing to do this round
+ return false;
+ }
+ // No recent flush: one sufficiently old edit in any store is enough.
+ final long oldestAllowed = currentTime - flushCheckInterval;
+ for (Store store : this.getStores().values()) {
+ if (store.timeOfOldestEdit() < oldestAllowed) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Flush the memstore.
*
* Flushing the memstore is a little tricky. We have a lot of updates in the
@@ -5421,7 +5453,7 @@
ClassSize.OBJECT +
ClassSize.ARRAY +
36 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (7 * Bytes.SIZEOF_LONG) +
+ (8 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1482161)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -513,6 +513,13 @@
}
/**
+ * When was the oldest edit done in the memstore
+ * @return whatever this store's memstore reports as its time of oldest edit
+ */
+ public long timeOfOldestEdit() {
+ return memstore.timeOfOldestEdit();
+ }
+
+ /**
* Adds a value to the memstore
*
* @param kv
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1482161)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -31,7 +31,6 @@
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -210,7 +209,7 @@
private HFileSystem fs;
private boolean useHBaseChecksum; // verify hbase checksums?
private Path rootDir;
- private final Random rand = new Random();
+ private final Random rand;
//RegionName vs current action in progress
//true - if open region action in progress
@@ -291,6 +290,11 @@
*/
Chore compactionChecker;
+ /*
+ * Check for flushes
+ */
+ Chore periodicFlusher;
+
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected volatile HLog hlog;
@@ -443,6 +447,8 @@
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
+
+ this.rand = new Random(initialIsa.hashCode());
this.rpcServer = HBaseRPC.getServer(this,
new Class>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
OnlineRegions.class},
@@ -697,6 +703,8 @@
this.compactionChecker = new CompactionChecker(this,
this.threadWakeFrequency * multiplier, this);
+ this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
+
// Health checker thread.
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
@@ -1348,6 +1356,36 @@
}
}
+ /**
+ * Chore that walks the server's online regions and, for every region whose
+ * shouldFlush() is true, asks the flush requester for a flush after a
+ * random delay of MIN_DELAY_TIME up to (but excluding)
+ * MIN_DELAY_TIME + RANGE_OF_DELAY milliseconds.
+ */
+ class PeriodicMemstoreFlusher extends Chore {
+ final HRegionServer server;
+ static final int RANGE_OF_DELAY = 20000; //millisec
+ static final int MIN_DELAY_TIME = 3000; //millisec
+ public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
+ super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
+ this.server = server;
+ }
+
+ @Override
+ protected void chore() {
+ for (HRegion region : this.server.onlineRegions.values()) {
+ if (region == null || !region.shouldFlush()) {
+ continue;
+ }
+ FlushRequester flushRequester = server.getFlushRequester();
+ if (flushRequester == null) {
+ continue;
+ }
+ long delayMillis = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
+ LOG.info(getName() + " requesting flush for region " + region.getRegionNameAsString() +
+ " after a delay of " + delayMillis);
+ //Throttle the flushes by putting a delay. If we don't throttle, and there
+ //is a balanced write-load on the regions in a table, we might end up
+ //overwhelming the filesystem with too many flushes at once.
+ flushRequester.requestDelayedFlush(region, delayMillis);
+ }
+ }
+ }
+
/**
* Report the status of the server. A server is online once all the startup is
* completed (setting up filesystem, starting service threads, etc.). This
@@ -1659,6 +1697,8 @@
uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
".compactionChecker", uncaughtExceptionHandler);
+ Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
+ ".periodicFlusher", uncaughtExceptionHandler);
if (this.healthCheckChore != null) {
Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
uncaughtExceptionHandler);
@@ -1739,7 +1779,8 @@
// Verify that all threads are alive
if (!(leases.isAlive()
&& cacheFlusher.isAlive() && hlogRoller.isAlive()
- && this.compactionChecker.isAlive())) {
+ && this.compactionChecker.isAlive()
+ && this.periodicFlusher.isAlive())) {
stop("One or more threads are no longer alive -- stop");
return false;
}
@@ -1916,6 +1957,7 @@
*/
protected void join() {
Threads.shutdown(this.compactionChecker.getThread());
+ Threads.shutdown(this.periodicFlusher.getThread());
Threads.shutdown(this.cacheFlusher.getThread());
if (this.healthCheckChore != null) {
Threads.shutdown(this.healthCheckChore.getThread());
Index: src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (revision 1482161)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (working copy)
@@ -30,4 +30,12 @@
* @param region the HRegion requesting the cache flush
*/
void requestFlush(HRegion region);
+
+ /**
+ * Tell the listener the cache needs to be flushed after a delay
+ *
+ * @param region the HRegion requesting the cache flush
+ * @param delay after how much time should the flush happen (the periodic
+ * flusher chore passes a value in milliseconds)
+ */
+ void requestDelayedFlush(HRegion region, long delay);
}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1482161)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy)
@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* The MemStore holds in-memory modifications to the Store. Modifications
@@ -87,6 +88,9 @@
// Used to track own heapSize
final AtomicLong size;
+ // Used to track when to flush: time of the oldest edit recorded so far, or
+ // Long.MAX_VALUE when no edit has been recorded since the last reset.
+ volatile long timeOfOldestEdit = Long.MAX_VALUE;
+
TimeRangeTracker timeRangeTracker;
TimeRangeTracker snapshotTimeRangeTracker;
@@ -158,6 +162,7 @@
if (allocator != null) {
this.allocator = new MemStoreLAB(conf);
}
+ timeOfOldestEdit = Long.MAX_VALUE;
}
}
} finally {
@@ -217,6 +222,28 @@
}
}
+ /**
+ * @return the recorded time of the oldest edit, or Long.MAX_VALUE if none
+ * has been recorded since the field was last reset
+ */
+ long timeOfOldestEdit() {
+ return timeOfOldestEdit;
+ }
+
+ /**
+ * Adds the KeyValue to kvset and records the edit time if none is recorded yet.
+ * @param e the KeyValue to add
+ * @return the result of kvset.add(e)
+ */
+ private boolean addToKVSet(KeyValue e) {
+ final boolean added = this.kvset.add(e);
+ setOldestEditTimeToNow();
+ return added;
+ }
+
+ /**
+ * Removes the KeyValue from kvset and records the edit time if none is recorded yet.
+ * @param e the KeyValue to remove
+ * @return the result of kvset.remove(e)
+ */
+ private boolean removeFromKVSet(KeyValue e) {
+ final boolean removed = this.kvset.remove(e);
+ setOldestEditTimeToNow();
+ return removed;
+ }
+
+ /**
+ * Records the current time as the time of the oldest edit, but only when no
+ * time is recorded yet (the field still holds Long.MAX_VALUE).
+ */
+ void setOldestEditTimeToNow() {
+ // NOTE(review): check-then-set on a volatile is not atomic; concurrent
+ // callers may both write a "now" value. That looks harmless - confirm.
+ if (timeOfOldestEdit == Long.MAX_VALUE) {
+ timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
+ }
+ }
+
/**
* Internal version of add() that doesn't clone KVs with the
* allocator, and doesn't take the lock.
@@ -224,7 +251,7 @@
* Callers should ensure they already have the read lock taken
*/
private long internalAdd(final KeyValue toAdd) {
- long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
+ long s = heapSizeChange(toAdd, addToKVSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s;
@@ -272,7 +299,7 @@
// If the key is in the memstore, delete it. Update this.size.
found = this.kvset.get(kv);
if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
- this.kvset.remove(kv);
+ removeFromKVSet(kv);
long s = heapSizeChange(kv, true);
this.size.addAndGet(-s);
}
@@ -291,7 +318,7 @@
this.lock.readLock().lock();
try {
KeyValue toAdd = maybeCloneWithAllocator(delete);
- s += heapSizeChange(toAdd, this.kvset.add(toAdd));
+ s += heapSizeChange(toAdd, addToKVSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
} finally {
this.lock.readLock().unlock();
@@ -588,6 +615,7 @@
addedSize -= delta;
this.size.addAndGet(-delta);
it.remove();
+ setOldestEditTimeToNow();
}
} else {
// past the column, done
@@ -899,7 +927,7 @@
}
public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (11 * ClassSize.REFERENCE));
+ ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
Index: src/main/resources/hbase-default.xml
===================================================================
--- src/main/resources/hbase-default.xml (revision 1482161)
+++ src/main/resources/hbase-default.xml (working copy)
@@ -353,6 +353,14 @@
+ <name>hbase.regionserver.optionalcacheflushinterval</name>
+ <value>3600000</value>
+ <description>
+ Maximum amount of time an edit lives in memory before being automatically flushed.
+ Default 1 hour. Set it to 0 to disable automatic flushing.
+ </description>
+ </property>
+ <property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value>