diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 959a1a4..f0a19e8 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -346,8 +346,8 @@ hbase.regionserver.optionalcacheflushinterval 3600000 - Amount of time to wait since the last time a region was flushed before - invoking an optional cache flush. Default 1 hour. + Maximum amount of time an edit lives in memory before being automatically flushed. + Default 1 hour. Set it to 0 to disable automatic flushing. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d2cae1e..3f48da0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1336,6 +1336,9 @@ public class HRegion implements HeapSize { // , Writable{ * Should the memstore be flushed now */ boolean shouldFlush() { + if (flushCheckInterval <= 0) { //disabled + return false; + } long now = EnvironmentEdgeManager.currentTimeMillis(); //if we flushed in the recent past, we don't need to do again now if ((now - getLastFlushTime() < flushCheckInterval)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 4c9cfe6..86a30c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -25,6 +25,7 @@ import java.rmi.UnexpectedException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; @@ -877,6 +880,100 @@ public class TestMemStore extends TestCase { //this.memstore = null; } + //////////////////////////////////// + // Test for periodic memstore flushes + // based on time of oldest edit + //////////////////////////////////// + + /** + * Tests that the timeOfOldestEdit is updated correctly for the + * various edit operations in memstore. + * @throws Exception + */ + public void testUpdateToTimeOfOldestEdit() throws Exception { + try { + EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + EnvironmentEdgeManager.injectEdge(edge); + MemStore memstore = new MemStore(); + long t = memstore.timeOfOldestEdit(); + assertEquals(t, Long.MAX_VALUE); + + // test the case that the timeOfOldestEdit is updated after a KV add + memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v")); + t = memstore.timeOfOldestEdit(); + assertTrue(t == 1234); + // snapshot() will reset timeOfOldestEdit. The method will also assert the + // value is reset to Long.MAX_VALUE + t = runSnapshot(memstore); + + // test the case that the timeOfOldestEdit is updated after a KV delete + memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v")); + t = memstore.timeOfOldestEdit(); + assertTrue(t == 1234); + t = runSnapshot(memstore); + + // test the case that the timeOfOldestEdit is updated after a KV upsert + List l = new ArrayList(); + KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); + kv1.setMvccVersion(100); + l.add(kv1); + memstore.upsert(l, 1000); + t = memstore.timeOfOldestEdit(); + assertTrue(t == 1234); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + /** + * Tests the HRegion.shouldFlush method - adds an edit in the memstore + * and checks that shouldFlush returns true, and another where it disables + * the periodic flush functionality and tests whether shouldFlush returns + * false. + * @throws Exception + */ + public void testShouldFlush() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); + checkShouldFlush(conf, true); + // test disable flush + conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0); + checkShouldFlush(conf, false); + } + + private void checkShouldFlush(Configuration conf, boolean expected) throws Exception { + try { + EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + EnvironmentEdgeManager.injectEdge(edge); + HBaseTestingUtility hbaseUtility = new HBaseTestingUtility(conf); + HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor("foo")); + + Map stores = region.getStores(); + assertTrue(stores.size() == 1); + + Store s = stores.entrySet().iterator().next().getValue(); + edge.setCurrentTimeMillis(1234); + s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v")); + edge.setCurrentTimeMillis(1234 + 100); + assertTrue(region.shouldFlush() == false); + edge.setCurrentTimeMillis(1234 + 10000); + assertTrue(region.shouldFlush() == expected); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { + long t = 1234; + @Override + public long currentTimeMillis() { + return t; + } + public void setCurrentTimeMillis(long t) { + this.t = t; + } + } + /** * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. @@ -906,14 +1003,17 @@ public class TestMemStore extends TestCase { return ROW_COUNT; } - private void runSnapshot(final MemStore hmc) throws UnexpectedException { + private long runSnapshot(final MemStore hmc) throws UnexpectedException { // Save off old state. int oldHistorySize = hmc.getSnapshot().size(); hmc.snapshot(); KeyValueSkipListSet ss = hmc.getSnapshot(); // Make some assertions about what just happened. assertTrue("History size has not increased", oldHistorySize < ss.size()); + long t = memstore.timeOfOldestEdit(); + assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); hmc.clearSnapshot(ss); + return t; } private void isExpectedRowWithoutTimestamps(final int rowIndex,