>[] familyMaps,
int start, int end) {
int kvsRolledback = 0;
@@ -3069,59 +3035,36 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
- * Obtain a lock on the given row. Blocks until success.
- *
- * I know it's strange to have two mappings:
- *
- * ROWS ==> LOCKS
- *
- * as well as
- *
- * LOCKS ==> ROWS
- *
- * It would be more memory-efficient to just have one mapping;
- * maybe we'll do that in the future.
- *
- * @param row Name of row to lock.
- * @throws IOException
- * @return The id of the held lock.
- */
- public Integer obtainRowLock(final byte [] row) throws IOException {
- startRegionOperation();
- this.writeRequestsCount.increment();
- try {
- return internalObtainRowLock(row, true);
- } finally {
- closeRegionOperation();
- }
- }
-
- /**
* Obtains or tries to obtain the given row lock.
* @param waitForLock if true, will block until the lock is available.
* Otherwise, just tries to obtain the lock and returns
- * null if unavailable.
+ * false if unavailable.
+ * @return true if the lock was obtained, false if waitForLock was false and the lock was not obtained
+ * @throws IOException if waitForLock was true and the lock could not be obtained after waiting
*/
- private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
+ private boolean internalObtainRowLock(final byte[] row, boolean waitForLock)
throws IOException {
checkRow(row, "row lock");
startRegionOperation();
try {
HashedBytes rowKey = new HashedBytes(row);
CountDownLatch rowLatch = new CountDownLatch(1);
+ RowLockContext rowLockContext = new RowLockContext(rowLatch, Thread.currentThread());
// loop until we acquire the row lock (unless !waitForLock)
while (true) {
- CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
- if (existingLatch == null) {
+ RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+ // If the row is already locked by this same thread, we already hold it
+ if (existingContext == null
+ || existingContext.thread == Thread.currentThread()) {
break;
} else {
- // row already locked
+ // row already locked by some other thread
if (!waitForLock) {
- return null;
+ return false;
}
try {
- if (!existingLatch.await(this.rowLockWaitDuration,
+ if (!existingContext.latch.await(this.rowLockWaitDuration,
TimeUnit.MILLISECONDS)) {
throw new IOException("Timed out on getting lock for row="
+ Bytes.toStringBinary(row));
@@ -3135,72 +3078,48 @@ public class HRegion implements HeapSize { // , Writable{
}
}
- // loop until we generate an unused lock id
- while (true) {
- Integer lockId = lockIdGenerator.incrementAndGet();
- HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
- if (existingRowKey == null) {
- return lockId;
- } else {
- // lockId already in use, jump generator to a new spot
- lockIdGenerator.set(rand.nextInt());
- }
- }
+ locksHeldByThread.get().add(rowKey);
+ return true;
} finally {
closeRegionOperation();
}
}
/**
- * Release the row lock!
- * @param lockId The lock ID to release.
+ * Releases all row locks held by the current thread.
*/
- public void releaseRowLock(final Integer lockId) {
- if (lockId == null) return; // null lock id, do nothing
- HashedBytes rowKey = lockIds.remove(lockId);
- if (rowKey == null) {
- LOG.warn("Release unknown lockId: " + lockId);
- return;
- }
- CountDownLatch rowLatch = lockedRows.remove(rowKey);
- if (rowLatch == null) {
- LOG.error("Releases row not locked, lockId: " + lockId + " row: "
- + rowKey);
- return;
+ public void releaseMyRowLocks() {
+ List locksHeld = locksHeldByThread.get();
+ for (HashedBytes rowKey : locksHeld) {
+ RowLockContext rowLockContext = lockedRows.remove(rowKey);
+ if (rowLockContext == null) {
+ LOG.error("Internal row lock state inconsistent, should not happen, row: " + rowKey);
+ continue;
+ }
+ rowLockContext.latch.countDown();
}
- rowLatch.countDown();
+ locksHeld.clear();
}
/**
- * See if row is currently locked.
- * @param lockId
- * @return boolean
+ * Acquires a lock on the given row.
+ * @throws IOException if the lock could not be obtained after waiting
*/
- boolean isRowLocked(final Integer lockId) {
- return lockIds.containsKey(lockId);
+ public void getRowLock(byte [] row) throws IOException {
+ internalObtainRowLock(row, true);
}
-
+
/**
- * Returns existing row lock if found, otherwise
- * obtains a new row lock and returns it.
- * @param lockid requested by the user, or null if the user didn't already hold lock
- * @param row the row to lock
- * @param waitForLock if true, will block until the lock is available, otherwise will
- * simply return null if it could not acquire the lock.
- * @return lockid or null if waitForLock is false and the lock was unavailable.
+ * Acquires lock on the given row if possible without waiting.
+ * @return true iff the lock was acquired
*/
- public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
- throws IOException {
- Integer lid = null;
- if (lockid == null) {
- lid = internalObtainRowLock(row, waitForLock);
- } else {
- if (!isRowLocked(lockid)) {
- throw new IOException("Invalid row lock");
- }
- lid = lockid;
+ public boolean tryRowLock(byte [] row) {
+ try {
+ return internalObtainRowLock(row, false);
+ } catch (IOException e) {
+ LOG.error("Unexpected exception trying lock without wait", e);
+ return false;
}
- return lid;
}
/**
@@ -4521,15 +4440,9 @@ public class HRegion implements HeapSize { // , Writable{
Collection rowsToLock = processor.getRowsToLock();
try {
// 2. Acquire the row lock(s)
- acquiredLocks = new ArrayList(rowsToLock.size());
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, fail if one lock times out
- Integer lid = getLock(null, row, true);
- if (lid == null) {
- throw new IOException("Failed to acquire lock on "
- + Bytes.toStringBinary(row));
- }
- acquiredLocks.add(lid);
+ getRowLock(row);
}
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredLocks.size());
@@ -4567,12 +4480,7 @@ public class HRegion implements HeapSize { // , Writable{
}
// 9. Release row lock(s)
- if (acquiredLocks != null) {
- for (Integer lid : acquiredLocks) {
- releaseRowLock(lid);
- }
- acquiredLocks = null;
- }
+ releaseMyRowLocks();
// 10. Sync edit log
if (txid != 0) {
syncOrDefer(txid);
@@ -4597,11 +4505,8 @@ public class HRegion implements HeapSize { // , Writable{
this.updatesLock.readLock().unlock();
locked = false;
}
- if (acquiredLocks != null) {
- for (Integer lid : acquiredLocks) {
- releaseRowLock(lid);
- }
- }
+ // release locks if some were acquired but another timed out
+ releaseMyRowLocks();
}
@@ -4697,7 +4602,7 @@ public class HRegion implements HeapSize { // , Writable{
this.writeRequestsCount.increment();
WriteEntry w = null;
try {
- Integer lid = getLock(null, row, true);
+ getRowLock(row);
lock(this.updatesLock.readLock());
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
@@ -4813,7 +4718,7 @@ public class HRegion implements HeapSize { // , Writable{
flush = isFlushSize(size);
} finally {
this.updatesLock.readLock().unlock();
- releaseRowLock(lid);
+ releaseMyRowLocks();
}
if (writeToWAL) {
syncOrDefer(txid); // sync the transaction log outside the rowlock
@@ -4864,7 +4769,7 @@ public class HRegion implements HeapSize { // , Writable{
this.writeRequestsCount.increment();
WriteEntry w = null;
try {
- Integer lid = getLock(null, row, true);
+ getRowLock(row);
lock(this.updatesLock.readLock());
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
@@ -4956,7 +4861,7 @@ public class HRegion implements HeapSize { // , Writable{
flush = isFlushSize(size);
} finally {
this.updatesLock.readLock().unlock();
- releaseRowLock(lid);
+ releaseMyRowLocks();
}
if (writeToWAL) {
syncOrDefer(txid); // sync the transaction log outside the rowlock
@@ -5510,4 +5415,14 @@ public class HRegion implements HeapSize { // , Writable{
*/
void failedBulkLoad(byte[] family, String srcPath) throws IOException;
}
+
+ static class RowLockContext {
+ private CountDownLatch latch;
+ private Thread thread;
+
+ public RowLockContext(CountDownLatch latch, Thread thread) {
+ this.latch = latch;
+ this.thread = thread;
+ }
+ }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7b7a953..4aa4034 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3751,8 +3751,7 @@ public class HRegionServer implements ClientProtocol,
*/
protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
final List mutations, final CellScanner cells) {
- @SuppressWarnings("unchecked")
- Pair[] mutationsWithLocks = new Pair[mutations.size()];
+ Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
@@ -3769,7 +3768,7 @@ public class HRegionServer implements ClientProtocol,
mutation = ProtobufUtil.toDelete(m, cells);
batchContainsDelete = true;
}
- mutationsWithLocks[i++] = new Pair(mutation, null);
+ mArray[i++] = mutation;
builder.addResult(result);
}
@@ -3778,7 +3777,7 @@ public class HRegionServer implements ClientProtocol,
cacheFlusher.reclaimMemStoreMemory();
}
- OperationStatus codes[] = region.batchMutate(mutationsWithLocks);
+ OperationStatus codes[] = region.batchMutate(mArray);
for (i = 0; i < codes.length; i++) {
switch (codes[i].getOperationStatusCode()) {
case BAD_FAMILY:
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 02c97ad..0afe472 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -989,7 +989,7 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public boolean preBatchMutate(
- final MiniBatchOperationInProgress> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress miniBatchOp) throws IOException {
boolean bypass = false;
ObserverContext ctx = null;
for (RegionEnvironment env : coprocessors) {
@@ -1014,7 +1014,7 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postBatchMutate(
- final MiniBatchOperationInProgress> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress miniBatchOp) throws IOException {
ObserverContext ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index cd70387..b3f7034 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -406,7 +406,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override
public void preBatchMutate(ObserverContext c,
- MiniBatchOperationInProgress> miniBatchOp) throws IOException {
+ MiniBatchOperationInProgress miniBatchOp) throws IOException {
RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -416,7 +416,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override
public void postBatchMutate(final ObserverContext c,
- final MiniBatchOperationInProgress> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress miniBatchOp) throws IOException {
RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e);
assertNotNull(e.getRegion());
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java
index de79409..74918ab 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java
@@ -19,7 +19,6 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -27,12 +26,9 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Lists;
-
/**
* Test of HBASE-7051; that checkAndPuts and puts behave atomically with respect to each other.
* Rather than perform a bunch of trials to verify atomicity, this test recreates a race condition
@@ -64,16 +60,12 @@ public class TestHBase7051 {
final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName),
tableName, conf, Bytes.toBytes(family));
- List> putsAndLocks = Lists.newArrayList();
Put[] puts = new Put[1];
Put put = new Put(Bytes.toBytes("r1"));
put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
puts[0] = put;
- Pair pair = new Pair(puts[0], null);
-
- putsAndLocks.add(pair);
- region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+ region.batchMutate(puts);
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
ctx.addThread(new PutThread(ctx, region));
@@ -101,15 +93,12 @@ public class TestHBase7051 {
}
public void doWork() throws Exception {
- List> putsAndLocks = Lists.newArrayList();
Put[] puts = new Put[1];
Put put = new Put(Bytes.toBytes("r1"));
put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
puts[0] = put;
- Pair pair = new Pair(puts[0], null);
- putsAndLocks.add(pair);
testStep = TestStep.PUT_STARTED;
- region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+ region.batchMutate(puts);
}
}
@@ -143,16 +132,16 @@ public class TestHBase7051 {
}
@Override
- public void releaseRowLock(Integer lockId) {
+ public void releaseMyRowLocks() {
if (testStep == TestStep.INIT) {
- super.releaseRowLock(lockId);
+ super.releaseMyRowLocks();
return;
}
if (testStep == TestStep.PUT_STARTED) {
try {
testStep = TestStep.PUT_COMPLETED;
- super.releaseRowLock(lockId);
+ super.releaseMyRowLocks();
// put has been written to the memstore and the row lock has been released, but the
// MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
// operations would cause the non-atomicity to show up:
@@ -170,16 +159,16 @@ public class TestHBase7051 {
}
}
else if (testStep == TestStep.CHECKANDPUT_STARTED) {
- super.releaseRowLock(lockId);
+ super.releaseMyRowLocks();
}
}
@Override
- public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException {
+ public void getRowLock(byte[] row) throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
- return super.getLock(lockid, row, waitForLock);
+ super.getRowLock(row);
}
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index e1b23fb..88b0955 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -92,7 +91,6 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
@@ -100,8 +98,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
-import com.google.common.collect.Lists;
-
/**
* Basic stand-alone testing of HRegion.
*
@@ -654,7 +650,6 @@ public class TestHRegion extends HBaseTestCase {
}
}
- @SuppressWarnings("unchecked")
public void testBatchPut() throws Exception {
byte[] b = Bytes.toBytes(getName());
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
@@ -694,7 +689,7 @@ public class TestHRegion extends HBaseTestCase {
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
- Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
+ region.getRowLock(Bytes.toBytes("row_2"));
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
@@ -719,7 +714,7 @@ public class TestHRegion extends HBaseTestCase {
}
}
LOG.info("...releasing row lock, which should let put thread continue");
- region.releaseRowLock(lockedRow);
+ region.releaseMyRowLocks();
LOG.info("...joining on thread");
ctx.stop();
LOG.info("...checking that next batch was synced");
@@ -730,29 +725,6 @@ public class TestHRegion extends HBaseTestCase {
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
}
- LOG.info("Nexta, a batch put which uses an already-held lock");
- lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
- LOG.info("...obtained row lock");
- List> putsAndLocks = Lists.newArrayList();
- for (int i = 0; i < 10; i++) {
- Pair pair = new Pair(puts[i], null);
- if (i == 2) pair.setSecond(lockedRow);
- putsAndLocks.add(pair);
- }
-
- codes = region.batchMutate(putsAndLocks.toArray(new Pair[0]));
- LOG.info("...performed put");
- for (int i = 0; i < 10; i++) {
- assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
- OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
- }
- // Make sure we didn't do an extra batch
- metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 5, source);
-
- // Make sure we still hold lock
- assertTrue(region.isRowLocked(lockedRow));
- LOG.info("...releasing lock");
- region.releaseRowLock(lockedRow);
} finally {
HRegion.closeHRegion(this.region);
this.region = null;