diff --git hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
index e36ee43..b678f55 100644
--- hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
+++ hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -146,20 +145,19 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor {
         }
       }
       if (deleteRows.size() > 0) {
-        Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+        Mutation[] deleteArr = new Mutation[deleteRows.size()];
         int i = 0;
         for (List<KeyValue> deleteRow : deleteRows) {
-          Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
-          deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
+          deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
         }
-        OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+        OperationStatus[] opStatus = region.batchMutate(deleteArr);
         for (i = 0; i < opStatus.length; i++) {
           if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
             break;
           }
           totalRowsDeleted++;
           if (deleteType == DeleteType.VERSION) {
-            byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+            byte[] versionsDeleted = deleteArr[i].getAttribute(
                 NO_OF_VERSIONS_TO_DELETE);
             if (versionsDeleted != null) {
               totalVersionsDeleted += Bytes.toInt(versionsDeleted);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 9ea9ffd..6839aef 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -253,12 +253,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
 
   @Override
   public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
   }
 
   @Override
   public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
   }
 
   @Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 91b6eae..9f3cb8d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -554,7 +554,7 @@ public interface RegionObserver extends Coprocessor {
    * @throws IOException if an error occurred on the coprocessor
    */
   void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
 
   /**
    * This will be called after applying a batch of Mutations on a region. The Mutations are added to
@@ -564,7 +564,7 @@ public interface RegionObserver extends Coprocessor {
    * @throws IOException if an error occurred on the coprocessor
    */
   void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
 
   /**
    * Called before checkAndPut
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e1bbfdf..3d9922b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
-import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -139,7 +138,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 import com.google.protobuf.Descriptors;
@@ -223,12 +221,13 @@ public class HRegion implements HeapSize { // , Writable{
   // Members
   //////////////////////////////////////////////////////////////////////////////
 
-  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
-    new ConcurrentHashMap<HashedBytes, CountDownLatch>();
-  private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
-    new ConcurrentHashMap<Integer, HashedBytes>();
-  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
-  static private Random rand = new Random();
+  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
+    new ConcurrentHashMap<HashedBytes, RowLockContext>();
+  private final ThreadLocal<List<HashedBytes>> rowLocksHeldByThread =
+    new ThreadLocal<List<HashedBytes>>() {
+      protected List<HashedBytes> initialValue() {
+        return new ArrayList<HashedBytes>();
+    };};
 
   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
       Bytes.BYTES_RAWCOMPARATOR);
@@ -1764,7 +1763,7 @@ public class HRegion implements HeapSize { // , Writable{
     try {
       delete.getRow();
       // All edits for the given row (across all column families) must happen atomically.
-      doBatchMutate(delete, null);
+      doBatchMutate(delete);
     } finally {
       closeRegionOperation();
     }
@@ -1787,7 +1786,7 @@ public class HRegion implements HeapSize { // , Writable{
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setDurability(durability);
-    doBatchMutate(delete, null);
+    doBatchMutate(delete);
   }
 
   /**
@@ -1862,7 +1861,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.writeRequestsCount.increment();
     try {
       // All edits for the given row (across all column families) must happen atomically.
-      doBatchMutate(put, null);
+      doBatchMutate(put);
     } finally {
       closeRegionOperation();
     }
@@ -1892,46 +1891,37 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Perform a batch put with no pre-specified locks
-   * @see HRegion#batchMutate(Pair[])
+   * Perform a batch put
+   * @see HRegion#batchMutate(Mutation[])
    */
   public OperationStatus[] put(Put[] puts) throws IOException {
-    @SuppressWarnings("unchecked")
-    Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
-
-    for (int i = 0; i < puts.length; i++) {
-      putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
-    }
-    return batchMutate(putsAndLocks);
+    return batchMutate(puts);
   }
 
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
-   * @param mutationsAndLocks
-   *          the list of mutations paired with their requested lock IDs.
+   * @param mutations the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] batchMutate(
-      Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
-    return batchMutate(mutationsAndLocks, false);
+  public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
+    return batchMutate(mutations, false);
   }
 
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
-   * @param mutationsAndLocks
-   *          the list of mutations paired with their requested lock IDs.
+   * @param mutations the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
+  OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)
       throws IOException {
-    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
-      new BatchOperationInProgress<Pair<Mutation, Integer>>(mutationsAndLocks);
+    BatchOperationInProgress<Mutation> batchOp =
+      new BatchOperationInProgress<Mutation>(mutations);
 
     boolean initialized = false;
 
@@ -1969,14 +1959,13 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
-  private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
+  private void doPreMutationHook(BatchOperationInProgress<Mutation> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
      for (int i = 0 ; i < batchOp.operations.length; i++) {
-        Pair<Mutation, Integer> nextPair = batchOp.operations[i];
-        Mutation m = nextPair.getFirst();
+        Mutation m = batchOp.operations[i];
        if (m instanceof Put) {
          if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
            // pre hook says skip this Put
@@ -2005,7 +1994,7 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   @SuppressWarnings("unchecked")
-  private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+  private long doMiniBatchMutation(BatchOperationInProgress<Mutation> batchOp,
       boolean isInReplay) throws IOException {
 
     // variable to note if all Put items are for the same CF -- metrics related
@@ -2023,8 +2012,6 @@ public class HRegion implements HeapSize { // , Writable{
     boolean walSyncSuccessful = false;
     boolean locked = false;
 
-    /** Keep track of the locks we hold so we can release them in finally clause */
-    List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[], List<KeyValue>>[] familyMaps = new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
@@ -2040,10 +2027,8 @@ public class HRegion implements HeapSize { // , Writable{
     int numReadyToWrite = 0;
     long now = EnvironmentEdgeManager.currentTimeMillis();
     while (lastIndexExclusive < batchOp.operations.length) {
-      Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
-      Mutation mutation = nextPair.getFirst();
+      Mutation mutation = batchOp.operations[lastIndexExclusive];
       boolean isPutMutation = mutation instanceof Put;
-      Integer providedLockId = nextPair.getSecond();
 
       Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
       // store the family map reference to allow for mutations
@@ -2081,25 +2066,28 @@ public class HRegion implements HeapSize { // , Writable{
         lastIndexExclusive++;
         continue;
       }
+
       // If we haven't got any rows in our batch, we should block to
       // get the next one.
       boolean shouldBlock = numReadyToWrite == 0;
-      Integer acquiredLockId = null;
-      try {
-        acquiredLockId = getLock(providedLockId, mutation.getRow(),
-          shouldBlock);
-      } catch (IOException ioe) {
-        LOG.warn("Failed getting lock in batch put, row="
-          + Bytes.toStringBinary(mutation.getRow()), ioe);
+      boolean acquiredLock = false;
+      if (shouldBlock) {
+        try {
+          getRowLock(mutation.getRow());
+          acquiredLock = true;
+        } catch (IOException ioe) {
+          LOG.warn("Failed getting lock in batch put, row="
+            + Bytes.toStringBinary(mutation.getRow()), ioe);
+        }
+      } else {
+        acquiredLock = tryRowLock(mutation.getRow());
       }
-      if (acquiredLockId == null) {
+      if (!acquiredLock) {
         // We failed to grab another lock
         assert !shouldBlock : "Should never fail to get lock when blocking";
         break; // stop acquiring more rows for this batch
       }
-      if (providedLockId == null) {
-        acquiredLocks.add(acquiredLockId);
-      }
+
       lastIndexExclusive++;
       numReadyToWrite++;
@@ -2141,7 +2129,7 @@ public class HRegion implements HeapSize { // , Writable{
       if (batchOp.retCodeDetails[i].getOperationStatusCode()
           != OperationStatusCode.NOT_RUN) continue;
-      Mutation mutation = batchOp.operations[i].getFirst();
+      Mutation mutation = batchOp.operations[i];
       if (mutation instanceof Put) {
         updateKVTimestamps(familyMaps[i].values(), byteNow);
         noOfPuts++;
@@ -2162,8 +2150,8 @@ public class HRegion implements HeapSize { // , Writable{
 
     // calling the pre CP hook for batch mutation
     if (!isInReplay && coprocessorHost != null) {
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-        new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp =
+        new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
         batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
       if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
     }
@@ -2198,7 +2186,7 @@ public class HRegion implements HeapSize { // , Writable{
       }
       batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
-      Mutation m = batchOp.operations[i].getFirst();
+      Mutation m = batchOp.operations[i];
       Durability tmpDur = getEffectiveDurability(m.getDurability());
       if (tmpDur.ordinal() > durability.ordinal()) {
         durability = tmpDur;
@@ -2221,10 +2209,10 @@ public class HRegion implements HeapSize { // , Writable{
     // -------------------------
     // STEP 5. Append the edit to WAL. Do not sync wal.
     // -------------------------
-    Mutation first = batchOp.operations[firstIndex].getFirst();
+    Mutation mutation = batchOp.operations[firstIndex];
     if (walEdit.size() > 0) {
       txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
-        walEdit, first.getClusterId(), now, this.htableDescriptor);
+        walEdit, mutation.getClusterId(), now, this.htableDescriptor);
     }
 
     // -------------------------------
@@ -2234,12 +2222,8 @@ public class HRegion implements HeapSize { // , Writable{
       this.updatesLock.readLock().unlock();
       locked = false;
     }
-    if (acquiredLocks != null) {
-      for (Integer toRelease : acquiredLocks) {
-        releaseRowLock(toRelease);
-      }
-      acquiredLocks = null;
-    }
+    releaseMyRowLocks();
+
     // -------------------------
     // STEP 7. Sync wal.
     // -------------------------
@@ -2249,8 +2233,8 @@ public class HRegion implements HeapSize { // , Writable{
       walSyncSuccessful = true;
       // calling the post CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
-          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp =
+          new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
@@ -2274,7 +2258,7 @@ public class HRegion implements HeapSize { // , Writable{
               != OperationStatusCode.SUCCESS) {
             continue;
           }
-          Mutation m = batchOp.operations[i].getFirst();
+          Mutation m = batchOp.operations[i];
           if (m instanceof Put) {
             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
           } else {
@@ -2297,11 +2281,7 @@ public class HRegion implements HeapSize { // , Writable{
         this.updatesLock.readLock().unlock();
       }
 
-      if (acquiredLocks != null) {
-        for (Integer toRelease : acquiredLocks) {
-          releaseRowLock(toRelease);
-        }
-      }
+      releaseMyRowLocks();
 
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
@@ -2378,8 +2358,8 @@ public class HRegion implements HeapSize { // , Writable{
       checkFamily(family);
       get.addColumn(family, qualifier);
 
-      // Lock row
-      Integer lid = getLock(null, get.getRow(), true);
+      // Lock row - note that doBatchMutate will relock this row if called
+      getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
       List<KeyValue> result = null;
@@ -2425,27 +2405,24 @@ public class HRegion implements HeapSize { // , Writable{
         if (matches) {
           // All edits for the given row (across all column families) must
           // happen atomically.
-          doBatchMutate((Mutation)w, lid);
+          doBatchMutate((Mutation)w);
           this.checkAndMutateChecksPassed.increment();
           return true;
         }
         this.checkAndMutateChecksFailed.increment();
         return false;
       } finally {
-        releaseRowLock(lid);
+        // may have already been released by doBatchMutate
+        releaseMyRowLocks();
       }
     } finally {
       closeRegionOperation();
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
+  private void doBatchMutate(Mutation mutation) throws IOException,
       org.apache.hadoop.hbase.exceptions.DoNotRetryIOException {
-    Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] {
-      new Pair<Mutation, Integer>(mutation, lid)
-    };
-    OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -2621,7 +2598,7 @@ public class HRegion implements HeapSize { // , Writable{
     Put p = new Put(row);
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
-    doBatchMutate(p, null);
+    doBatchMutate(p);
   }
 
   /**
@@ -2672,7 +2649,7 @@ public class HRegion implements HeapSize { // , Writable{
    * called when a Put/Delete has updated memstore but subequently fails to update
    * the wal. This method is then invoked to rollback the memstore.
    */
-  private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress<Mutation> batchOp,
                                 Map<byte[], List<KeyValue>>[] familyMaps,
                                 int start, int end) {
     int kvsRolledback = 0;
@@ -3182,60 +3159,40 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Obtain a lock on the given row.  Blocks until success.
-   *
-   * I know it's strange to have two mappings:
-   * <pre>
-   *   ROWS  ==> LOCKS
-   * </pre>
-   * as well as
-   * <pre>
-   *   LOCKS ==> ROWS
-   * </pre>
-   *
-   * <p>It would be more memory-efficient to just have one mapping;
-   * maybe we'll do that in the future.
-   *
-   * @param row Name of row to lock.
-   * @throws IOException
-   * @return The id of the held lock.
-   */
-  public Integer obtainRowLock(final byte [] row) throws IOException {
-    startRegionOperation();
-    this.writeRequestsCount.increment();
-    try {
-      return internalObtainRowLock(row, true);
-    } finally {
-      closeRegionOperation();
-    }
-  }
-
-  /**
    * Obtains or tries to obtain the given row lock.
    * @param waitForLock if true, will block until the lock is available.
    *                    Otherwise, just tries to obtain the lock and returns
-   *                    null if unavailable.
+   *                    false if unavailable.
+   * @return true if the lock was obtained,
+   *         false if waitForLock was false and the lock was not obtained
+   * @throws IOException if waitForLock was true and the lock could not be obtained after waiting
    */
-  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
+  private boolean internalObtainRowLock(final byte[] row, boolean waitForLock)
       throws IOException {
     checkRow(row, "row lock");
     startRegionOperation();
     try {
       HashedBytes rowKey = new HashedBytes(row);
       CountDownLatch rowLatch = new CountDownLatch(1);
+      RowLockContext rowLockContext = new RowLockContext(rowLatch, Thread.currentThread());
 
       // loop until we acquire the row lock (unless !waitForLock)
       while (true) {
-        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
-        if (existingLatch == null) {
+        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+        if (existingContext == null) {
+          // Row is not already locked by any thread, add it to this thread's list
+          rowLocksHeldByThread.get().add(rowKey);
+          break;
+        } else if (existingContext.thread == Thread.currentThread()) {
+          // Row is already locked by current thread
          break;
        } else {
-          // row already locked
+          // Row is already locked by some other thread
          if (!waitForLock) {
-            return null;
+            return false;
          }
          try {
-            if (!existingLatch.await(this.rowLockWaitDuration,
-                            TimeUnit.MILLISECONDS)) {
+            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
              throw new IOException("Timed out on getting lock for row="
                  + Bytes.toStringBinary(row));
            }
@@ -3247,73 +3204,48 @@ public class HRegion implements HeapSize { // , Writable{
         }
       }
     }
-
-    // loop until we generate an unused lock id
-    while (true) {
-      Integer lockId = lockIdGenerator.incrementAndGet();
-      HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
-      if (existingRowKey == null) {
-        return lockId;
-      } else {
-        // lockId already in use, jump generator to a new spot
-        lockIdGenerator.set(rand.nextInt());
-      }
-    }
+
+      return true;
     } finally {
       closeRegionOperation();
     }
   }
 
   /**
-   * Release the row lock!
-   * @param lockId The lock ID to release.
+   * Releases all row locks held by the current thread.
    */
-  public void releaseRowLock(final Integer lockId) {
-    if (lockId == null) return; // null lock id, do nothing
-    HashedBytes rowKey = lockIds.remove(lockId);
-    if (rowKey == null) {
-      LOG.warn("Release unknown lockId: " + lockId);
-      return;
-    }
-    CountDownLatch rowLatch = lockedRows.remove(rowKey);
-    if (rowLatch == null) {
-      LOG.error("Releases row not locked, lockId: " + lockId + " row: "
-          + rowKey);
-      return;
+  public void releaseMyRowLocks() {
+    List<HashedBytes> locksHeld = rowLocksHeldByThread.get();
+    for (HashedBytes rowKey : locksHeld) {
+      RowLockContext rowLockContext = lockedRows.remove(rowKey);
+      if (rowLockContext == null) {
+        LOG.error("Internal row lock state inconsistent, should not happen, row: " + rowKey);
+        continue;
+      }
+      rowLockContext.latch.countDown();
     }
-    rowLatch.countDown();
+    rowLocksHeldByThread.remove();
   }
 
   /**
-   * See if row is currently locked.
-   * @param lockId
-   * @return boolean
+   * Acquires a lock on the given row.
+   * @throws IOException if the lock could not be obtained after waiting
    */
-  boolean isRowLocked(final Integer lockId) {
-    return lockIds.containsKey(lockId);
+  public void getRowLock(byte[] row) throws IOException {
+    internalObtainRowLock(row, true);
   }
 
   /**
-   * Returns existing row lock if found, otherwise
-   * obtains a new row lock and returns it.
-   * @param lockid requested by the user, or null if the user didn't already hold lock
-   * @param row the row to lock
-   * @param waitForLock if true, will block until the lock is available, otherwise will
-   *                    simply return null if it could not acquire the lock.
-   * @return lockid or null if waitForLock is false and the lock was unavailable.
+   * Acquires lock on the given row if possible without waiting.
+   * @return true iff the lock was acquired
    */
-  public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
-      throws IOException {
-    Integer lid = null;
-    if (lockid == null) {
-      lid = internalObtainRowLock(row, waitForLock);
-    } else {
-      if (!isRowLocked(lockid)) {
-        throw new IOException("Invalid row lock");
-      }
-      lid = lockid;
+  public boolean tryRowLock(byte[] row) {
+    try {
+      return internalObtainRowLock(row, false);
+    } catch (IOException e) {
+      LOG.error("Unexpected exception trying lock without wait", e);
+      return false;
    }
-    return lid;
  }
 
  /**
@@ -4583,24 +4515,17 @@ public class HRegion implements HeapSize { // , Writable{
     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
     boolean locked = false;
     boolean walSyncSuccessful = false;
-    List<Integer> acquiredLocks = null;
     long addedSize = 0;
     List<KeyValue> mutations = new ArrayList<KeyValue>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     try {
       // 2. Acquire the row lock(s)
-      acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
         // Attempt to lock all involved rows, fail if one lock times out
-        Integer lid = getLock(null, row, true);
-        if (lid == null) {
-          throw new IOException("Failed to acquire lock on "
-              + Bytes.toStringBinary(row));
-        }
-        acquiredLocks.add(lid);
+        getRowLock(row);
       }
       // 3. Region lock
-      lock(this.updatesLock.readLock(), acquiredLocks.size());
+      lock(this.updatesLock.readLock(), rowsToLock.size());
       locked = true;
 
       long now = EnvironmentEdgeManager.currentTimeMillis();
@@ -4635,12 +4560,7 @@ public class HRegion implements HeapSize { // , Writable{
       }
 
       // 9. Release row lock(s)
-      if (acquiredLocks != null) {
-        for (Integer lid : acquiredLocks) {
-          releaseRowLock(lid);
-        }
-        acquiredLocks = null;
-      }
+      releaseMyRowLocks();
 
       // 10. Sync edit log
       if (txid != 0) {
         syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
       }
@@ -4665,12 +4585,8 @@ public class HRegion implements HeapSize { // , Writable{
         this.updatesLock.readLock().unlock();
         locked = false;
       }
-      if (acquiredLocks != null) {
-        for (Integer lid : acquiredLocks) {
-          releaseRowLock(lid);
-        }
-      }
-
+      // release locks if some were acquired but another timed out
+      releaseMyRowLocks();
     }
 
     // 12. Run post-process hook
@@ -4766,7 +4682,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
-      Integer lid = getLock(null, row, true);
+      getRowLock(row);
       lock(this.updatesLock.readLock());
       // wait for all prior MVCC transactions to finish - while we hold the row lock
       // (so that we are guaranteed to see the latest state)
@@ -4883,7 +4799,7 @@ public class HRegion implements HeapSize { // , Writable{
           flush = isFlushSize(size);
         } finally {
           this.updatesLock.readLock().unlock();
-          releaseRowLock(lid);
+          releaseMyRowLocks();
         }
         if (writeToWAL) {
           // sync the transaction log outside the rowlock
@@ -4936,7 +4852,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
-      Integer lid = getLock(null, row, true);
+      getRowLock(row);
       lock(this.updatesLock.readLock());
       // wait for all prior MVCC transactions to finish - while we hold the row lock
       // (so that we are guaranteed to see the latest state)
@@ -5029,7 +4945,7 @@ public class HRegion implements HeapSize { // , Writable{
       flush = isFlushSize(size);
     } finally {
       this.updatesLock.readLock().unlock();
-      releaseRowLock(lid);
+      releaseMyRowLocks();
     }
     if (writeToWAL) {
       // sync the transaction log outside the rowlock
@@ -5070,21 +4986,32 @@ public class HRegion implements HeapSize { // , Writable{
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (12 * Bytes.SIZEOF_LONG) +
-      2 * Bytes.SIZEOF_BOOLEAN);
-
+      (11 * Bytes.SIZEOF_LONG) +
+      4 * Bytes.SIZEOF_BOOLEAN);
+
+  // woefully out of date - currently missing:
+  // 1 x HashMap - coprocessorServiceHandlers
+  // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
+  //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
+  //   writeRequestsCount, updatesBlockedMs
+  // 1 x HRegion$WriteState - writestate
+  // 1 x RegionCoprocessorHost - coprocessorHost
+  // 1 x RegionSplitPolicy - splitPolicy
+  // 1 x MetricsRegion - metricsRegion
+  // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
+  // 1 x ThreadLocal - rowLocksHeldByThread
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
       ClassSize.OBJECT + // closeLock
       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
-      ClassSize.ATOMIC_INTEGER + // lockIdGenerator
-      (3 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds, scannerReadPoints
+      (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
       WriteState.HEAP_SIZE + // writestate
       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
       ClassSize.ARRAYLIST + // recentFlushes
       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
       + ClassSize.TREEMAP // maxSeqIdInStores
+      + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
       ;
 
   @Override
@@ -5093,7 +5020,7 @@ public class HRegion implements HeapSize { // , Writable{
     for (Store store : this.stores.values()) {
       heapSize += store.heapSize();
     }
-    // this does not take into account row locks, recent flushes, mvcc entries
+    // this does not take into account row locks, recent flushes, mvcc entries, and more
     return heapSize;
   }
 
@@ -5657,4 +5584,14 @@ public class HRegion implements HeapSize { // , Writable{
      */
     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
   }
+
+  private static class RowLockContext {
+    private CountDownLatch latch;
+    private Thread thread;
+
+    public RowLockContext(CountDownLatch latch, Thread thread) {
+      this.latch = latch;
+      this.thread = thread;
+    }
+  }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e6c991e..7719ceb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -109,11 +109,11 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.master.TableLockManager;
@@ -171,9 +171,9 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
@@ -3936,8 +3936,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
    */
   protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
       final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
-    @SuppressWarnings("unchecked")
-    Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
+    Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
@@ -3954,7 +3953,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
           mutation = ProtobufUtil.toDelete(m, cells);
           batchContainsDelete = true;
         }
-        mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
+        mArray[i++] = mutation;
         builder.addResult(result);
       }
 
@@ -3963,7 +3962,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
+      OperationStatus codes[] = region.batchMutate(mArray);
       for (i = 0; i < codes.length; i++) {
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 9102aaa..4fe11f4 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -993,7 +993,7 @@ public class RegionCoprocessorHost
    * @throws IOException
    */
   public boolean preBatchMutate(
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     boolean bypass = false;
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env : coprocessors) {
@@ -1018,7 +1018,7 @@ public class RegionCoprocessorHost
    * @throws IOException
    */
   public void postBatchMutate(
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env : coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 0a588af..d6c0c97 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -407,7 +407,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
 
   @Override
   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     RegionCoprocessorEnvironment e = c.getEnvironment();
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -417,7 +417,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
 
   @Override
   public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     RegionCoprocessorEnvironment e = c.getEnvironment();
     assertNotNull(e);
     assertNotNull(e.getRegion());
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 6d4cbe6..67f03e8 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -59,11 +59,8 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
-
 /**
  * Testing of HRegion.incrementColumnValue, HRegion.increment,
@@ -528,16 +525,12 @@ public class TestAtomicOperation extends HBaseTestCase {
     final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(
         Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(family));
 
-    List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
     Put[] puts = new Put[1];
 
     Put put = new Put(Bytes.toBytes("r1"));
     put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
     puts[0] = put;
-    Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
-
-    putsAndLocks.add(pair);
-
-    region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+
+    region.batchMutate(puts);
     MultithreadedTestUtil.TestContext ctx =
       new MultithreadedTestUtil.TestContext(conf);
     ctx.addThread(new PutThread(ctx, region));
@@ -565,15 +558,12 @@ public class TestAtomicOperation extends HBaseTestCase {
     }
 
     public void doWork() throws Exception {
-      List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
       Put[] puts = new Put[1];
 
       Put put = new Put(Bytes.toBytes("r1"));
       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
       puts[0] = put;
-      Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
-      putsAndLocks.add(pair);
       testStep = TestStep.PUT_STARTED;
-      region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+      region.batchMutate(puts);
     }
   }
 
@@ -607,16 +597,16 @@ public class TestAtomicOperation extends HBaseTestCase {
     }
 
     @Override
-    public void releaseRowLock(Integer lockId) {
+    public void releaseMyRowLocks() {
       if (testStep == TestStep.INIT) {
-        super.releaseRowLock(lockId);
+        super.releaseMyRowLocks();
        return;
      }
 
      if (testStep == TestStep.PUT_STARTED) {
        try {
          testStep = TestStep.PUT_COMPLETED;
-          super.releaseRowLock(lockId);
+          super.releaseMyRowLocks();
          // put has been written to the memstore and the row lock has been released, but the
          // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
          // operations would cause the non-atomicity to show up:
@@ -634,16 +624,16 @@ public class TestAtomicOperation extends HBaseTestCase {
        }
      }
      else if (testStep == TestStep.CHECKANDPUT_STARTED) {
-        super.releaseRowLock(lockId);
+        super.releaseMyRowLocks();
      }
    }
 
    @Override
-    public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException {
+    public void getRowLock(byte[] row) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
      }
-      return super.getLock(lockid, row, waitForLock);
+      super.getRowLock(row);
    }
  }
}
\ No newline at end of file
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8937b17..ce8c617 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -107,7 +106,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
@@ -804,7 +802,7 @@ public class TestHRegion extends HBaseTestCase {
       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
 
       LOG.info("Next a batch put that has to break into two batches to avoid a lock");
-      Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
+      region.getRowLock(Bytes.toBytes("row_2"));
 
       MultithreadedTestUtil.TestContext ctx =
         new MultithreadedTestUtil.TestContext(conf);
@@ -829,7 +827,7 @@ public class TestHRegion extends HBaseTestCase {
         }
       }
       LOG.info("...releasing row lock, which should let put thread continue");
-      region.releaseRowLock(lockedRow);
+      region.releaseMyRowLocks();
       LOG.info("...joining on thread");
       ctx.stop();
       LOG.info("...checking that next batch was synced");
@@ -840,29 +838,6 @@ public class TestHRegion extends HBaseTestCase {
             OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
       }
 
-      LOG.info("Nexta, a batch put which uses an already-held lock");
-      lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
-      LOG.info("...obtained row lock");
-      List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
-      for (int i = 0; i < 10; i++) {
-        Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[i], null);
-        if (i == 2) pair.setSecond(lockedRow);
-        putsAndLocks.add(pair);
-      }
-
-      codes = region.batchMutate(putsAndLocks.toArray(new Pair[0]));
-      LOG.info("...performed put");
-      for (int i = 0; i < 10; i++) {
-        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
-            OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
-      }
-      // Make sure we didn't do an extra batch
-      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 5, source);
-
-      // Make sure we still hold lock
-      assertTrue(region.isRowLocked(lockedRow));
-      LOG.info("...releasing lock");
-      region.releaseRowLock(lockedRow);
     } finally {
       HRegion.closeHRegion(this.region);
      this.region = null;
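
Reviewer note (not part of the patch): the change replaces the lock-id plumbing (batchMutate(Pair<Mutation, Integer>[]), obtainRowLock/releaseRowLock/getLock) with thread-owned row locks (getRowLock/tryRowLock/releaseMyRowLocks) and a plain Mutation[] batch API. Below is a minimal caller-side sketch of the new surface, assuming an already-opened HRegion handle named "region" and an existing column family "f" (both hypothetical, not taken from the patch):

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.OperationStatus;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchMutateSketch {
      static final byte[] FAMILY = Bytes.toBytes("f"); // assumed to exist in the table

      static OperationStatus[] writeBatch(HRegion region) throws Exception {
        // Old API: batchMutate(new Pair[] { new Pair<Mutation, Integer>(put, null) })
        // New API: a plain Mutation[]; row locks are acquired and released internally,
        // tracked per thread, and reentrant when the same thread relocks a row.
        Mutation[] batch = new Mutation[2];
        for (int i = 0; i < batch.length; i++) {
          Put put = new Put(Bytes.toBytes("row-" + i));
          put.add(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v" + i));
          batch[i] = put;
        }
        return region.batchMutate(batch);
      }
    }

One consequence of keying locks by thread: a caller such as checkAndMutate() no longer needs to thread a lock id into doBatchMutate(); reacquiring the row lock on the same thread is a no-op, and a single releaseMyRowLocks() at the end drops everything the thread still holds.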