diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 3bb0a77..72d71eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -108,7 +108,8 @@ public class HTable implements Table { private final Configuration configuration; private final ConnectionConfiguration connConfiguration; @VisibleForTesting - BufferedMutatorImpl mutator; + volatile BufferedMutatorImpl mutator; + private final Object mutatorLock = new Object(); private boolean closed = false; private final int scannerCaching; private final long scannerMaxResultSize; @@ -1333,14 +1334,14 @@ public class HTable implements Table { @VisibleForTesting BufferedMutator getBufferedMutator() throws IOException { if (mutator == null) { - this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( - new BufferedMutatorParams(tableName) - .pool(pool) - .writeBufferSize(writeBufferSize) - .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) - .opertationTimeout(operationTimeout) - .rpcTimeout(writeRpcTimeout) - ); + synchronized (mutatorLock) { + if (mutator == null) { + this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( + new BufferedMutatorParams(tableName).pool(pool).writeBufferSize(writeBufferSize) + .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) + .opertationTimeout(operationTimeout).rpcTimeout(writeRpcTimeout)); + } + } } return mutator; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index b863b40..6f9637f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -28,6 +28,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -749,6 +751,56 @@ public class TestFromClientSide3 { } + @Test + public void testPutThenGetWithMultipleThreads() throws Exception { + TableName TABLE = TableName.valueOf("testParallelPutAndGet"); + final int THREAD_NUM = 20; + final int ROUND_NUM = 10; + for (int round = 0; round < ROUND_NUM; round++) { + ArrayList threads = new ArrayList<>(THREAD_NUM); + final AtomicInteger successCnt = new AtomicInteger(0); + Table ht = TEST_UTIL.createTable(TABLE, FAMILY); + for (int i = 0; i < THREAD_NUM; i++) { + final int index = i; + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + final byte[] row = Bytes.toBytes("row-" + index); + final byte[] value = Bytes.toBytes("v" + index); + try { + Put put = new Put(row); + put.addColumn(FAMILY, QUALIFIER, value); + ht.put(put); + Get get = new Get(row); + Result result = ht.get(get); + byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); + if (Bytes.equals(value, returnedValue)) { + successCnt.getAndIncrement(); + } else { + LOG.error("Should be equal but not, original value: " + Bytes.toString(value) + + ", returned value: " + + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); + } + } catch (Throwable e) { + // do nothing + } + } + }); + threads.add(t); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); + ht.close(); + TEST_UTIL.deleteTable(TABLE); + } + } + private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException { HRegion region = (HRegion) find(tableName); assertEquals(0, region.getLockedRows().size());