diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java index 4c89768..63690e1 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -10,8 +10,11 @@ import java.util.concurrent.atomic.AtomicLong; * the new writes for readers to read (thus forming atomic transactions). */ public class ReadWriteConsistencyControl { - private final AtomicLong memstoreRead = new AtomicLong(); - private final AtomicLong memstoreWrite = new AtomicLong(); + private volatile long memstoreRead = 0; + private volatile long memstoreWrite = 0; + + private final Object readWaiters = new Object(); + // This is the pending queue of writes. private final LinkedList writeQueue = new LinkedList(); @@ -34,12 +37,13 @@ public class ReadWriteConsistencyControl { public WriteEntry beginMemstoreInsert() { synchronized (writeQueue) { - long nextWriteNumber = memstoreWrite.incrementAndGet(); + long nextWriteNumber = ++memstoreWrite; WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); return e; } } + public void completeMemstoreInsert(WriteEntry e) { synchronized (writeQueue) { e.markCompleted(); @@ -70,24 +74,32 @@ public class ReadWriteConsistencyControl { } if (nextReadValue > 0) { - memstoreRead.set(nextReadValue); + memstoreRead = nextReadValue; + + synchronized (readWaiters) { + readWaiters.notifyAll(); + } + } } - // Spin until any other concurrent puts have finished. This makes sure that - // if we move on to construct a scanner, we'll get read-your-own-writes - // consistency. We anticipate that since puts to the memstore are very fast, - // this will be on the order of microseconds - so spinning should be faster - // than a condition variable. - int spun = 0; - while (memstoreRead.get() < e.getWriteNumber()) { - spun++; + boolean interrupted = false; + while (memstoreRead < e.getWriteNumber()) { + synchronized (readWaiters) { + try { + readWaiters.wait(0); + } catch (InterruptedException e) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. + interrupted = true; + } + } } - // Could potentially expose spun as a metric + if (interrupted) Thread.currentThread.interrupt(); } public long memstoreReadPoint() { - return memstoreRead.get(); + return memstoreRead; }