From 08eda24b7d2ab9658468e3cb9b41df3a46f8b872 Mon Sep 17 00:00:00 2001 From: nke Date: Wed, 18 Mar 2015 14:31:14 -0700 Subject: [PATCH] Cleaner w/o isDeleteMutation, fix linelengths style issue, add comment on checkAndMutate Summary: Test Plan: Reviewers: Subscribers: --- .../apache/hadoop/hbase/regionserver/HRegion.java | 55 +++++++++++++++++++--- .../hbase/regionserver/TestAtomicOperation.java | 5 +- .../hbase/regionserver/TestHeapMemoryManager.java | 6 +-- 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3b1f267..cc49d38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3036,7 +3036,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // boolean shouldBlock = numReadyToWrite == 0; RowLock rowLock = null; try { - rowLock = getRowLockInternal(mutation.getRow(), shouldBlock); + // request a read lock for a put or delete; this is safe if we're already + // within a write lock context (e.g. from checkAndMutate) as it will just + // reuse the existing context. + rowLock = getRowLockInternal(mutation.getRow(), shouldBlock, true); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); @@ -5093,20 +5096,45 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // /** * A version of getRowLock(byte[], boolean) to use when a region operation has already been * started (the calling thread has already acquired the region-close-guard lock). + * + * This assumes a default to a fully-exclusive (writer) lock, and is intended + * for read-modify-write operations and other cases where we can't be sure of + * safe concurrent writes. Safe concurrent write operations should use + * getRowLockInternal(row, waitForLock, /readLock/ true) for lower lock latency. */ protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException { + return getRowLockInternal(row, waitForLock, false); + } + + /** + * A version of getRowLock(byte[], boolean) to use when a region operation + * has already been started (the calling thread has already acquired the + * region-close-guard lock). + * + * @param readLock false indicates this assumes should be a fully-exclusive + * (writer) lock, and is intendedfor read-modify-write operations + * and other cases where we can't be sure of safe concurrent writes. + * Use true for simple puts and other safe concurrent operations + * for lower lock latency. + */ + protected RowLock getRowLockInternal(byte[] row, boolean waitForLock, boolean readLock) + throws IOException { checkRow(row, "row lock"); HashedBytes rowKey = new HashedBytes(row); - RowLockContext rowLockContext = new RowLockContext(rowKey); - + RowLockContext rowLockContext = new RowLockContext(rowKey, readLock); // loop until we acquire the row lock (unless !waitForLock) while (true) { RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); if (existingContext == null) { // Row is not already locked by any thread, use newly created context. break; - } else if (existingContext.ownedByCurrentThread()) { - // Row is already locked by current thread, reuse existing context instead. + } else if (readLock && existingContext.isReadLock()) { + // Row is already locked by a read lock, and we're asking for a read lock; + // reuse the current context. + rowLockContext = existingContext; + break; + } else if (existingContext.ownedByCurrentThread() && !existingContext.isReadLock()) { + // Row is already locked for write by current thread, reuse existing context instead. rowLockContext = existingContext; break; } else { @@ -6638,6 +6666,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // acquiredRowLocks = new ArrayList(rowsToLock.size()); for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out + // use a writer lock for mixed reads and writes acquiredRowLocks.add(getRowLock(row)); } // 3. Region lock @@ -7972,14 +8001,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // @VisibleForTesting class RowLockContext { private final HashedBytes row; private final CountDownLatch latch = new CountDownLatch(1); + private final boolean readLock; private final Thread thread; private int lockCount = 0; + // defaults readLock to false (fully exclusive) RowLockContext(HashedBytes row) { this.row = row; + this.readLock = false; + this.thread = Thread.currentThread(); + } + + RowLockContext(HashedBytes row, boolean readLock) { + this.row = row; + this.readLock = readLock; this.thread = Thread.currentThread(); } + boolean isReadLock() { + return readLock; + } + boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } @@ -7997,7 +8039,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } void releaseLock() { - if (!ownedByCurrentThread()) { + // todo: is this safe, or do we need to keep a set of owning threads on read locks? + if (!ownedByCurrentThread() && !readLock) { throw new IllegalArgumentException("Lock held by thread: " + thread + " cannot be released by different thread: " + Thread.currentThread()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 9a2c23b..0a19e31 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -615,11 +615,12 @@ public class TestAtomicOperation { } @Override - public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException { + public RowLock getRowLockInternal(final byte[] row, boolean waitForLock, boolean readLock) + throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } - return new WrappedRowLock(super.getRowLockInternal(row, waitForLock)); + return new WrappedRowLock(super.getRowLockInternal(row, waitForLock, readLock)); } public class WrappedRowLock extends RowLock { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index b96a6a5..43be56f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -311,11 +311,11 @@ public class TestHeapMemoryManager { } private void assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) { - long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent); + long expectedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent); if (expectedDeltaPercent > 0) { - assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace)); + assertTrue(expectedMinDelta <= (newHeapSpace - oldHeapSpace)); } else { - assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace)); + assertTrue(expectedMinDelta <= (oldHeapSpace - newHeapSpace)); } } -- 2.1.0