diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 8378403..ded0ecb 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -33,7 +33,10 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -284,15 +287,27 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // used to achieve atomic operations in the memstore. @Override public long getMvccVersion() { + if(mvcc == null) return 0; + return mvcc.longValue(); + } + + public MutableLong getMvccVersionReference() { return mvcc; } public void setMvccVersion(long mvccVersion){ + if(this.mvcc == null){ + this.mvcc = new MutableLong(); + } + this.mvcc.setValue(mvccVersion); + } + + public void setMvccVersion(MutableLong mvccVersion){ this.mvcc = mvccVersion; } // multi-version concurrency control version. default value is 0, aka do not care. - private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles) + private MutableLong mvcc; // this value is not part of a serialized KeyValue (not in HFiles) /** Dragon time over, return to normal business */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 636e29d..177282f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -2158,6 +2159,7 @@ public class HRegion implements HeapSize { // , Writable{ int lastIndexExclusive = firstIndex; boolean success = false; int noOfPuts = 0, noOfDeletes = 0; + MutableLong seqNumber = new MutableLong(); try { // ------------------------------------ // STEP 1. Try to acquire as many locks as we can, and ensure @@ -2282,7 +2284,7 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertUseSeqNum(Long.MAX_VALUE); // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -2343,7 +2345,7 @@ public class HRegion implements HeapSize { // , Writable{ assert isInReplay; txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(), walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true, - currentNonceGroup, currentNonce); + currentNonceGroup, currentNonce, seqNumber); hasWalAppends = true; } currentNonceGroup = nonceGroup; @@ -2367,8 +2369,11 @@ public class HRegion implements HeapSize { // , Writable{ if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId, - true, currentNonceGroup, currentNonce); + true, currentNonceGroup, currentNonce, seqNumber); hasWalAppends = true; + } else { + // still need to assign seqence number for SKIP WAL changes + seqNumber.setValue(this.sequenceId.incrementAndGet()); } // ------------------------------- @@ -2399,7 +2404,8 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ if (w != null) { - mvcc.completeMemstoreInsert(w); + w.setWriteNumber(seqNumber.longValue()); + mvcc.completeMemstoreInsertUseSeqNum(w); w = null; } @@ -2431,7 +2437,7 @@ public class HRegion implements HeapSize { // , Writable{ if (doRollBackMemstore) { rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } - if (w != null) mvcc.completeMemstoreInsert(w); + if (w != null) mvcc.completeMemstoreInsertUseSeqNum(w); if (locked) { this.updatesLock.readLock().unlock(); @@ -2517,7 +2523,9 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + WriteEntry tmpEntry = mvcc.beginMemstoreInsertUseSeqNum(this.sequenceId.incrementAndGet()); + mvcc.waitforPreviouseTranstionsComplete(tmpEntry); + mvcc.completeMemstoreInsertUseSeqNum(tmpEntry); List result; try { result = get(get, false); @@ -4687,6 +4695,7 @@ public class HRegion implements HeapSize { // , Writable{ long addedSize = 0; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); + MutableLong seqNumber = new MutableLong(); try { // 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList(rowsToLock.size()); @@ -4707,7 +4716,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!mutations.isEmpty()) { // 5. Get a mvcc write number - writeEntry = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsertUseSeqNum(Long.MAX_VALUE); // 6. Apply to memstore for (KeyValue kv : mutations) { kv.setMvccVersion(writeEntry.getWriteNumber()); @@ -4721,7 +4730,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!walEdit.isEmpty()) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now, - this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce); + this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce, seqNumber); } // 8. Release region lock if (locked) { @@ -4749,7 +4758,8 @@ public class HRegion implements HeapSize { // , Writable{ } // 11. Roll mvcc forward if (writeEntry != null) { - mvcc.completeMemstoreInsert(writeEntry); + writeEntry.setWriteNumber(seqNumber.longValue()); + mvcc.completeMemstoreInsertUseSeqNum(writeEntry); writeEntry = null; } if (locked) { @@ -4856,6 +4866,7 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); + MutableLong seqNumber = new MutableLong(); WriteEntry w = null; RowLock rowLock; try { @@ -4864,9 +4875,11 @@ public class HRegion implements HeapSize { // , Writable{ lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + w = mvcc.beginMemstoreInsertUseSeqNum(this.sequenceId.incrementAndGet()); + mvcc.waitforPreviouseTranstionsComplete(w); + mvcc.completeMemstoreInsertUseSeqNum(w); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertUseSeqNum(Long.MAX_VALUE); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -4969,7 +4982,7 @@ public class HRegion implements HeapSize { // , Writable{ txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId, - true, nonceGroup, nonce); + true, nonceGroup, nonce, seqNumber); } else { recordMutationWithoutWal(append.getFamilyCellMap()); } @@ -5003,7 +5016,8 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { if (w != null) { - mvcc.completeMemstoreInsert(w); + w.setWriteNumber(seqNumber.longValue()); + mvcc.completeMemstoreInsertUseSeqNum(w); } closeRegionOperation(); } @@ -5052,15 +5066,18 @@ public class HRegion implements HeapSize { // , Writable{ startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); WriteEntry w = null; + MutableLong seqNumber = new MutableLong(); try { RowLock rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + w = mvcc.beginMemstoreInsertUseSeqNum(this.sequenceId.incrementAndGet()); + mvcc.waitforPreviouseTranstionsComplete(w); + mvcc.completeMemstoreInsertUseSeqNum(w); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertUseSeqNum(Long.MAX_VALUE); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -5148,7 +5165,7 @@ public class HRegion implements HeapSize { // , Writable{ txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId, - true, nonceGroup, nonce); + true, nonceGroup, nonce, seqNumber); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); } @@ -5181,7 +5198,8 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { if (w != null) { - mvcc.completeMemstoreInsert(w); + w.setWriteNumber(seqNumber.longValue()); + mvcc.completeMemstoreInsertUseSeqNum(w); } closeRegionOperation(); if (this.metricsRegion != null) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 065fc4e..4c95a09 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -264,7 +264,7 @@ public class MemStore implements HeapSize { assert alloc.getData() != null; System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len); KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len); - newKv.setMvccVersion(kv.getMvccVersion()); + newKv.setMvccVersion(kv.getMvccVersionReference()); return newKv; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index b46d55b..0830f06 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -18,8 +18,17 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -32,8 +41,9 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; + private AtomicLong memstoreRead = new AtomicLong(); + private volatile long memstoreWrite; + private Queue inProgressWrites = new ConcurrentLinkedQueue(); private final Object readWaiters = new Object(); @@ -45,7 +55,7 @@ public class MultiVersionConsistencyControl { * Default constructor. Initializes the memstoreRead/Write points to 0. */ public MultiVersionConsistencyControl() { - this.memstoreRead = this.memstoreWrite = 0; + this.memstoreWrite = 0; } /** @@ -54,14 +64,85 @@ public class MultiVersionConsistencyControl { */ public void initialize(long startPoint) { synchronized (writeQueue) { - if (this.memstoreWrite != this.memstoreRead) { + if (this.memstoreWrite != this.memstoreRead.get()) { throw new RuntimeException("Already used this mvcc. Too late to initialize"); } - this.memstoreRead = this.memstoreWrite = startPoint; + this.memstoreRead.set(startPoint); + this.memstoreWrite = startPoint; } } + public WriteEntry beginMemstoreInsertUseSeqNum(long baseVal) { + WriteEntry e = new WriteEntry(baseVal); + inProgressWrites.add(e); + return e; + } + + public void completeMemstoreInsertUseSeqNum(WriteEntry e) { + advanceMemstoreUseSeqNum(e); + } + + boolean advanceMemstoreUseSeqNum(WriteEntry e) { + e.markCompleted(); + inProgressWrites.remove(e); + if(e.getWriteNumber().longValue() == Long.MAX_VALUE || e.getWriteNumber().longValue() == 0) + return false; + + while(true){ + long curVal = memstoreRead.get(); + if(curVal >= e.getWriteNumber().longValue()){ + break; + } + if(memstoreRead.compareAndSet(curVal, e.getWriteNumber().longValue())){ + break; + } + } + synchronized (readWaiters) { + readWaiters.notifyAll(); + } + return true; + } + + public void waitforPreviouseTranstionsComplete(WriteEntry e){ + boolean interrupted = false; + synchronized (readWaiters) { + while (!inProgressWrites.isEmpty()) { + try { + WriteEntry queueFirst = writeQueue.getFirst(); + if(queueFirst == e) { + return; + } + readWaiters.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. + interrupted = true; + break; + } + } + } + if (interrupted) Thread.currentThread().interrupt(); + + } + + public void waitForReadUseSeqNum(WriteEntry e) { + boolean interrupted = false; + synchronized (readWaiters) { + while (memstoreRead.get() < e.getWriteNumber().longValue()) { + try { + readWaiters.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. + interrupted = true; + break; + } + } + } + if (interrupted) Thread.currentThread().interrupt(); + } + /** * Generate and return a {@link WriteEntry} with a new write number. * To complete the WriteEntry and wait for it to be visible, @@ -109,14 +190,14 @@ public class MultiVersionConsistencyControl { WriteEntry queueFirst = writeQueue.getFirst(); if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { + if (nextReadValue+1 != queueFirst.getWriteNumber().longValue()) { throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " + nextReadValue + " next: " + queueFirst.getWriteNumber()); } } if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); + nextReadValue = queueFirst.getWriteNumber().longValue(); writeQueue.removeFirst(); } else { break; @@ -129,11 +210,11 @@ public class MultiVersionConsistencyControl { if (nextReadValue > 0) { synchronized (readWaiters) { - memstoreRead = nextReadValue; + memstoreRead.set(nextReadValue); readWaiters.notifyAll(); } } - if (memstoreRead >= e.getWriteNumber()) { + if (memstoreRead.get() >= e.getWriteNumber().longValue()) { return true; } return false; @@ -147,7 +228,7 @@ public class MultiVersionConsistencyControl { public void waitForRead(WriteEntry e) { boolean interrupted = false; synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { + while (memstoreRead.get() < e.getWriteNumber().longValue()) { try { readWaiters.wait(0); } catch (InterruptedException ie) { @@ -161,15 +242,15 @@ public class MultiVersionConsistencyControl { } public long memstoreReadPoint() { - return memstoreRead; + return memstoreRead.get(); } public static class WriteEntry { - private long writeNumber; + private MutableLong writeNumber; private boolean completed = false; WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; + this.writeNumber = new MutableLong(writeNumber); } void markCompleted() { this.completed = true; @@ -177,9 +258,12 @@ public class MultiVersionConsistencyControl { boolean isCompleted() { return this.completed; } - long getWriteNumber() { + MutableLong getWriteNumber() { return this.writeNumber; } + void setWriteNumber(long val){ + this.writeNumber.setValue(val); + } } public static final long FIXED_SIZE = ClassSize.align( diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 2af14b8..1e7c10c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -910,7 +912,7 @@ class FSHLog implements HLog, Syncable { public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException { append(info, tableName, edits, new ArrayList(), now, htd, true, true, sequenceId, - HConstants.NO_NONCE, HConstants.NO_NONCE); + HConstants.NO_NONCE, HConstants.NO_NONCE, new MutableLong()); } /** @@ -943,7 +945,7 @@ class FSHLog implements HLog, Syncable { @SuppressWarnings("deprecation") private long append(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, - AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException { + AtomicLong sequenceId, long nonceGroup, long nonce, MutableLong seqNumber) throws IOException { if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { throw new IOException("Cannot append; log is closed"); @@ -955,6 +957,7 @@ class FSHLog implements HLog, Syncable { // get the sequence number from the passed Long. In normal flow, it is coming from the // region. long seqNum = sequenceId.incrementAndGet(); + seqNumber.setValue(seqNum); // The 'lastSeqWritten' map holds the sequence number of the oldest // write for each region (i.e. the first edit added to the particular // memstore). . When the cache is flushed, the entry for the @@ -993,9 +996,9 @@ class FSHLog implements HLog, Syncable { @Override public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, - boolean isInMemstore, long nonceGroup, long nonce) throws IOException { - return append(info, tableName, edits, clusterIds, - now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce); + boolean isInMemstore, long nonceGroup, long nonce, MutableLong seqNumber) throws IOException { + return append(info, tableName, edits, clusterIds, now, htd, false, isInMemstore, sequenceId, + nonceGroup, nonce, seqNumber); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index ccf77e1..77e910a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -27,6 +27,8 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -281,7 +283,8 @@ public interface HLog { */ long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, - boolean isInMemstore, long nonceGroup, long nonce) throws IOException; + boolean isInMemstore, long nonceGroup, long nonce, MutableLong sequenceNum) + throws IOException; // TODO: Do we need all these versions of sync? void hsync() throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 684f78c..ea342dd 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -267,7 +268,7 @@ public class HLogUtil { long now = EnvironmentEdgeManager.currentTimeMillis(); TableName tn = TableName.valueOf(c.getTableName().toByteArray()); long txid = log.appendNoSync(info, tn, e, new ArrayList(), now, htd, sequenceId, - false, HConstants.NO_NONCE, HConstants.NO_NONCE); + false, HConstants.NO_NONCE, HConstants.NO_NONCE, new MutableLong()); log.sync(txid); if (LOG.isTraceEnabled()) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ac8e028..124a40e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -3803,7 +3804,8 @@ public class TestHRegion { //verify append called or not verify(log, expectAppend ? times(1) : never()) .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List)any(), - anyLong(), (HTableDescriptor)any(), (AtomicLong)any(), anyBoolean(), anyLong(), anyLong()); + anyLong(), (HTableDescriptor)any(), (AtomicLong)any(), anyBoolean(), anyLong(), anyLong(), + (MutableLong)any()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 64acde6..9640241 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Random; import java.util.UUID; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -118,7 +119,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool HRegionInfo hri = region.getRegionInfo(); if (this.noSync) { hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd, - region.getSequenceId(), true, nonce, nonce); + region.getSequenceId(), true, nonce, nonce, new MutableLong()); } else { hlog.append(hri, hri.getTable(), walEdit, now, htd, region.getSequenceId()); }