diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 07a10df..0f5cc9d 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -142,6 +142,7 @@ import com.google.common.collect.ClassToInstanceMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.collect.MutableClassToInstanceMap; /** @@ -2241,6 +2242,8 @@ public class HRegion implements HeapSize { // , Writable{ /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); + Set rowsAlreadyLocked = Sets.newHashSet(); + // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) @@ -2297,15 +2300,22 @@ public class HRegion implements HeapSize { // , Writable{ // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; + boolean failedToAcquire = false; Integer acquiredLockId = null; try { - acquiredLockId = getLock(providedLockId, mutation.getRow(), - shouldBlock); + HashedBytes currentRow = new HashedBytes(mutation.getRow()); + if (!rowsAlreadyLocked.contains(currentRow)) { + acquiredLockId = getLock(providedLockId, mutation.getRow(), shouldBlock); + if (acquiredLockId == null) { + failedToAcquire = true; + } + rowsAlreadyLocked.add(currentRow); + } } catch (IOException ioe) { - LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); + failedToAcquire = true; } - if (acquiredLockId == null) { + if (failedToAcquire) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; break; // stop acquiring more rows for this batch @@ -2452,6 +2462,7 @@ public class HRegion implements HeapSize { // , Writable{ releaseRowLock(toRelease); } acquiredLocks = null; + rowsAlreadyLocked = null; } // ------------------------- // STEP 7. Sync wal. @@ -3540,12 +3551,13 @@ public class HRegion implements HeapSize { // , Writable{ */ public Integer getLock(Integer lockid, byte [] row, boolean waitForLock) throws IOException { - Integer lid = null; + Integer lid; if (lockid == null) { lid = internalObtainRowLock(row, waitForLock); } else { - if (!isRowLocked(lockid)) { - throw new IOException("Invalid row lock"); + byte[] rowFromLock = getRowFromLock(lockid); + if (!Bytes.equals(rowFromLock, row)) { + throw new IOException("Invalid row lock: LockId: " + lockid + " holds the lock for row: " + Bytes.toString(rowFromLock) + " but wanted lock for row: " + Bytes.toString(row)); } lid = lockid; } diff --git src/test/java/org/apache/hadoop/hbase/TestBatchHRegionLockingAndWrites.java src/test/java/org/apache/hadoop/hbase/TestBatchHRegionLockingAndWrites.java new file mode 100644 index 0000000..b15b50c --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/TestBatchHRegionLockingAndWrites.java @@ -0,0 +1,97 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + + +@Category(SmallTests.class) +public class TestBatchHRegionLockingAndWrites { + private static final String FAMILY = "a"; + private static int ACQUIRE_LOCK_COUNT = 0; + + @Test + public void testRedundantRowKeys() throws Exception { + + final int batchSize = 100000; + + String tableName = getClass().getSimpleName(); + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); + MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes("a")); + + List> someBatch = Lists.newArrayList(); + int i = 0; + while (i < batchSize) { + if (i % 2 == 0) { + someBatch.add(new Pair(new Put(Bytes.toBytes(0)), null)); + } else { + someBatch.add(new Pair(new Put(Bytes.toBytes(1)), null)); + } + i++; + } + assertEquals(2, ACQUIRE_LOCK_COUNT); + } + + @Test + public void testGettingTheLockMatchesMyRow() throws Exception { + MockHRegion region = getMockHRegion(); + byte[] rowKey = Bytes.toBytes(1); + assertEquals(Integer.valueOf(2), region.getLock(null, rowKey, false)); + assertEquals(Integer.valueOf(2), region.getLock(2, rowKey, false)); + } + + private MockHRegion getMockHRegion() throws IOException { + String tableName = getClass().getSimpleName(); + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); + return (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(FAMILY)); + } + + private static class MockHRegion extends HRegion { + + public MockHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf, final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, log, fs, conf, regionInfo, htd, rsServices); + } + + @Override + public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException { + ACQUIRE_LOCK_COUNT++; + return super.getLock(lockid, row, waitForLock); + } + } +} \ No newline at end of file