Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java (revision 1503895) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java (working copy) @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -192,7 +193,7 @@ } @Override - public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException { + public Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestBatchHRegionLockingAndWrites.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestBatchHRegionLockingAndWrites.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestBatchHRegionLockingAndWrites.java (revision 0) @@ -0,0 +1,107 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + + +@Category(SmallTests.class) +public class TestBatchHRegionLockingAndWrites { + private static final String FAMILY = "a"; + + @Test + @SuppressWarnings("unchecked") + public void testRedundantRowKeys() throws Exception { + + final int batchSize = 100000; + + String tableName = getClass().getSimpleName(); + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); + MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes("a")); + + List> someBatch = Lists.newArrayList(); + int i = 0; + while (i < batchSize) { + if (i % 2 == 0) { + someBatch.add(new Pair(new Put(Bytes.toBytes(0)), null)); + } else { + someBatch.add(new Pair(new Put(Bytes.toBytes(1)), null)); + } + i++; + } + long start = System.nanoTime(); + region.batchMutate(someBatch.toArray(new Pair[0])); + long duration = System.nanoTime() - start; + System.out.println("Batch mutate took: " + duration + "ns"); + assertEquals(2, region.getAcquiredLockCount()); + } + + @Test + public void testGettingTheLockMatchesMyRow() throws Exception { + MockHRegion region = getMockHRegion(); + HashedBytes rowKey = new HashedBytes(Bytes.toBytes(1)); + assertEquals(Integer.valueOf(2), region.getLock(null, rowKey, false)); + assertEquals(Integer.valueOf(2), region.getLock(2, rowKey, false)); + } + + private MockHRegion getMockHRegion() throws IOException { + String tableName = getClass().getSimpleName(); + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); + return (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(FAMILY)); + } + + private static class MockHRegion extends HRegion { + private int acqioredLockCount = 0; + + public MockHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf, final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, log, fs, conf, regionInfo, htd, rsServices); + } + + private int getAcquiredLockCount() { + return acqioredLockCount; + } + + @Override + public Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { + acqioredLockCount++; + return super.getLock(lockid, row, waitForLock); + } + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1503895) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -142,6 +142,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.collect.MutableClassToInstanceMap; /** @@ -2241,6 +2242,8 @@ /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); + Set rowsAlreadyLocked = Sets.newHashSet(); + // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) @@ -2297,15 +2300,23 @@ // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; + boolean failedToAcquire = false; Integer acquiredLockId = null; + HashedBytes currentRow = new HashedBytes(mutation.getRow()); try { - acquiredLockId = getLock(providedLockId, mutation.getRow(), - shouldBlock); + if (providedLockId != null || !rowsAlreadyLocked.contains(currentRow)) { + acquiredLockId = getLock(providedLockId, currentRow, shouldBlock); + if (acquiredLockId == null) { + failedToAcquire = true; + } else if (providedLockId == null) { + rowsAlreadyLocked.add(currentRow); + } + } } catch (IOException ioe) { - LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + LOG.warn("Failed getting lock in batch put, row=" + currentRow, ioe); + failedToAcquire = true; } - if (acquiredLockId == null) { + if (failedToAcquire) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; break; // stop acquiring more rows for this batch @@ -2452,6 +2463,7 @@ releaseRowLock(toRelease); } acquiredLocks = null; + rowsAlreadyLocked = null; } // ------------------------- // STEP 7. Sync wal. @@ -3432,7 +3444,7 @@ this.writeRequestsCount.increment(); this.opMetrics.setWriteRequestCountMetrics( this.writeRequestsCount.get()); try { - return internalObtainRowLock(row, true); + return internalObtainRowLock(new HashedBytes(row), true); } finally { closeRegionOperation(); } @@ -3444,12 +3456,11 @@ * Otherwise, just tries to obtain the lock and returns * null if unavailable. */ - private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) + private Integer internalObtainRowLock(final HashedBytes rowKey, boolean waitForLock) throws IOException { - checkRow(row, "row lock"); + checkRow(rowKey.getBytes(), "row lock"); startRegionOperation(); try { - HashedBytes rowKey = new HashedBytes(row); CountDownLatch rowLatch = new CountDownLatch(1); // loop until we acquire the row lock (unless !waitForLock) @@ -3465,8 +3476,7 @@ try { if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { - throw new IOException("Timed out on getting lock for row=" - + Bytes.toStringBinary(row)); + throw new IOException("Timed out on getting lock for row=" + rowKey); } } catch (InterruptedException ie) { // Empty @@ -3540,12 +3550,27 @@ */ public Integer getLock(Integer lockid, byte [] row, boolean waitForLock) throws IOException { - Integer lid = null; + return getLock(lockid, new HashedBytes(row), waitForLock); + } + + /** + * Returns existing row lock if found, otherwise + * obtains a new row lock and returns it. + * @param lockid requested by the user, or null if the user didn't already hold lock + * @param row the row to lock + * @param waitForLock if true, will block until the lock is available, otherwise will + * simply return null if it could not acquire the lock. + * @return lockid or null if waitForLock is false and the lock was unavailable. + */ + protected Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) + throws IOException { + Integer lid; if (lockid == null) { lid = internalObtainRowLock(row, waitForLock); } else { - if (!isRowLocked(lockid)) { - throw new IOException("Invalid row lock"); + HashedBytes rowFromLock = lockIds.get(lockid); + if (!row.equals(rowFromLock)) { + throw new IOException("Invalid row lock: LockId: " + lockid + " holds the lock for row: " + rowFromLock + " but wanted lock for row: " + row); } lid = lockid; }