diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java index 4c89768..8d47aeb 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -10,8 +10,11 @@ import java.util.concurrent.atomic.AtomicLong; * the new writes for readers to read (thus forming atomic transactions). */ public class ReadWriteConsistencyControl { - private final AtomicLong memstoreRead = new AtomicLong(); - private final AtomicLong memstoreWrite = new AtomicLong(); + private volatile long memstoreRead = 0; + private volatile long memstoreWrite = 0; + + private final Object readWaiters = new Object(); + // This is the pending queue of writes. private final LinkedList writeQueue = new LinkedList(); @@ -34,7 +37,7 @@ public class ReadWriteConsistencyControl { public WriteEntry beginMemstoreInsert() { synchronized (writeQueue) { - long nextWriteNumber = memstoreWrite.incrementAndGet(); + long nextWriteNumber = ++memstoreWrite; WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); return e; @@ -70,7 +73,12 @@ public class ReadWriteConsistencyControl { } if (nextReadValue > 0) { - memstoreRead.set(nextReadValue); + memstoreRead = nextReadValue; + + synchronized (readWaiters) { + readWaiters.notifyAll(); + } + } } @@ -80,14 +88,19 @@ public class ReadWriteConsistencyControl { // this will be on the order of microseconds - so spinning should be faster // than a condition variable. int spun = 0; - while (memstoreRead.get() < e.getWriteNumber()) { + while (memstoreRead < e.getWriteNumber()) { + synchronized (readWaiters) { + try { + readWaiters.wait(0); + } catch (InterruptedException ignored) {} + } spun++; } // Could potentially expose spun as a metric } public long memstoreReadPoint() { - return memstoreRead.get(); + return memstoreRead; }