diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 07a10df..7d81ea9 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -142,6 +142,7 @@ import com.google.common.collect.ClassToInstanceMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.collect.MutableClassToInstanceMap; /** @@ -1876,7 +1877,7 @@ public class HRegion implements HeapSize { // , Writable{ try { byte [] row = delete.getRow(); // If we did not pass an existing row lock, obtain a new one - lid = getLock(lockid, row, true); + lid = getLock(lockid, new HashedBytes(row), true); try { // All edits for the given row (across all column families) must happen atomically. @@ -2074,7 +2075,7 @@ public class HRegion implements HeapSize { // , Writable{ // invokes a HRegion#abort. byte [] row = put.getRow(); // If we did not pass an existing row lock, obtain a new one - Integer lid = getLock(lockid, row, true); + Integer lid = getLock(lockid, new HashedBytes(row), true); try { // All edits for the given row (across all column families) must happen atomically. @@ -2241,6 +2242,8 @@ public class HRegion implements HeapSize { // , Writable{ /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); + Set rowsAlreadyLocked = Sets.newHashSet(); + // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) @@ -2297,15 +2300,22 @@ public class HRegion implements HeapSize { // , Writable{ // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; + boolean failedToAcquire = false; Integer acquiredLockId = null; try { - acquiredLockId = getLock(providedLockId, mutation.getRow(), - shouldBlock); + HashedBytes currentRow = new HashedBytes(mutation.getRow()); + if (!rowsAlreadyLocked.contains(currentRow)) { + acquiredLockId = getLock(providedLockId, currentRow, shouldBlock); + if (acquiredLockId == null) { + failedToAcquire = true; + } + rowsAlreadyLocked.add(currentRow); + } } catch (IOException ioe) { - LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); + failedToAcquire = true; } - if (acquiredLockId == null) { + if (failedToAcquire) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; break; // stop acquiring more rows for this batch @@ -2452,6 +2462,7 @@ public class HRegion implements HeapSize { // , Writable{ releaseRowLock(toRelease); } acquiredLocks = null; + rowsAlreadyLocked = null; } // ------------------------- // STEP 7. Sync wal. @@ -2608,7 +2619,7 @@ public class HRegion implements HeapSize { // , Writable{ get.addColumn(family, qualifier); // Lock row - Integer lid = getLock(lockId, get.getRow(), true); + Integer lid = getLock(lockId, new HashedBytes(get.getRow()), true); // wait for all previous transactions to complete (with lock held) mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); List result = new ArrayList(); @@ -3432,7 +3443,7 @@ public class HRegion implements HeapSize { // , Writable{ this.writeRequestsCount.increment(); this.opMetrics.setWriteRequestCountMetrics( this.writeRequestsCount.get()); try { - return internalObtainRowLock(row, true); + return internalObtainRowLock(new HashedBytes(row), true); } finally { closeRegionOperation(); } @@ -3440,21 +3451,20 @@ public class HRegion implements HeapSize { // , Writable{ /** * Obtains or tries to obtain the given row lock. + * @param row * @param waitForLock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns - * null if unavailable. */ - private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) + private Integer internalObtainRowLock(final HashedBytes row, boolean waitForLock) throws IOException { - checkRow(row, "row lock"); + checkRow(row.getBytes(), "row lock"); startRegionOperation(); try { - HashedBytes rowKey = new HashedBytes(row); CountDownLatch rowLatch = new CountDownLatch(1); // loop until we acquire the row lock (unless !waitForLock) while (true) { - CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); + CountDownLatch existingLatch = lockedRows.putIfAbsent(row, rowLatch); if (existingLatch == null) { break; } else { @@ -3466,7 +3476,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out on getting lock for row=" - + Bytes.toStringBinary(row)); + + Bytes.toStringBinary(row.getBytes())); } } catch (InterruptedException ie) { // Empty @@ -3477,7 +3487,7 @@ public class HRegion implements HeapSize { // , Writable{ // loop until we generate an unused lock id while (true) { Integer lockId = lockIdGenerator.incrementAndGet(); - HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); + HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, row); if (existingRowKey == null) { return lockId; } else { @@ -3532,20 +3542,23 @@ public class HRegion implements HeapSize { // , Writable{ /** * Returns existing row lock if found, otherwise * obtains a new row lock and returns it. + * * @param lockid requested by the user, or null if the user didn't already hold lock - * @param row the row to lock + * @param row the row to lock (hashed bytes) * @param waitForLock if true, will block until the lock is available, otherwise will * simply return null if it could not acquire the lock. * @return lockid or null if waitForLock is false and the lock was unavailable. */ - public Integer getLock(Integer lockid, byte [] row, boolean waitForLock) + public Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { - Integer lid = null; + Integer lid; if (lockid == null) { lid = internalObtainRowLock(row, waitForLock); } else { - if (!isRowLocked(lockid)) { - throw new IOException("Invalid row lock"); + HashedBytes lockedRow = lockIds.get(lockid); + if (!row.equals(lockedRow)) { + throw new IOException("Invalid row lock: LockId: " + lockid + " holds the lock for row: " + + Bytes.toString(lockedRow.getBytes()) + " but wanted lock for row: " + Bytes.toString(row.getBytes())); } lid = lockid; } @@ -4874,7 +4887,7 @@ public class HRegion implements HeapSize { // , Writable{ acquiredLocks = new ArrayList(rowsToLock.size()); for (byte[] row : rowsToLock) { // attempt to lock all involved rows, fail if one lock times out - Integer lid = getLock(null, row, true); + Integer lid = getLock(null, new HashedBytes(row), true); if (lid == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(row)); @@ -5060,7 +5073,7 @@ public class HRegion implements HeapSize { // , Writable{ this.writeRequestsCount.increment(); this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get()); try { - Integer lid = getLock(lockid, row, true); + Integer lid = getLock(lockid, new HashedBytes(row), true); lock(this.updatesLock.readLock()); try { long now = EnvironmentEdgeManager.currentTimeMillis(); @@ -5232,7 +5245,7 @@ public class HRegion implements HeapSize { // , Writable{ this.writeRequestsCount.increment(); this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get()); try { - Integer lid = getLock(lockid, row, true); + Integer lid = getLock(lockid, new HashedBytes(row), true); lock(this.updatesLock.readLock()); try { long now = EnvironmentEdgeManager.currentTimeMillis(); diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestBatchHRegionLockingAndWrites.java src/test/java/org/apache/hadoop/hbase/regionserver/TestBatchHRegionLockingAndWrites.java new file mode 100644 index 0000000..f7a4a17 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestBatchHRegionLockingAndWrites.java @@ -0,0 +1,103 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + + +@Category(SmallTests.class) +public class TestBatchHRegionLockingAndWrites { + private static final String FAMILY = "a"; + private static int ACQUIRE_LOCK_COUNT = 0; + + @Test + @SuppressWarnings("unchecked") + public void testRedundantRowKeys() throws Exception { + + final int batchSize = 100000; + + String tableName = getClass().getSimpleName(); + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); + MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes("a")); + + List> someBatch = Lists.newArrayList(); + int i = 0; + while (i < batchSize) { + if (i % 2 == 0) { + someBatch.add(new Pair(new Put(Bytes.toBytes(0)), null)); + } else { + someBatch.add(new Pair(new Put(Bytes.toBytes(1)), null)); + } + i++; + } + long start = System.nanoTime(); + region.batchMutate(someBatch.toArray(new Pair[0])); + long duration = System.nanoTime() - start; + System.out.println("Batch mutate took: " + duration + "ns"); + assertEquals(2, ACQUIRE_LOCK_COUNT); + } + + @Test + public void testGettingTheLockMatchesMyRow() throws Exception { + MockHRegion region = getMockHRegion(); + HashedBytes rowKey = new HashedBytes(Bytes.toBytes(1)); + assertEquals(Integer.valueOf(2), region.getLock(null, rowKey, false)); + assertEquals(Integer.valueOf(2), region.getLock(2, rowKey, false)); + } + + private MockHRegion getMockHRegion() throws IOException { + String tableName = getClass().getSimpleName(); + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); + return (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(FAMILY)); + } + + private static class MockHRegion extends HRegion { + + public MockHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf, final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, log, fs, conf, regionInfo, htd, rsServices); + } + + @Override + public Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { + ACQUIRE_LOCK_COUNT++; + return super.getLock(lockid, row, waitForLock); + } + } +} diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java index f5ce0b0..5636dd6 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -192,7 +193,7 @@ public class TestHBase7051 { } @Override - public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException { + public Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); }