diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a8ffa8d..2781f4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7138,7 +7138,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // now start my own transaction writeEntry = walKey.getWriteEntry(); - // Actually write to Memstore now if (!tempMemstore.isEmpty()) { for (Map.Entry> entry : tempMemstore.entrySet()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControlBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControlBasic.java index b63ca9e..556126f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControlBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControlBasic.java @@ -19,8 +19,12 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Threads; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -30,6 +34,8 @@ import org.junit.experimental.categories.Category; */ @Category({RegionServerTests.class, SmallTests.class}) public class TestMultiVersionConcurrencyControlBasic { + private static final Log LOG = LogFactory.getLog(TestMultiVersionConcurrencyControlBasic.class); + @Test public void testSimpleMvccOps() { MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); @@ -42,4 +48,47 @@ public class TestMultiVersionConcurrencyControlBasic { mvcc.complete(writeEntry); assertEquals(readPoint + 2, mvcc.getWritePoint()); } + + /** + * Cycle some mvcc ops. + */ + private static class CycleSomeMvccOps extends Thread { + private final int cycles; + private final MultiVersionConcurrencyControl mvcc; + private final long pause; + + CycleSomeMvccOps(String name, final MultiVersionConcurrencyControl mvcc, final int cycles, + final int pause) { + super(name); + this.cycles = cycles; + this.mvcc = mvcc; + this.pause = pause; + } + + public void run() { + for (int i = 0; i < cycles; i++) { + WriteEntry we = mvcc.begin(); + if (pause > 0) Threads.sleep(pause); + mvcc.completeAndWait(we); + } + } + } + + @Test + public void testSlowWriteHoldsUpOtherWrites() throws InterruptedException { + final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + final int threadCount = 10; + final int cycles = 10000; + Thread [] threads = new Thread[threadCount]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new CycleSomeMvccOps("" + i, mvcc, cycles, -1); + } + LOG.info("Start"); + Thread slowPoke = new CycleSomeMvccOps("slowpoke", mvcc, cycles, 1); + slowPoke.start(); + for (int i = 0; i < threads.length; i++) threads[i].start(); + for (int i = 0; i < threads.length; i++) threads[i].join(); + slowPoke.join(); + LOG.info("Finished"); + } } \ No newline at end of file