From 7510e34ce8d1e6b278fe8a9c0488ba27b78fb8a9 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 29 Dec 2015 15:04:33 -0800 Subject: [PATCH] HBASE-15047 Try spin lock for MVCC completion Summary: No more locking in mvcc. Only CAS operations. This should keep us in user space a lot more. It should allow much faster responses when logs sync completes. Test Plan: Perf testing to follow Differential Revision: https://reviews.facebook.net/D52407 --- .../MultiVersionConcurrencyControl.java | 142 ++++++++------------- 1 file changed, 51 insertions(+), 91 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index eba99e0..f736a9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -18,7 +18,9 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; @@ -42,20 +44,14 @@ public class MultiVersionConcurrencyControl { final AtomicLong readPoint = new AtomicLong(0); final AtomicLong writePoint = new AtomicLong(0); - private final Object readWaiters = new Object(); + + final ArrayBlockingQueue uncleanQueue = new ArrayBlockingQueue(200); + /** * Represents no value, or not set. */ public static final long NONE = -1; - // This is the pending queue of writes. - // - // TODO(eclark): Should this be an array of fixed size to - // reduce the number of allocations on the write path? - // This could be equal to the number of handlers + a small number. - // TODO: St.Ack 20150903 Sounds good to me. - private final LinkedList writeQueue = new LinkedList(); - public MultiVersionConcurrencyControl() { super(); } @@ -88,24 +84,23 @@ public class MultiVersionConcurrencyControl { * readPoint */ boolean tryAdvanceTo(long newStartPoint, long expected) { - synchronized (writeQueue) { - long currentRead = this.readPoint.get(); - long currentWrite = this.writePoint.get(); - if (currentRead != currentWrite) { - throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead + - ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo"); - } - if (expected != NONE && expected != currentRead) { - return false; - } - - if (newStartPoint < currentRead) { - return false; - } + long currentRead = this.readPoint.get(); + long currentWrite = this.writePoint.get(); + if (currentRead != currentWrite) { + throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead + + ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo"); + } + if (expected != NONE && expected != currentRead) { + return false; + } - readPoint.set(newStartPoint); - writePoint.set(newStartPoint); + if (newStartPoint < currentRead) { + return false; } + + readPoint.set(newStartPoint); + writePoint.set(newStartPoint); + return true; } @@ -120,12 +115,9 @@ public class MultiVersionConcurrencyControl { * @see #completeAndWait(WriteEntry) */ public WriteEntry begin() { - synchronized (writeQueue) { - long nextWriteNumber = writePoint.incrementAndGet(); - WriteEntry e = new WriteEntry(nextWriteNumber); - writeQueue.add(e); - return e; - } + long nextWriteNumber = writePoint.incrementAndGet(); + WriteEntry e = new WriteEntry(nextWriteNumber); + return e; } /** @@ -145,8 +137,12 @@ public class MultiVersionConcurrencyControl { * of the passed in WriteEntry. Thus, the write is visible to MVCC readers. */ public void completeAndWait(WriteEntry e) { - complete(e); - waitForRead(e); + e.markCompleted(); + while (!tryComplete(e)) { + // Spin. This will keep the thread scheduled and will keep + // the any un-needed context switching. + cleanUp(); + } } /** @@ -164,67 +160,31 @@ public class MultiVersionConcurrencyControl { * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) */ public boolean complete(WriteEntry writeEntry) { - synchronized (writeQueue) { - writeEntry.markCompleted(); - - long nextReadValue = NONE; - boolean ranOnce = false; - while (!writeQueue.isEmpty()) { - ranOnce = true; - WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue + 1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("Invariant in complete violated, nextReadValue=" - + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber()); - } - } - - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); - writeQueue.removeFirst(); - } else { - break; - } - } - - if (!ranOnce) { - throw new RuntimeException("There is no first!"); - } - - if (nextReadValue > 0) { - synchronized (readWaiters) { - readPoint.set(nextReadValue); - readWaiters.notifyAll(); - } - } - return readPoint.get() >= writeEntry.getWriteNumber(); + writeEntry.markCompleted(); + boolean completed = tryComplete(writeEntry); + if (!completed) { + uncleanQueue.add(writeEntry); } + return completed; } - /** - * Wait for the global readPoint to advance up to the passed in write entry number. - */ - void waitForRead(WriteEntry e) { - boolean interrupted = false; - int count = 0; - synchronized (readWaiters) { - while (readPoint.get() < e.getWriteNumber()) { - if (count % 100 == 0 && count > 0) { - LOG.warn("STUCK: " + this); - } - count++; - try { - readWaiters.wait(10); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. cleanup --and then - // on our way out, reset the interrupt flag. - interrupted = true; - } - } + private boolean tryComplete(WriteEntry writeEntry) { + long nextReadValue = writeEntry.getWriteNumber(); + return readPoint.compareAndSet(nextReadValue - 1, nextReadValue); + } + + private void cleanUp() { + if (uncleanQueue.isEmpty()) { + return; } - if (interrupted) { - Thread.currentThread().interrupt(); + WriteEntry e = null; + long tryCount = 0; + long maxTries = Math.min(20, uncleanQueue.size()); + while (tryCount < maxTries && (e = uncleanQueue.poll()) != null ) { + if (!tryComplete(e) ) { + uncleanQueue.add(e); + } + tryCount ++; } } -- 2.6.4