diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index be41deb..fa7449f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5879,9 +5879,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); RowLock rowLock = null; - WriteEntry w = null; + // WriteEntry w = null; WALKey walKey = null; - long mvccNum = 0; + // long mvccNum = 0; List memstoreCells = new ArrayList(); boolean doRollBackMemstore = false; try { @@ -5891,7 +5891,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // try { // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.waitForPreviousTransactionsComplete(); + // mvcc.waitForPreviousTransactionsComplete(); if (this.coprocessorHost != null) { Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); if (r != null) { @@ -5899,8 +5899,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + // mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + // w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry> family: @@ -5916,6 +5916,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // Collections.sort(family.getValue(), store.getComparator()); // Get previous values for all columns in this family Get get = new Get(row); + get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); for (Cell cell: family.getValue()) { get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell)); } @@ -5982,7 +5983,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // val, 0, val.length, newTags); - CellUtil.setSequenceId(newKV, mvccNum); + // REMOVED CellUtil.setSequenceId(newKV, mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { @@ -6010,28 +6011,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - //Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(entry.getValue()); - } else { - // otherwise keep older versions around - for (Cell cell : entry.getValue()) { - Pair ret = store.add(cell); - size += ret.getFirst(); - memstoreCells.add(ret.getSecond()); - doRollBackMemstore = true; - } - } - } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); - } - // Actually write to WAL now if (walEdits != null && !walEdits.isEmpty()) { if (writeToWAL) { @@ -6055,24 +6034,52 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // this.updatesLock.readLock().unlock(); } } finally { - rowLock.release(); - rowLock = null; + // CHANGE: ALL UNDER LOCK NOW + // rowLock.release(); + // rowLock = null; } // sync the transaction log outside the rowlock if(txid != 0){ syncOrDefer(txid, durability); } + //Actually write to Memstore now + if (!tempMemstore.isEmpty()) { + for (Map.Entry> entry : tempMemstore.entrySet()) { + for (Cell cell: entry.getValue()) CellUtil.setSequenceId(cell, walKey.getSequenceId()); + Store store = entry.getKey(); + if (store.getFamily().getMaxVersions() == 1) { + // upsert if VERSIONS for this CF == 1 + size += store.upsert(entry.getValue(), getSmallestReadPoint()); + memstoreCells.addAll(entry.getValue()); + } else { + // otherwise keep older versions around + for (Cell cell : entry.getValue()) { + Pair ret = store.add(cell); + size += ret.getFirst(); + memstoreCells.add(ret.getSecond()); + doRollBackMemstore = true; + } + } + } + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); + } + // FIX THISSSSSSSSSSSSS NO NEED TO ROLLBACK BECAUSE OF REORDER doRollBackMemstore = false; } finally { - if (rowLock != null) { - rowLock.release(); - } // if the wal sync was unsuccessful, remove keys from memstore + // NO ROLLBACK FIXXXX if (doRollBackMemstore) { rollbackMemstore(memstoreCells); } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + // if (w != null) { + WriteEntry we = mvcc.beginMemstoreInsertWithSeqNum(walKey.getSequenceId()); + mvcc.advanceMemstore(we); +// mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + // } + if (rowLock != null) { + rowLock.release(); + rowLock = null; } closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java index 2652750..179c8d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java @@ -50,6 +50,8 @@ public class TestIncrement { @Rule public TestName name = new TestName(); private static HBaseTestingUtility TEST_UTIL; private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment"); + private static final int THREAD_COUNT = 100; + private static final int INCREMENTS = 10000; @Before public void setUp() throws Exception { @@ -102,6 +104,10 @@ public class TestIncrement { } } } + + Increment getIncrement() { + return this.increment; + } } /** @@ -145,8 +151,8 @@ public class TestIncrement { HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name.getMethodName()); long startTime = System.currentTimeMillis(); try { - final int threadCount = 100; - final int incrementCount = 25000; + final int threadCount = THREAD_COUNT; + final int incrementCount = INCREMENTS; SingleCellIncrementer [] threads = new SingleCellIncrementer[threadCount]; for (int i = 0; i < threads.length; i++) { threads[i] = new SingleCellIncrementer(i, incrementCount, region, increment); @@ -178,8 +184,8 @@ public class TestIncrement { final HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name.getMethodName()); long startTime = System.currentTimeMillis(); try { - final int threadCount = 100; - final int incrementCount = 25000; + final int threadCount = THREAD_COUNT; + final int incrementCount = INCREMENTS; SingleCellIncrementer [] threads = new SingleCellIncrementer[threadCount]; for (int i = 0; i < threads.length; i++) { byte [] rowBytes = Bytes.toBytes(i); @@ -193,6 +199,10 @@ public class TestIncrement { for (int i = 0; i < threads.length; i++) { threads[i].join(); } + Increment increment = new Increment(threads[0].getIncrement().getRow()); + increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 0); + Result incrementResult = region.increment(increment); + LOG.info(Bytes.toLong(incrementResult.listCells().get(0).getValue())); RegionScanner regionScanner = region.getScanner(new Scan()); List cells = new ArrayList(100); while(regionScanner.next(cells)) continue; @@ -217,8 +227,8 @@ public class TestIncrement { final HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name.getMethodName()); long startTime = System.currentTimeMillis(); try { - final int threadCount = 100; - final int incrementCount = 25000; + final int threadCount = THREAD_COUNT; + final int incrementCount = INCREMENTS; CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[threadCount]; for (int i = 0; i < threads.length; i++) { threads[i] = new CrossRowCellIncrementer(i, incrementCount, region, threadCount);