diff --git hbase-common/pom.xml hbase-common/pom.xml index 7b5ef83..12847b4 100644 --- hbase-common/pom.xml +++ hbase-common/pom.xml @@ -153,6 +153,22 @@ + + + org.codehaus.mojo + + build-helper-maven-plugin + + [1.5,) + + add-source + add-test-resource + + + + + + diff --git hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java index e36ee43..4936484 100644 --- hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java +++ hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; @@ -146,21 +145,20 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor } } if (deleteRows.size() > 0) { - Pair[] deleteWithLockArr = new Pair[deleteRows.size()]; + Mutation[] mutations = new Mutation[deleteRows.size()]; int i = 0; for (List deleteRow : deleteRows) { Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp); - deleteWithLockArr[i++] = new Pair(delete, null); + mutations[i++] = delete; } - OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr); + OperationStatus[] opStatus = region.batchMutate(mutations); for (i = 0; i < opStatus.length; i++) { if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { break; } totalRowsDeleted++; if (deleteType == DeleteType.VERSION) { - byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute( - NO_OF_VERSIONS_TO_DELETE); + byte[] versionsDeleted = mutations[i].getAttribute(NO_OF_VERSIONS_TO_DELETE); if (versionsDeleted != null) { totalVersionsDeleted += Bytes.toInt(versionsDeleted); } diff --git hbase-server/pom.xml hbase-server/pom.xml index 2ea730c..eb704f9 100644 --- hbase-server/pom.xml +++ hbase-server/pom.xml @@ -258,6 +258,22 @@ + + + org.codehaus.mojo + + build-helper-maven-plugin + + [1.5,) + + add-source + add-test-resource + + + + + + diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 9c54de7..c8bf42b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -251,12 +251,12 @@ public abstract class BaseRegionObserver implements RegionObserver { @Override public void preBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { } @Override public void postBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 461bc31..b18a6cc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -553,7 +553,7 @@ public interface RegionObserver extends Coprocessor { * @throws IOException if an error occurred on the coprocessor */ void preBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException; + final MiniBatchOperationInProgress miniBatchOp) throws IOException; /** * This will be called after applying a batch of Mutations on a region. The Mutations are added to @@ -563,7 +563,7 @@ public interface RegionObserver extends Coprocessor { * @throws IOException if an error occurred on the coprocessor */ void postBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException; + final MiniBatchOperationInProgress miniBatchOp) throws IOException; /** * Called before checkAndPut diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 58c91de..7c89059 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.EOFException; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; @@ -35,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; -import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -67,8 +65,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -124,7 +120,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; -import org.apache.hadoop.hbase.snapshot.TakeSnapshotUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; @@ -206,12 +201,13 @@ public class HRegion implements HeapSize { // , Writable{ // Members ////////////////////////////////////////////////////////////////////////////// - private final ConcurrentHashMap lockedRows = - new ConcurrentHashMap(); - private final ConcurrentHashMap lockIds = - new ConcurrentHashMap(); - private final AtomicInteger lockIdGenerator = new AtomicInteger(1); - static private Random rand = new Random(); + private final ConcurrentHashMap lockedRows = + new ConcurrentHashMap(); + private final ThreadLocal> locksHeldByThread = + new ThreadLocal>() { + protected List initialValue() { + return new ArrayList(); + };}; protected final Map stores = new ConcurrentSkipListMap( Bytes.BYTES_RAWCOMPARATOR); @@ -1723,7 +1719,7 @@ public class HRegion implements HeapSize { // , Writable{ try { byte [] row = delete.getRow(); // All edits for the given row (across all column families) must happen atomically. - doBatchMutate(delete, null); + doBatchMutate(delete); } finally { closeRegionOperation(); } @@ -1745,7 +1741,7 @@ public class HRegion implements HeapSize { // , Writable{ delete.setFamilyMap(familyMap); delete.setClusterId(clusterId); delete.setWriteToWAL(writeToWAL); - doBatchMutate(delete, null); + doBatchMutate(delete); } /** @@ -1828,15 +1824,8 @@ public class HRegion implements HeapSize { // , Writable{ startRegionOperation(); this.writeRequestsCount.increment(); try { - // We obtain a per-row lock, so other clients will block while one client - // performs an update. The read lock is released by the client calling - // #commit or #abort or if the HRegionServer lease on the lock expires. - // See HRegionServer#RegionListener for how the expire on HRegionServer - // invokes a HRegion#abort. - byte [] row = put.getRow(); - // All edits for the given row (across all column families) must happen atomically. - doBatchMutate(put, null); + doBatchMutate(put); } finally { closeRegionOperation(); } @@ -1866,32 +1855,24 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * Perform a batch put with no pre-specified locks + * Perform a batch put * @see HRegion#batchMutate(Pair[]) */ public OperationStatus[] put(Put[] puts) throws IOException { - @SuppressWarnings("unchecked") - Pair putsAndLocks[] = new Pair[puts.length]; - - for (int i = 0; i < puts.length; i++) { - putsAndLocks[i] = new Pair(puts[i], null); - } - return batchMutate(putsAndLocks); + return batchMutate(puts); } /** * Perform a batch of mutations. * It supports only Put and Delete mutations and will ignore other types passed. - * @param mutationsAndLocks - * the list of mutations paired with their requested lock IDs. + * @param mutations the list of mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - public OperationStatus[] batchMutate( - Pair[] mutationsAndLocks) throws IOException { - BatchOperationInProgress> batchOp = - new BatchOperationInProgress>(mutationsAndLocks); + public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { + BatchOperationInProgress batchOp = + new BatchOperationInProgress(mutations); boolean initialized = false; @@ -1920,14 +1901,13 @@ public class HRegion implements HeapSize { // , Writable{ return batchOp.retCodeDetails; } - private void doPreMutationHook(BatchOperationInProgress> batchOp) + private void doPreMutationHook(BatchOperationInProgress batchOp) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); if (coprocessorHost != null) { for (int i = 0 ; i < batchOp.operations.length; i++) { - Pair nextPair = batchOp.operations[i]; - Mutation m = nextPair.getFirst(); + Mutation m = batchOp.operations[i]; if (m instanceof Put) { if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { // pre hook says skip this Put @@ -1958,7 +1938,7 @@ public class HRegion implements HeapSize { // , Writable{ @SuppressWarnings("unchecked") private long doMiniBatchMutation( - BatchOperationInProgress> batchOp) throws IOException { + BatchOperationInProgress batchOp) throws IOException { // variable to note if all Put items are for the same CF -- metrics related boolean putsCfSetConsistent = true; @@ -1975,8 +1955,6 @@ public class HRegion implements HeapSize { // , Writable{ boolean walSyncSuccessful = false; boolean locked = false; - /** Keep track of the locks we hold so we can release them in finally clause */ - List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) @@ -1992,10 +1970,8 @@ public class HRegion implements HeapSize { // , Writable{ int numReadyToWrite = 0; long now = EnvironmentEdgeManager.currentTimeMillis(); while (lastIndexExclusive < batchOp.operations.length) { - Pair nextPair = batchOp.operations[lastIndexExclusive]; - Mutation mutation = nextPair.getFirst(); + Mutation mutation = batchOp.operations[lastIndexExclusive]; boolean isPutMutation = mutation instanceof Put; - Integer providedLockId = nextPair.getSecond(); Map> familyMap = mutation.getFamilyMap(); // store the family map reference to allow for mutations @@ -2029,25 +2005,27 @@ public class HRegion implements HeapSize { // , Writable{ lastIndexExclusive++; continue; } + // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; - Integer acquiredLockId = null; - try { - acquiredLockId = getLock(providedLockId, mutation.getRow(), - shouldBlock); - } catch (IOException ioe) { - LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + boolean acquiredLock = false; + if (shouldBlock) { + try { + getRowLock(mutation.getRow()); + acquiredLock = true; + } catch (IOException ioe) { + LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); + } + } else { + acquiredLock = tryRowLock(mutation.getRow()); } - if (acquiredLockId == null) { + if (!acquiredLock) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; break; // stop acquiring more rows for this batch } - if (providedLockId == null) { - acquiredLocks.add(acquiredLockId); - } + lastIndexExclusive++; numReadyToWrite++; @@ -2089,7 +2067,7 @@ public class HRegion implements HeapSize { // , Writable{ if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) continue; - Mutation mutation = batchOp.operations[i].getFirst(); + Mutation mutation = batchOp.operations[i]; if (mutation instanceof Put) { updateKVTimestamps(familyMaps[i].values(), byteNow); noOfPuts++; @@ -2110,8 +2088,8 @@ public class HRegion implements HeapSize { // , Writable{ // calling the pre CP hook for batch mutation if (coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } @@ -2145,7 +2123,7 @@ public class HRegion implements HeapSize { // , Writable{ } batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; - Mutation m = batchOp.operations[i].getFirst(); + Mutation m = batchOp.operations[i]; if (!m.getWriteToWAL()) { if (m instanceof Put) { recordPutWithoutWal(m.getFamilyMap()); @@ -2166,9 +2144,9 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- // STEP 5. Append the edit to WAL. Do not sync wal. // ------------------------- - Mutation first = batchOp.operations[firstIndex].getFirst(); + Mutation mutation = batchOp.operations[firstIndex]; txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), - walEdit, first.getClusterId(), now, this.htableDescriptor); + walEdit, mutation.getClusterId(), now, this.htableDescriptor); // ------------------------------- // STEP 6. Release row locks, etc. @@ -2177,12 +2155,8 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.readLock().unlock(); locked = false; } - if (acquiredLocks != null) { - for (Integer toRelease : acquiredLocks) { - releaseRowLock(toRelease); - } - acquiredLocks = null; - } + releaseMyRowLocks(); + // ------------------------- // STEP 7. Sync wal. // ------------------------- @@ -2192,8 +2166,8 @@ public class HRegion implements HeapSize { // , Writable{ walSyncSuccessful = true; // calling the post CP hook for batch mutation if (coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -2217,7 +2191,7 @@ public class HRegion implements HeapSize { // , Writable{ != OperationStatusCode.SUCCESS) { continue; } - Mutation m = batchOp.operations[i].getFirst(); + Mutation m = batchOp.operations[i]; if (m instanceof Put) { coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL()); } else { @@ -2240,11 +2214,7 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.readLock().unlock(); } - if (acquiredLocks != null) { - for (Integer toRelease : acquiredLocks) { - releaseRowLock(toRelease); - } - } + releaseMyRowLocks(); // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. @@ -2314,7 +2284,7 @@ public class HRegion implements HeapSize { // , Writable{ get.addColumn(family, qualifier); // Lock row - Integer lid = getLock(null, get.getRow(), true); + getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); List result = null; @@ -2360,27 +2330,23 @@ public class HRegion implements HeapSize { // , Writable{ if (matches) { // All edits for the given row (across all column families) must // happen atomically. - doBatchMutate((Mutation)w, lid); + doBatchMutate((Mutation)w); this.checkAndMutateChecksPassed.increment(); return true; } this.checkAndMutateChecksFailed.increment(); return false; } finally { - releaseRowLock(lid); + releaseMyRowLocks(); } } finally { closeRegionOperation(); } } - @SuppressWarnings("unchecked") - private void doBatchMutate(Mutation mutation, Integer lid) throws IOException, + private void doBatchMutate(Mutation mutation) throws IOException, org.apache.hadoop.hbase.exceptions.DoNotRetryIOException { - Pair[] mutateWithLocks = new Pair[] { - new Pair(mutation, lid) - }; - OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks); + OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation }); if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { @@ -2557,7 +2523,7 @@ public class HRegion implements HeapSize { // , Writable{ p.setFamilyMap(familyMap); p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); p.setWriteToWAL(true); - doBatchMutate(p, null); + doBatchMutate(p); } /** @@ -2608,7 +2574,7 @@ public class HRegion implements HeapSize { // , Writable{ * called when a Put/Delete has updated memstore but subequently fails to update * the wal. This method is then invoked to rollback the memstore. */ - private void rollbackMemstore(BatchOperationInProgress> batchOp, + private void rollbackMemstore(BatchOperationInProgress batchOp, Map>[] familyMaps, int start, int end) { int kvsRolledback = 0; @@ -3069,59 +3035,36 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * Obtain a lock on the given row. Blocks until success. - * - * I know it's strange to have two mappings: - *
-   *   ROWS  ==> LOCKS
-   * 
- * as well as - *
-   *   LOCKS ==> ROWS
-   * 
- *

It would be more memory-efficient to just have one mapping; - * maybe we'll do that in the future. - * - * @param row Name of row to lock. - * @throws IOException - * @return The id of the held lock. - */ - public Integer obtainRowLock(final byte [] row) throws IOException { - startRegionOperation(); - this.writeRequestsCount.increment(); - try { - return internalObtainRowLock(row, true); - } finally { - closeRegionOperation(); - } - } - - /** * Obtains or tries to obtain the given row lock. * @param waitForLock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns - * null if unavailable. + * false if unavailable. + * @return true if the lock was obtained, false if waitForLock was false and the lock was not obtained + * @throws IOException if waitForLock was true and the lock could not be obtained after waiting */ - private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) + private boolean internalObtainRowLock(final byte[] row, boolean waitForLock) throws IOException { checkRow(row, "row lock"); startRegionOperation(); try { HashedBytes rowKey = new HashedBytes(row); CountDownLatch rowLatch = new CountDownLatch(1); + RowLockContext rowLockContext = new RowLockContext(rowLatch, Thread.currentThread()); // loop until we acquire the row lock (unless !waitForLock) while (true) { - CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); - if (existingLatch == null) { + RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); + // If the row is already locked by this same thread we acquired it + if (existingContext == null + || existingContext.thread == Thread.currentThread()) { break; } else { - // row already locked + // row already locked by some other thread if (!waitForLock) { - return null; + return false; } try { - if (!existingLatch.await(this.rowLockWaitDuration, + if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out on getting lock for row=" + Bytes.toStringBinary(row)); @@ -3135,72 +3078,48 @@ public class HRegion implements HeapSize { // , Writable{ } } - // loop until we generate an unused lock id - while (true) { - Integer lockId = lockIdGenerator.incrementAndGet(); - HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); - if (existingRowKey == null) { - return lockId; - } else { - // lockId already in use, jump generator to a new spot - lockIdGenerator.set(rand.nextInt()); - } - } + locksHeldByThread.get().add(rowKey); + return true; } finally { closeRegionOperation(); } } /** - * Release the row lock! - * @param lockId The lock ID to release. + * Releases all row locks held by the current thread. */ - public void releaseRowLock(final Integer lockId) { - if (lockId == null) return; // null lock id, do nothing - HashedBytes rowKey = lockIds.remove(lockId); - if (rowKey == null) { - LOG.warn("Release unknown lockId: " + lockId); - return; - } - CountDownLatch rowLatch = lockedRows.remove(rowKey); - if (rowLatch == null) { - LOG.error("Releases row not locked, lockId: " + lockId + " row: " - + rowKey); - return; + public void releaseMyRowLocks() { + List locksHeld = locksHeldByThread.get(); + for (HashedBytes rowKey : locksHeld) { + RowLockContext rowLockContext = lockedRows.remove(rowKey); + if (rowLockContext == null) { + LOG.error("Internal row lcok state inconsistent, should not happen, row: " + rowKey); + continue; + } + rowLockContext.latch.countDown(); } - rowLatch.countDown(); + locksHeld.clear(); } /** - * See if row is currently locked. - * @param lockId - * @return boolean + * Acqures a lock on the given row. + * @throws IOException if the lock could not be obtained after waiting */ - boolean isRowLocked(final Integer lockId) { - return lockIds.containsKey(lockId); + public void getRowLock(byte [] row) throws IOException { + internalObtainRowLock(row, true); } - + /** - * Returns existing row lock if found, otherwise - * obtains a new row lock and returns it. - * @param lockid requested by the user, or null if the user didn't already hold lock - * @param row the row to lock - * @param waitForLock if true, will block until the lock is available, otherwise will - * simply return null if it could not acquire the lock. - * @return lockid or null if waitForLock is false and the lock was unavailable. + * Acquires lock on the given row if possible without waiting. + * @return true iff the lock was acquired */ - public Integer getLock(Integer lockid, byte [] row, boolean waitForLock) - throws IOException { - Integer lid = null; - if (lockid == null) { - lid = internalObtainRowLock(row, waitForLock); - } else { - if (!isRowLocked(lockid)) { - throw new IOException("Invalid row lock"); - } - lid = lockid; + public boolean tryRowLock(byte [] row) { + try { + return internalObtainRowLock(row, false); + } catch (IOException e) { + LOG.error("Unexpected exception trying lock without wait", e); + return false; } - return lid; } /** @@ -4521,15 +4440,9 @@ public class HRegion implements HeapSize { // , Writable{ Collection rowsToLock = processor.getRowsToLock(); try { // 2. Acquire the row lock(s) - acquiredLocks = new ArrayList(rowsToLock.size()); for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, fail if one lock times out - Integer lid = getLock(null, row, true); - if (lid == null) { - throw new IOException("Failed to acquire lock on " - + Bytes.toStringBinary(row)); - } - acquiredLocks.add(lid); + getRowLock(row); } // 3. Region lock lock(this.updatesLock.readLock(), acquiredLocks.size()); @@ -4567,12 +4480,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 9. Release row lock(s) - if (acquiredLocks != null) { - for (Integer lid : acquiredLocks) { - releaseRowLock(lid); - } - acquiredLocks = null; - } + releaseMyRowLocks(); // 10. Sync edit log if (txid != 0) { syncOrDefer(txid); @@ -4597,11 +4505,8 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.readLock().unlock(); locked = false; } - if (acquiredLocks != null) { - for (Integer lid : acquiredLocks) { - releaseRowLock(lid); - } - } + // release locks if some were acquired but another timed out + releaseMyRowLocks(); } @@ -4697,7 +4602,7 @@ public class HRegion implements HeapSize { // , Writable{ this.writeRequestsCount.increment(); WriteEntry w = null; try { - Integer lid = getLock(null, row, true); + getRowLock(row); lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) @@ -4813,7 +4718,7 @@ public class HRegion implements HeapSize { // , Writable{ flush = isFlushSize(size); } finally { this.updatesLock.readLock().unlock(); - releaseRowLock(lid); + releaseMyRowLocks(); } if (writeToWAL) { syncOrDefer(txid); // sync the transaction log outside the rowlock @@ -4864,7 +4769,7 @@ public class HRegion implements HeapSize { // , Writable{ this.writeRequestsCount.increment(); WriteEntry w = null; try { - Integer lid = getLock(null, row, true); + getRowLock(row); lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) @@ -4956,7 +4861,7 @@ public class HRegion implements HeapSize { // , Writable{ flush = isFlushSize(size); } finally { this.updatesLock.readLock().unlock(); - releaseRowLock(lid); + releaseMyRowLocks(); } if (writeToWAL) { syncOrDefer(txid); // sync the transaction log outside the rowlock @@ -5510,4 +5415,14 @@ public class HRegion implements HeapSize { // , Writable{ */ void failedBulkLoad(byte[] family, String srcPath) throws IOException; } + + static class RowLockContext { + private CountDownLatch latch; + private Thread thread; + + public RowLockContext(CountDownLatch latch, Thread thread) { + this.latch = latch; + this.thread = thread; + } + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7b7a953..4aa4034 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3751,8 +3751,7 @@ public class HRegionServer implements ClientProtocol, */ protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region, final List mutations, final CellScanner cells) { - @SuppressWarnings("unchecked") - Pair[] mutationsWithLocks = new Pair[mutations.size()]; + Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); boolean batchContainsPuts = false, batchContainsDelete = false; try { @@ -3769,7 +3768,7 @@ public class HRegionServer implements ClientProtocol, mutation = ProtobufUtil.toDelete(m, cells); batchContainsDelete = true; } - mutationsWithLocks[i++] = new Pair(mutation, null); + mArray[i++] = mutation; builder.addResult(result); } @@ -3778,7 +3777,7 @@ public class HRegionServer implements ClientProtocol, cacheFlusher.reclaimMemStoreMemory(); } - OperationStatus codes[] = region.batchMutate(mutationsWithLocks); + OperationStatus codes[] = region.batchMutate(mArray); for (i = 0; i < codes.length; i++) { switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 02c97ad..0afe472 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -989,7 +989,7 @@ public class RegionCoprocessorHost * @throws IOException */ public boolean preBatchMutate( - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { boolean bypass = false; ObserverContext ctx = null; for (RegionEnvironment env : coprocessors) { @@ -1014,7 +1014,7 @@ public class RegionCoprocessorHost * @throws IOException */ public void postBatchMutate( - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { ObserverContext ctx = null; for (RegionEnvironment env : coprocessors) { if (env.getInstance() instanceof RegionObserver) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index cd70387..b3f7034 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -406,7 +406,7 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public void preBatchMutate(ObserverContext c, - MiniBatchOperationInProgress> miniBatchOp) throws IOException { + MiniBatchOperationInProgress miniBatchOp) throws IOException { RegionCoprocessorEnvironment e = c.getEnvironment(); assertNotNull(e); assertNotNull(e.getRegion()); @@ -416,7 +416,7 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public void postBatchMutate(final ObserverContext c, - final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + final MiniBatchOperationInProgress miniBatchOp) throws IOException { RegionCoprocessorEnvironment e = c.getEnvironment(); assertNotNull(e); assertNotNull(e.getRegion()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java index de79409..74918ab 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java @@ -19,7 +19,6 @@ import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -27,12 +26,9 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; - /** * Test of HBASE-7051; that checkAndPuts and puts behave atomically with respect to each other. * Rather than perform a bunch of trials to verify atomicity, this test recreates a race condition @@ -64,16 +60,12 @@ public class TestHBase7051 { final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(family)); - List> putsAndLocks = Lists.newArrayList(); Put[] puts = new Put[1]; Put put = new Put(Bytes.toBytes("r1")); put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10")); puts[0] = put; - Pair pair = new Pair(puts[0], null); - - putsAndLocks.add(pair); - region.batchMutate(putsAndLocks.toArray(new Pair[0])); + region.batchMutate(puts); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); ctx.addThread(new PutThread(ctx, region)); @@ -101,15 +93,12 @@ public class TestHBase7051 { } public void doWork() throws Exception { - List> putsAndLocks = Lists.newArrayList(); Put[] puts = new Put[1]; Put put = new Put(Bytes.toBytes("r1")); put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50")); puts[0] = put; - Pair pair = new Pair(puts[0], null); - putsAndLocks.add(pair); testStep = TestStep.PUT_STARTED; - region.batchMutate(putsAndLocks.toArray(new Pair[0])); + region.batchMutate(puts); } } @@ -143,16 +132,16 @@ public class TestHBase7051 { } @Override - public void releaseRowLock(Integer lockId) { + public void releaseMyRowLocks() { if (testStep == TestStep.INIT) { - super.releaseRowLock(lockId); + super.releaseMyRowLocks(); return; } if (testStep == TestStep.PUT_STARTED) { try { testStep = TestStep.PUT_COMPLETED; - super.releaseRowLock(lockId); + super.releaseMyRowLocks(); // put has been written to the memstore and the row lock has been released, but the // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of // operations would cause the non-atomicity to show up: @@ -170,16 +159,16 @@ public class TestHBase7051 { } } else if (testStep == TestStep.CHECKANDPUT_STARTED) { - super.releaseRowLock(lockId); + super.releaseMyRowLocks(); } } @Override - public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException { + public void getRowLock(byte[] row) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } - return super.getLock(lockid, row, waitForLock); + super.getRowLock(row); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index e1b23fb..88b0955 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -92,7 +91,6 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; @@ -100,8 +98,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -import com.google.common.collect.Lists; - /** * Basic stand-alone testing of HRegion. * @@ -654,7 +650,6 @@ public class TestHRegion extends HBaseTestCase { } } - @SuppressWarnings("unchecked") public void testBatchPut() throws Exception { byte[] b = Bytes.toBytes(getName()); byte[] cf = Bytes.toBytes(COLUMN_FAMILY); @@ -694,7 +689,7 @@ public class TestHRegion extends HBaseTestCase { metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source); LOG.info("Next a batch put that has to break into two batches to avoid a lock"); - Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2")); + region.getRowLock(Bytes.toBytes("row_2")); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); @@ -719,7 +714,7 @@ public class TestHRegion extends HBaseTestCase { } } LOG.info("...releasing row lock, which should let put thread continue"); - region.releaseRowLock(lockedRow); + region.releaseMyRowLocks(); LOG.info("...joining on thread"); ctx.stop(); LOG.info("...checking that next batch was synced"); @@ -730,29 +725,6 @@ public class TestHRegion extends HBaseTestCase { OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); } - LOG.info("Nexta, a batch put which uses an already-held lock"); - lockedRow = region.obtainRowLock(Bytes.toBytes("row_2")); - LOG.info("...obtained row lock"); - List> putsAndLocks = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - Pair pair = new Pair(puts[i], null); - if (i == 2) pair.setSecond(lockedRow); - putsAndLocks.add(pair); - } - - codes = region.batchMutate(putsAndLocks.toArray(new Pair[0])); - LOG.info("...performed put"); - for (int i = 0; i < 10; i++) { - assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : - OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); - } - // Make sure we didn't do an extra batch - metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 5, source); - - // Make sure we still hold lock - assertTrue(region.isRowLocked(lockedRow)); - LOG.info("...releasing lock"); - region.releaseRowLock(lockedRow); } finally { HRegion.closeHRegion(this.region); this.region = null;