diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 287cd48..5e8c4ef 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.DefaultEnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.util.Progressable; @@ -219,6 +221,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + private EnvironmentEdge environmentEdge; + /** * Name of the region info file that resides just under the region directory. */ @@ -245,6 +249,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this.regiondir = null; this.regionInfo = null; this.threadWakeFrequency = 0L; + this.environmentEdge = new DefaultEnvironmentEdge(); } /** @@ -300,6 +305,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this.memstoreFlushSize = flushSize; this.blockingMemStoreSize = this.memstoreFlushSize * conf.getLong("hbase.hregion.memstore.block.multiplier", 2); + this.environmentEdge = new DefaultEnvironmentEdge(); } /** @@ -362,7 +368,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // HRegion is ready to go! 
this.writestate.compacting = false; - this.lastFlushTime = System.currentTimeMillis(); + this.lastFlushTime = currentTimeMillis(); LOG.info("region " + this + "/" + this.regionInfo.getEncodedName() + " available; sequence id is " + this.minSequenceId); } @@ -670,7 +676,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } // Calculate regionid to use. Can't be less than that of parent else // it'll insert into wrong location over in .META. table: HBASE-710. - long rid = System.currentTimeMillis(); + long rid = currentTimeMillis(); if (rid < this.regionInfo.getRegionId()) { LOG.warn("Clock skew; parent regions id is " + this.regionInfo.getRegionId() + " but current time here is " + rid); @@ -835,7 +841,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } LOG.info("Starting" + (majorCompaction? " major " : " ") + "compaction on region " + this); - long startTime = System.currentTimeMillis(); + long startTime = currentTimeMillis(); doRegionCompactionPrep(); long maxSize = -1; for (Store store: stores.values()) { @@ -846,7 +852,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } doRegionCompactionCleanup(); - String timeTaken = StringUtils.formatTimeDiff(System.currentTimeMillis(), + String timeTaken = StringUtils.formatTimeDiff(currentTimeMillis(), startTime); LOG.info("compaction completed on region " + this + " in " + timeTaken); } finally { @@ -948,7 +954,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * because a Snapshot was not properly persisted. */ protected boolean internalFlushcache() throws IOException { - final long startTime = System.currentTimeMillis(); + final long startTime = currentTimeMillis(); // Clear flush flag. 
// Record latest flush time this.lastFlushTime = startTime; @@ -1087,7 +1093,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } if (LOG.isDebugEnabled()) { - long now = System.currentTimeMillis(); + long now = currentTimeMillis(); LOG.debug("Finished memstore flush of ~" + StringUtils.humanReadableInt(currentMemStoreSize) + " for region " + this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId + @@ -1267,7 +1273,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ */ public void delete(Map> familyMap, boolean writeToWAL) throws IOException { - long now = System.currentTimeMillis(); + long now = currentTimeMillis(); byte [] byteNow = Bytes.toBytes(now); boolean flush = false; @@ -1428,7 +1434,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // If we did not pass an existing row lock, obtain a new one Integer lid = getLock(lockid, row); - byte [] now = Bytes.toBytes(System.currentTimeMillis()); try { // All edits for the given row (across all column families) must happen atomically. 
put(put.getFamilyMap(), writeToWAL); @@ -1470,8 +1475,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ checkFamily(family); get.addColumn(family, qualifier); - byte [] now = Bytes.toBytes(System.currentTimeMillis()); - // Lock row Integer lid = getLock(lockId, get.getRow()); List result = new ArrayList(); @@ -1618,7 +1621,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ */ private void put(final Map> familyMap, boolean writeToWAL) throws IOException { - long now = System.currentTimeMillis(); + long now = currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); boolean flush = false; this.updatesLock.readLock().lock(); @@ -2666,12 +2669,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // bulid the KeyValue now: KeyValue newKv = new KeyValue(row, family, - qualifier, System.currentTimeMillis(), + qualifier, currentTimeMillis(), Bytes.toBytes(result)); // now log it: if (writeToWAL) { - long now = System.currentTimeMillis(); + long now = currentTimeMillis(); WALEdit walEdit = new WALEdit(); walEdit.add(newKv); this.log.append(regionInfo, regionInfo.getTableDesc().getName(), @@ -2697,6 +2700,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return result; } + private long currentTimeMillis() { + return environmentEdge.currentTimeMillis(); + } // // New HBASE-880 Helpers @@ -2713,7 +2719,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + - (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); + (22 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + @@ -2830,6 +2836,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return false; } + public EnvironmentEdge 
getEnvironmentEdge() { + return environmentEdge; + } + + public void setEnvironmentEdge(EnvironmentEdge environmentEdge) { + this.environmentEdge = environmentEdge; + } + /** * A mocked list implementaion - discards all updates. */ diff --git a/src/main/java/org/apache/hadoop/hbase/util/DefaultEnvironmentEdge.java b/src/main/java/org/apache/hadoop/hbase/util/DefaultEnvironmentEdge.java new file mode 100644 index 0000000..1d855e1 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/DefaultEnvironmentEdge.java @@ -0,0 +1,18 @@ +package org.apache.hadoop.hbase.util; + +/** + * Default implementation of an environment edge, backed by the system clock. + */ +public class DefaultEnvironmentEdge implements EnvironmentEdge { + + + /** + * {@inheritDoc} + * + * This implementation returns {@link System#currentTimeMillis()}. + */ + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java b/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java new file mode 100644 index 0000000..e4243a4 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.hbase.util; + +/** + * Abstracts interaction with the environment (currently only the clock). Alternate + * implementations can be used where required (e.g. in tests). + */ +public interface EnvironmentEdge { + + /** + * Returns the current time in milliseconds as reported by this edge. + * + * @return the value this edge reports as the current time, in milliseconds. + */ + long currentTimeMillis(); +} diff --git a/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java b/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java new file mode 100644 index 0000000..ae92736 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java @@ -0,0 +1,20 @@ +package org.apache.hadoop.hbase.util; + +/** + * Environment edge that reports an incrementing counter instead of the system clock. 
+ */ +public class IncrementingEnvironmentEdge implements EnvironmentEdge { + + private long timeIncrement = 1; + + /** + * {@inheritDoc} + * + * Returns a counter that is incremented on every call; the first value is 1. + * Synchronized so that concurrent callers never observe the same value twice. + */ + @Override + public synchronized long currentTimeMillis() { + return timeIncrement++; + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index d3716d6..470cab0 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -24,12 +24,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DefaultEnvironmentEdge; +import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import java.io.IOException; import java.util.ArrayList; @@ -524,6 +526,7 @@ public class TestHRegion extends HBaseTestCase { byte [][] families = {fam}; String method = this.getName(); initHRegion(tableName, method, families); + region.setEnvironmentEdge(new IncrementingEnvironmentEdge()); 
byte [] row = Bytes.toBytes("table_name"); // column names @@ -562,9 +565,6 @@ public class TestHRegion extends HBaseTestCase { result = region.get(get, null); assertEquals(1, result.size()); - // Sleep to ensure timestamp of next Put is bigger than previous delete - Thread.sleep(10); - // Assert that after a delete, I can put. put = new Put(row); put.add(fam, splitA, Bytes.toBytes("reference_A")); @@ -578,9 +578,6 @@ public class TestHRegion extends HBaseTestCase { region.delete(delete, null, false); assertEquals(0, region.get(get, null).size()); - // Sleep to ensure timestamp of next Put is bigger than previous delete - Thread.sleep(10); - region.put(new Put(row).add(fam, splitA, Bytes.toBytes("reference_A"))); result = region.get(get, null); assertEquals(1, result.size()); @@ -676,16 +673,14 @@ public class TestHRegion extends HBaseTestCase { public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException { initHRegion(tableName, getName(), fam1); + region.setEnvironmentEdge(new IncrementingEnvironmentEdge()); Put put = new Put(row); put.add(fam1, qual1, value1); region.put(put); - Thread.sleep(10); - // now delete the value: region.delete(delete, null, true); - Thread.sleep(10); // ok put data: put = new Put(row); @@ -2520,4 +2515,34 @@ public class TestHRegion extends HBaseTestCase { Path path = new Path(DIR + callingMethod); region = HRegion.createHRegion(info, path, conf); } + + public void testHRegionConstructor() { + HRegion testRegion = new HRegion(); + assertNotNull("Should not get back null for the environment edge", + testRegion.getEnvironmentEdge()); + assertTrue("Constructor should inject default environment edge but it " + + "was something else", + testRegion.getEnvironmentEdge() instanceof DefaultEnvironmentEdge); + } + + public void testHRegionConstructorMultipleArgs() { + String method = "testHRegionConstructorMultipleArgs"; + byte[] tableName = Bytes.toBytes(method); + byte[] family = Bytes.toBytes("family"); + + 
//Setting up region + try { + initHRegion(tableName, method, family); + } catch (IOException e) { + fail(e.getMessage()); + } + assertNotNull("Should not get back null for the environment edge", + region.getEnvironmentEdge()); + assertTrue("Constructor should inject default environment edge but it " + + "was something else", + region.getEnvironmentEdge() instanceof DefaultEnvironmentEdge); + } + + + } diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestDefaultEnvironmentEdge.java b/src/test/java/org/apache/hadoop/hbase/util/TestDefaultEnvironmentEdge.java new file mode 100644 index 0000000..4315d18 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/TestDefaultEnvironmentEdge.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.hbase.util; + +import org.junit.Test; + +import static junit.framework.Assert.assertTrue; + +/** + * Tests to make sure that the default environment edge conforms to appropriate behaviour. + */ +public class TestDefaultEnvironmentEdge { + + /** + * Verifies that the edge tracks the system clock: it is never behind an earlier + * system reading, and it advances once the system clock has ticked. + */ + @Test + public void testGetCurrentTimeUsesSystemClock() throws InterruptedException { + DefaultEnvironmentEdge edge = new DefaultEnvironmentEdge(); + long systemTime = System.currentTimeMillis(); + long edgeTime = edge.currentTimeMillis(); + assertTrue("System time must be either the same or less than the edge time", + systemTime <= edgeTime); + // Wait until the system clock has actually ticked: a single sleep(1) can + // return before a coarse-grained clock advances, making the check below flaky. + while (System.currentTimeMillis() <= edgeTime) { + Thread.sleep(1); + } + long secondEdgeTime = edge.currentTimeMillis(); + assertTrue("Second time must be greater than the first", + secondEdgeTime > edgeTime); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestIncrementingEnvironmentEdge.java b/src/test/java/org/apache/hadoop/hbase/util/TestIncrementingEnvironmentEdge.java new file mode 100644 index 0000000..04da4ba --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/TestIncrementingEnvironmentEdge.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.hbase.util; + 
+import org.junit.Test; + +import static junit.framework.Assert.assertEquals; + +/** + * Tests that the incrementing environment edge returns 1, 2, 3, ... on + * successive calls instead of reading the system clock. + */ +public class TestIncrementingEnvironmentEdge { + + @Test + public void testGetCurrentTimeIsIncrementing() { + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + assertEquals("First time", 1, edge.currentTimeMillis()); + assertEquals("Second time", 2, edge.currentTimeMillis()); + assertEquals("Third time", 3, edge.currentTimeMillis()); + assertEquals("Fourth time", 4, edge.currentTimeMillis()); + } +}