NOTE: the line breaks and indentation of this patch were reconstructed from a
whitespace-collapsed copy, so context-line whitespace may not match the target
files exactly; apply with `git apply --ignore-whitespace` (or `patch -l`).

diff --git a/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
index 93653e6..7e7e079 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
@@ -24,7 +24,7 @@ public class ReadWriteConsistencyControl {
       return e;
     }
   }
-  public long completeMemstoreInsert(WriteEntry e) {
+  public void completeMemstoreInsert(WriteEntry e) {
     synchronized (writeQueue) {
       e.markCompleted();
 
@@ -56,8 +56,18 @@ public class ReadWriteConsistencyControl {
       if (nextReadValue > 0) {
         memstoreRead.set(nextReadValue);
       }
-      return nextReadValue;
     }
+
+    // Spin until any other concurrent puts have finished. This makes sure that
+    // if we move on to construct a scanner, we'll get read-your-own-writes
+    // consistency. We anticipate that since puts to the memstore are very fast,
+    // this will be on the order of microseconds - so spinning should be faster
+    // than a condition variable.
+    int spun = 0;
+    while (memstoreRead.get() < e.getWriteNumber()) {
+      spun++;
+    }
+    // Could potentially expose spun as a metric
   }
 
   public long memstoreReadPoint() {
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
index e457bed..ab3683e 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
 
@@ -239,6 +240,88 @@ public class TestMemStore extends TestCase {
     assertScannerResults(s[0], new KeyValue[]{kv1, kv2});
   }
 
+  private static class ReadOwnWritesTester extends Thread {
+    final int id;
+    static final int NUM_TRIES = 1000;
+
+    final byte[] row;
+
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+
+    final ReadWriteConsistencyControl rwcc;
+    final MemStore memstore;
+
+    AtomicReference<Throwable> caughtException;
+
+
+    public ReadOwnWritesTester(int id,
+                               MemStore memstore,
+                               ReadWriteConsistencyControl rwcc,
+                               AtomicReference<Throwable> caughtException)
+    {
+      this.id = id;
+      this.rwcc = rwcc;
+      this.memstore = memstore;
+      this.caughtException = caughtException;
+      row = Bytes.toBytes(id);
+    }
+
+    public void run() {
+      try {
+        internalRun();
+      } catch (Throwable t) {
+        caughtException.compareAndSet(null, t);
+      }
+    }
+
+    private void internalRun() {
+      for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
+        ReadWriteConsistencyControl.WriteEntry w =
+          rwcc.beginMemstoreInsert();
+
+        // Insert the sequence value (i)
+        byte[] v = Bytes.toBytes(i);
+
+        KeyValue kv = new KeyValue(row, f, q1, i, v);
+        kv.setMemstoreTS(w.getWriteNumber());
+        memstore.add(kv);
+        rwcc.completeMemstoreInsert(w);
+
+        // Assert that we can read back
+
+        KeyValueScanner s = this.memstore.getScanners()[0];
+        s.seek(kv);
+
+        KeyValue ret = s.next();
+        assertNotNull("Didnt find own write at all", ret);
+        assertEquals("Didnt read own writes",
+                     kv.getTimestamp(), ret.getTimestamp());
+      }
+    }
+  }
+
+  public void testReadOwnWritesUnderConcurrency() throws Throwable {
+
+    int NUM_THREADS = 8;
+
+    ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS];
+    AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught);
+      threads[i].start();
+    }
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i].join();
+    }
+
+    if (caught.get() != null) {
+      throw caught.get();
+    }
+  }
+
   /**
    * Test memstore snapshots
    * @throws IOException
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java b/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
index 7b5166f..78fe59c 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
@@ -33,9 +33,8 @@ public class TestReadWriteConsistencyControl extends TestCase {
           Thread.sleep(0, sleepTime * 1000);
         } catch (InterruptedException e1) {
         }
-        long next = 0;
         try {
-          next = rwcc.completeMemstoreInsert(e);
+          rwcc.completeMemstoreInsert(e);
         } catch (RuntimeException ex) {
           // got failure
           System.out.println(ex.toString());
@@ -44,7 +43,6 @@ public class TestReadWriteConsistencyControl extends TestCase {
           return;
           // Report failure if possible.
         }
-// System.out.println("finished write: " + next);
       }
     }
   }