From bc3bba290bacc6687abd191e28fc2bdae0a8353c Mon Sep 17 00:00:00 2001 From: Junegunn Choi Date: Fri, 5 Feb 2016 17:31:23 +0900 Subject: [PATCH] HBASE-15213 Fix increment performance regression caused by HBASE-8763 on branch-1.0 --- .../MultiVersionConsistencyControl.java | 27 +++++++++++++++------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index fffd7c0..387d414 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.LinkedList; +import java.util.LinkedHashSet; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -39,8 +39,8 @@ public class MultiVersionConsistencyControl { private final Object readWaiters = new Object(); // This is the pending queue of writes. - private final LinkedList writeQueue = - new LinkedList(); + private final LinkedHashSet writeQueue = + new LinkedHashSet(); /** * Default constructor. Initializes the memstoreRead/Write points to 0. @@ -100,7 +100,14 @@ public class MultiVersionConsistencyControl { * @return WriteEntry a WriteEntry instance with the passed in curSeqNum */ public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { + return beginMemstoreInsertWithSeqNum(curSeqNum, false); + } + + private WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum, boolean complete) { WriteEntry e = new WriteEntry(curSeqNum); + if (complete) { + e.markCompleted(); + } synchronized (writeQueue) { writeQueue.add(e); return e; @@ -153,11 +160,11 @@ public class MultiVersionConsistencyControl { e.markCompleted(); while (!writeQueue.isEmpty()) { - WriteEntry queueFirst = writeQueue.getFirst(); + WriteEntry queueFirst = writeQueue.iterator().next(); if (queueFirst.isCompleted()) { // Using Max because Edit complete in WAL sync order not arriving order nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber()); - writeQueue.removeFirst(); + writeQueue.remove(queueFirst); } else { break; } @@ -199,27 +206,31 @@ public class MultiVersionConsistencyControl { * Wait for all previous MVCC transactions complete */ public void waitForPreviousTransactionsComplete() { - WriteEntry w = beginMemstoreInsert(); + WriteEntry w = beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER, true); waitForPreviousTransactionsComplete(w); } public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { boolean interrupted = false; WriteEntry w = waitedEntry; + w.markCompleted(); try { WriteEntry firstEntry = null; do { synchronized (writeQueue) { - // writeQueue won't be empty at this point, the following is just a safety check if (writeQueue.isEmpty()) { break; } - firstEntry = writeQueue.getFirst(); + firstEntry = writeQueue.iterator().next(); if (firstEntry == w) { // all previous in-flight transactions are done break; } + // WriteEntry already was removed from the queue by another handler + if (!writeQueue.contains(w)) { + break; + } try { writeQueue.wait(0); } catch (InterruptedException ie) { -- 2.6.3