diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 959a1a4..f0a19e8 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -346,8 +346,8 @@
hbase.regionserver.optionalcacheflushinterval
3600000
- Amount of time to wait since the last time a region was flushed before
- invoking an optional cache flush. Default 1 hour.
+ Maximum amount of time (in milliseconds) an edit lives in memory before being automatically flushed.
+ Default 1 hour. Set it to 0 to disable automatic flushing.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d2cae1e..3f48da0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1336,6 +1336,9 @@ public class HRegion implements HeapSize { // , Writable{
* Should the memstore be flushed now
*/
boolean shouldFlush() {
+ if (flushCheckInterval <= 0) { //disabled
+ return false;
+ }
long now = EnvironmentEdgeManager.currentTimeMillis();
//if we flushed in the recent past, we don't need to do again now
if ((now - getLastFlushTime() < flushCheckInterval)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
index 4c9cfe6..86a30c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
@@ -25,6 +25,7 @@ import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
@@ -877,6 +880,100 @@ public class TestMemStore extends TestCase {
//this.memstore = null;
}
+ ////////////////////////////////////
+ // Test for periodic memstore flushes
+ // based on time of oldest edit
+ ////////////////////////////////////
+
+ /**
+ * Tests that the timeOfOldestEdit is updated correctly for the
+ * various edit operations in memstore.
+ * @throws Exception
+ */
+  public void testUpdateToTimeOfOldestEdit() throws Exception {
+    EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
+    try {
+      EnvironmentEdgeManager.injectEdge(edge);
+      // The injected clock is never advanced in this test, so timeOfOldestEdit is
+      // expected to equal this value after every kind of edit.
+      final long editTime = edge.currentTimeMillis();
+
+      // A fresh memstore reports Long.MAX_VALUE until it receives its first edit.
+      MemStore localMemstore = new MemStore();
+      assertEquals(Long.MAX_VALUE, localMemstore.timeOfOldestEdit());
+
+      // test the case that the timeOfOldestEdit is updated after a KV add
+      localMemstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      assertEquals(editTime, localMemstore.timeOfOldestEdit());
+      // snapshot() will reset timeOfOldestEdit; runSnapshot asserts that the
+      // value is back to Long.MAX_VALUE.
+      runSnapshot(localMemstore);
+
+      // test the case that the timeOfOldestEdit is updated after a KV delete
+      localMemstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      assertEquals(editTime, localMemstore.timeOfOldestEdit());
+      runSnapshot(localMemstore);
+
+      // test the case that the timeOfOldestEdit is updated after a KV upsert
+      List<KeyValue> upserts = new ArrayList<KeyValue>();
+      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
+      kv1.setMvccVersion(100);
+      upserts.add(kv1);
+      localMemstore.upsert(upserts, 1000);
+      assertEquals(editTime, localMemstore.timeOfOldestEdit());
+    } finally {
+      // Always restore the default clock so later tests are not affected.
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+ /**
+ * Tests the HRegion.shouldFlush method - adds an edit in the memstore
+ * and checks that shouldFlush returns true, and another where it disables
+ * the periodic flush functionality and tests whether shouldFlush returns
+ * false.
+ * @throws Exception
+ */
+  public void testShouldFlush() throws Exception {
+    final String intervalKey = HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL;
+    final Configuration conf = new Configuration();
+
+    // Periodic flushing enabled (interval 1000): a flush is expected.
+    conf.setInt(intervalKey, 1000);
+    checkShouldFlush(conf, true);
+
+    // Interval 0 turns periodic flushing off: no flush is expected.
+    conf.setInt(intervalKey, 0);
+    checkShouldFlush(conf, false);
+  }
+
+  private void checkShouldFlush(Configuration conf, boolean expected) throws Exception {
+    try {
+      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
+      EnvironmentEdgeManager.injectEdge(edge);
+      HBaseTestingUtility hbaseUtility = new HBaseTestingUtility(conf);
+      HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor("foo"));
+
+      // The test region is created with a single column family, hence a single store.
+      Map<byte[], Store> stores = region.getStores();
+      assertEquals(1, stores.size());
+
+      // Add one edit at clock time 1234.
+      Store s = stores.values().iterator().next();
+      edge.setCurrentTimeMillis(1234);
+      s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+
+      // Only 100 ms later: no flush is expected, whatever the configuration.
+      edge.setCurrentTimeMillis(1234 + 100);
+      assertFalse(region.shouldFlush());
+
+      // 10000 ms later: the outcome depends on the configuration under test.
+      edge.setCurrentTimeMillis(1234 + 10000);
+      assertEquals(expected, region.shouldFlush());
+    } finally {
+      // Always restore the default clock so later tests are not affected.
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  /**
+   * Manually driven clock for these tests: reports a fixed time (1234 initially)
+   * until it is changed through {@link #setCurrentTimeMillis(long)}.
+   */
+  private static class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
+    private long t = 1234;
+
+    @Override
+    public long currentTimeMillis() {
+      return t;
+    }
+
+    /** Sets the time that subsequent calls to currentTimeMillis() will report. */
+    public void setCurrentTimeMillis(long t) {
+      this.t = t;
+    }
+  }
+
/**
* Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
* @param hmc Instance to add rows to.
@@ -906,14 +1003,17 @@ public class TestMemStore extends TestCase {
return ROW_COUNT;
}
- private void runSnapshot(final MemStore hmc) throws UnexpectedException {
+ /**
+ * Snapshots the given memstore, asserts that the snapshot grew and that the
+ * memstore's timeOfOldestEdit was reset, then clears the snapshot.
+ * @param hmc memstore to snapshot
+ * @return the timeOfOldestEdit observed right after the snapshot
+ */
+ private long runSnapshot(final MemStore hmc) throws UnexpectedException {
// Save off old state.
int oldHistorySize = hmc.getSnapshot().size();
hmc.snapshot();
KeyValueSkipListSet ss = hmc.getSnapshot();
// Make some assertions about what just happened.
assertTrue("History size has not increased", oldHistorySize < ss.size());
+ // Check the instance that was passed in; callers may hand over a memstore
+ // other than the fixture's own.
+ long t = hmc.timeOfOldestEdit();
+ assertEquals("Time of oldest edit is not Long.MAX_VALUE", Long.MAX_VALUE, t);
hmc.clearSnapshot(ss);
+ return t;
}
private void isExpectedRowWithoutTimestamps(final int rowIndex,
| |