diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConcurrentCAPWithLocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConcurrentCAPWithLocks.java new file mode 100644 index 0000000..65b1afa --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConcurrentCAPWithLocks.java @@ -0,0 +1,119 @@ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestConcurrentCAPWithLocks { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL + = new HBaseTestingUtility(); + private static byte[] TABLE = Bytes.toBytes("testConcurrentCAP"); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[] VALUE = Bytes.toBytes("testValue"); + + private static Random random = new Random(); + private static int SLAVES = 3; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + TEST_UTIL.createTable(TABLE, FAMILY); + } + + @Test + public void testConcurrentCAPWithLocks() throws Exception { + HTablePool hTablePool = new HTablePool(TEST_UTIL.getConfiguration(), 10); + + List locks = new ArrayList(); + ExecutorService executor = Executors.newFixedThreadPool(10); + List>> successes = new ArrayList>>(); + + HTableInterface hTable = hTablePool.getTable(TABLE); + for(int i = 0; i< 20; i++) { + locks.add(hTable.lockRow(("row"+i).getBytes())); + List> localSuccesses = new ArrayList>(); + for(int j = 0; j < 300; j++) { + localSuccesses.add(executor.submit(new StressLock(locks.get(i), "row"+i, hTablePool))); + } + successes.add(localSuccesses); + } + hTable.close(); + + executor.awaitTermination(5, TimeUnit.SECONDS); + + hTable = hTablePool.getTable(TABLE); + for(RowLock lock : locks) { + hTable.unlockRow(lock); + } + + List counts = new ArrayList(); + for(List> s : successes) { + int count = 0; + for(Future f : s) if(f.get()) count++; + counts.add(count); + } + + System.out.println(counts); + for(int c : counts) Assert.assertEquals(c, 1); + } + + class StressLock implements Callable { + + RowLock lock; + String row; + HTablePool hTablePool; + + public StressLock(RowLock lock, String row, HTablePool hTablePool) { + this.lock = lock; + this.row = row; + this.hTablePool = hTablePool; + } + + public Boolean call() throws Exception { + boolean success; + HTableInterface table = hTablePool.getTable(TABLE); + Put put = new Put(Bytes.toBytes(row), lock); + put.add(FAMILY, QUALIFIER, VALUE); + success = table.checkAndPut(Bytes.toBytes(row), FAMILY, QUALIFIER, new byte[]{}, put); + table.close(); + return success; + } + } +}