diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index dd11abf..1cc81e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -107,7 +107,8 @@ public class HTable implements Table { private final TableName tableName; private volatile Configuration configuration; private ConnectionConfiguration connConfiguration; - protected BufferedMutatorImpl mutator; + protected volatile BufferedMutatorImpl mutator; + private Object mutatorLock = new Object(); private boolean closed = false; protected int scannerCaching; protected long scannerMaxResultSize; @@ -1265,12 +1266,14 @@ public class HTable implements Table { @VisibleForTesting BufferedMutator getBufferedMutator() throws IOException { if (mutator == null) { - this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( - new BufferedMutatorParams(tableName) - .pool(pool) - .writeBufferSize(connConfiguration.getWriteBufferSize()) - .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) - ); + synchronized (mutatorLock) { + if (mutator == null) { + this.mutator = (BufferedMutatorImpl) connection + .getBufferedMutator(new BufferedMutatorParams(tableName).pool(pool) + .writeBufferSize(connConfiguration.getWriteBufferSize()) + .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())); + } + } } mutator.setRpcTimeout(writeRpcTimeout); mutator.setOperationTimeout(operationTimeout); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index ae93e67..074ea78 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -43,6 +43,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -6219,4 +6220,54 @@ public class TestFromClientSide { .getNumberOfCachedRegionLocations(htd.getTableName()); assertEquals(results.size(), number); } + + @Test + public void testPutThenGetWithMultipleThreads() throws Exception { + TableName TABLE = TableName.valueOf("testParallelPutAndGet"); + final int THREAD_NUM = 20; + final int ROUND_NUM = 10; + for (int round = 0; round < ROUND_NUM; round++) { + ArrayList threads = new ArrayList<>(THREAD_NUM); + final AtomicInteger successCnt = new AtomicInteger(0); + Table ht = TEST_UTIL.createTable(TABLE, FAMILY); + for (int i = 0; i < THREAD_NUM; i++) { + final int index = i; + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + final byte[] row = Bytes.toBytes("row-" + index); + final byte[] value = Bytes.toBytes("v" + index); + try { + Put put = new Put(row); + put.addColumn(FAMILY, QUALIFIER, value); + ht.put(put); + Get get = new Get(row); + Result result = ht.get(get); + byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); + if (Bytes.equals(value, returnedValue)) { + successCnt.getAndIncrement(); + } else { + LOG.error("Should be equal but not, original value: " + Bytes.toString(value) + + ", returned value: " + + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); + } + } catch (Throwable e) { + // do nothing + } + } + }); + threads.add(t); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); + ht.close(); + TEST_UTIL.deleteTable(TABLE); + } + } }