diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index fcc9af7..b4d8ef3 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -120,6 +120,16 @@ public interface BufferedMutator extends Closeable { long getWriteBufferSize(); /** + * @return The number of valid bytes in the write buffer. + */ + long getCurrentWriteBufferSize(); + + /** + * @return The number of {@link Mutation} in this buffer. + */ + int size(); + + /** * Set rpc timeout for this mutator instance */ void setRpcTimeout(int timeout); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index f7eb09d..82a5bff 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; @@ -57,65 +56,70 @@ import java.util.concurrent.atomic.AtomicLong; @InterfaceAudience.Private @InterfaceStability.Evolving public class BufferedMutatorImpl implements BufferedMutator { - private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); - private final ExceptionListener listener; - - protected ClusterConnection connection; // non-final so can be overridden in test private final TableName tableName; - private volatile Configuration conf; - - @VisibleForTesting - final ConcurrentLinkedQueue 
writeAsyncBuffer = new ConcurrentLinkedQueue(); - @VisibleForTesting - AtomicLong currentWriteBufferSize = new AtomicLong(0); - + private final Configuration conf; + private final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue<>(); + private final AtomicLong currentWriteBufferSize = new AtomicLong(0); /** * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. */ - @VisibleForTesting - AtomicInteger undealtMutationCount = new AtomicInteger(0); - private long writeBufferSize; + private final AtomicInteger undealtMutationCount = new AtomicInteger(0); + private volatile long maxBufferSize; private final int maxKeyValueSize; - private boolean closed = false; + private final boolean cleanupPoolOnClose; + private volatile boolean closed = false; private final ExecutorService pool; - private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor - private int operationTimeout; - - @VisibleForTesting - protected AsyncProcess ap; // non-final so can be overridden in test + private final AsyncProcess ap; BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcFactory, BufferedMutatorParams params) { + this(conn, params, createAsyncProcess(conn, rpcCallerFactory, rpcFactory, params.getPool())); + } + + BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { if (conn == null || conn.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } - this.tableName = params.getTableName(); - this.connection = conn; - this.conf = connection.getConfiguration(); - this.pool = params.getPool(); + this.conf = conn.getConfiguration(); + if (params.getPool() == null) { + this.pool = HTable.getDefaultExecutor(conf); + cleanupPoolOnClose = true; + } else { + this.pool = params.getPool(); + cleanupPoolOnClose = false; + } this.listener = 
params.getListener(); - ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); - this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? + this.maxBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + this.ap = ap; + } - this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + private static AsyncProcess createAsyncProcess(ClusterConnection conn, + RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcFactory, + ExecutorService pool) { + int writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.operationTimeout = conn.getConfiguration().getInt( + int operationTimeout = conn.getConfiguration().getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, + return new AsyncProcess(conn, conn.getConfiguration(), pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout, operationTimeout); } + @VisibleForTesting + AsyncProcess getAsyncProcess() { + return ap; + } + @Override public TableName getName() { return tableName; @@ -166,7 +170,7 @@ public class BufferedMutatorImpl implements BufferedMutator { // Now try and queue what needs to be queued. 
while (undealtMutationCount.get() != 0 - && currentWriteBufferSize.get() > writeBufferSize) { + && currentWriteBufferSize.get() > maxBufferSize) { backgroundFlushCommits(false); } } @@ -185,22 +189,22 @@ public class BufferedMutatorImpl implements BufferedMutator { // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. backgroundFlushCommits(true); - this.pool.shutdown(); - boolean terminated; - int loopCnt = 0; - do { - // wait until the pool has terminated - terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); - loopCnt += 1; - if (loopCnt >= 10) { - LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); - break; - } - } while (!terminated); - + if (cleanupPoolOnClose) { + this.pool.shutdown(); + boolean terminated; + int loopCnt = 0; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + loopCnt += 1; + if (loopCnt >= 10) { + LOG.warn("close() failed to terminate pool after 10 minutes. 
Abandoning pool."); + break; + } + } while (!terminated); + } } catch (InterruptedException e) { LOG.warn("waitForTermination interrupted"); - } finally { this.closed = true; } @@ -232,7 +236,7 @@ public class BufferedMutatorImpl implements BufferedMutator { if (!synchronous) { QueueRowAccess taker = new QueueRowAccess(); try { - ap.submit(tableName, taker, true, null, false); + ap.submit(pool, tableName, taker, true, null, false); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); @@ -245,7 +249,7 @@ public class BufferedMutatorImpl implements BufferedMutator { QueueRowAccess taker = new QueueRowAccess(); try { while (!taker.isEmpty()) { - ap.submit(tableName, taker, true, null, false); + ap.submit(pool, tableName, taker, true, null, false); taker.reset(); } } finally { @@ -267,12 +271,18 @@ public class BufferedMutatorImpl implements BufferedMutator { /** * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought * not be called for production uses. + * If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()} + * will be called. + * @param writeBufferSize The max size of internal buffer where data is stored. + * @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException + * if an I/O error occurs and there are too many retries. + * @throws java.io.InterruptedIOException if the I/O task is interrupted. * @deprecated Going away when we drop public support for {@link HTable}. 
*/ @Deprecated public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, InterruptedIOException { - this.writeBufferSize = writeBufferSize; + this.maxBufferSize = writeBufferSize; if (currentWriteBufferSize.get() > writeBufferSize) { flush(); } @@ -283,19 +293,27 @@ public class BufferedMutatorImpl implements BufferedMutator { */ @Override public long getWriteBufferSize() { - return this.writeBufferSize; + return this.maxBufferSize; } @Override public void setRpcTimeout(int timeout) { - this.writeRpcTimeout = timeout; ap.setRpcTimeout(timeout); } @Override public void setOperationTimeout(int timeout) { - this.operationTimeout = timeout; - ap.setOperationTimeout(operationTimeout); + ap.setOperationTimeout(timeout); + } + + @Override + public long getCurrentWriteBufferSize() { + return currentWriteBufferSize.get(); + } + + @Override + public int size() { + return undealtMutationCount.get(); } private class QueueRowAccess implements RowAccess { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 1134923..0ba3724 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -293,9 +293,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (params.getTableName() == null) { throw new IllegalArgumentException("TableName cannot be null."); } - if (params.getPool() == null) { - params.pool(HTable.getDefaultExecutor(getConfiguration())); - } if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { params.writeBufferSize(connectionConfig.getWriteBufferSize()); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index b2c012d..07c7334 
100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -103,27 +103,29 @@ import org.apache.hadoop.hbase.util.Threads; @InterfaceStability.Stable public class HTable implements Table { private static final Log LOG = LogFactory.getLog(HTable.class); - protected ClusterConnection connection; + private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; + private final ClusterConnection connection; private final TableName tableName; - private volatile Configuration configuration; - private ConnectionConfiguration connConfiguration; - protected BufferedMutatorImpl mutator; + private final Configuration configuration; + private final ConnectionConfiguration connConfiguration; + @VisibleForTesting + BufferedMutatorImpl mutator; private boolean closed = false; - protected int scannerCaching; - protected long scannerMaxResultSize; - private ExecutorService pool; // For Multi & Scan + private final int scannerCaching; + private final long scannerMaxResultSize; + private final ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc private int readRpcTimeout; // timeout for each read rpc request private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() - private Consistency defaultConsistency = Consistency.STRONG; - private HRegionLocator locator; + private final HRegionLocator locator; /** The Async process for batch */ - protected AsyncProcess multiAp; - private RpcRetryingCallerFactory rpcCallerFactory; - private RpcControllerFactory rpcControllerFactory; + @VisibleForTesting + AsyncProcess multiAp; + private final RpcRetryingCallerFactory rpcCallerFactory; + private final RpcControllerFactory rpcControllerFactory; // 
Marked Private @since 1.0 @InterfaceAudience.Private @@ -167,19 +169,44 @@ public class HTable implements Table { this.cleanupConnectionOnClose = false; this.connection = connection; this.configuration = connection.getConfiguration(); - this.connConfiguration = tableConfig; - this.pool = pool; + if (tableConfig == null) { + connConfiguration = new ConnectionConfiguration(configuration); + } else { + connConfiguration = tableConfig; + } if (pool == null) { this.pool = getDefaultExecutor(this.configuration); this.cleanupPoolOnClose = true; } else { + this.pool = pool; this.cleanupPoolOnClose = false; } + if (rpcCallerFactory == null) { + this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); + } else { + this.rpcCallerFactory = rpcCallerFactory; + } + + if (rpcControllerFactory == null) { + this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); + } else { + this.rpcControllerFactory = rpcControllerFactory; + } - this.rpcCallerFactory = rpcCallerFactory; - this.rpcControllerFactory = rpcControllerFactory; + this.operationTimeout = tableName.isSystemTable() ? + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); - this.finishSetup(); + // puts need to track errors globally due to how the APIs currently work. 
+ multiAp = this.connection.getAsyncProcess(); + this.locator = new HRegionLocator(tableName, connection); } /** @@ -187,20 +214,26 @@ public class HTable implements Table { * @throws IOException */ @VisibleForTesting - protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { + protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException { connection = conn; - tableName = params.getTableName(); - connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); + this.tableName = mutator.getName(); + this.configuration = connection.getConfiguration(); + connConfiguration = new ConnectionConfiguration(configuration); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; - // used from tests, don't trust the connection is real - this.mutator = new BufferedMutatorImpl(conn, null, null, params); + this.mutator = mutator; this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); + this.rpcControllerFactory = null; + this.rpcCallerFactory = null; + this.pool = null; + this.locator = null; } /** @@ -211,36 +244,6 @@ public class HTable implements Table { } /** - * setup this HTable's parameter based on the passed configuration - */ - private void finishSetup() throws IOException { - if (connConfiguration == null) { - connConfiguration = new ConnectionConfiguration(configuration); - } - - this.operationTimeout = tableName.isSystemTable() ? 
- connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.scannerCaching = connConfiguration.getScannerCaching(); - this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); - if (this.rpcCallerFactory == null) { - this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); - } - if (this.rpcControllerFactory == null) { - this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); - } - - // puts need to track errors globally due to how the APIs currently work. - multiAp = this.connection.getAsyncProcess(); - this.locator = new HRegionLocator(getName(), connection); - } - - /** * {@inheritDoc} */ @Override @@ -417,7 +420,7 @@ public class HTable implements Table { get = ReflectionUtils.newInstance(get.getClass(), get); get.setCheckExistenceOnly(checkExistenceOnly); if (get.getConsistency() == null){ - get.setConsistency(defaultConsistency); + get.setConsistency(DEFAULT_CONSISTENCY); } } diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 5a21699..b2f0b48 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -212,6 +212,14 @@ public class TestAsyncProcess { } @Override + public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, RowAccess rows, + boolean atLeastOne, Callback callback, boolean needResults) + throws InterruptedIOException { + // We use 
results in tests to check things, so override to always save them. + return super.submit(pool, DUMMY_TABLE, rows, atLeastOne, callback, true); + } + + @Override public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, CancellableRegionServerCallable callable, int curTimeout) { @@ -604,7 +612,6 @@ public class TestAsyncProcess { final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); // sn has two regions long putSizeSN = 0; @@ -630,11 +637,11 @@ public class TestAsyncProcess { + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:" + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new BufferedMutatorParams(DUMMY_TABLE), ap); + try (HTable ht = new HTable(conn, mutator)) { + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); ht.put(puts); List reqs = ap.allReqs; @@ -1005,14 +1012,34 @@ public class TestAsyncProcess { @Test public void testHTablePutSuccess() throws Exception { - BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); - ht.ap = new MyAsyncProcess(createHConnection(), conf, true); + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE) + .writeBufferSize(0); + 
BufferedMutatorImpl ht = createBufferedMutatorImpl(conn, pm, ap); Put put = createPut(1, true); Assert.assertEquals(0, ht.getWriteBufferSize()); ht.mutate(put); - Assert.assertEquals(0, ht.getWriteBufferSize()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + } + + private BufferedMutatorImpl createBufferedMutatorImpl(ClusterConnection conn, + BufferedMutatorParams params, MyAsyncProcess ap) throws IOException { + return new BufferedMutatorImpl(conn, params, ap); + } + + @Test + public void testBufferedMutatorImplWithSharedPool() throws Exception { + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE); + pm.pool(ap.getPool(null)); + BufferedMutatorImpl ht = createBufferedMutatorImpl(conn, pm, ap); + + ht.close(); + assertFalse(ap.getPool(null).isShutdown()); } private void doHTableFailedPut(boolean bufferOn) throws Exception { @@ -1023,14 +1050,13 @@ public class TestAsyncProcess { } else { bufferParam.writeBufferSize(0L); } - - HTable ht = new HTable(conn, bufferParam); MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, bufferParam, ap); + HTable ht = new HTable(conn, mutator); Put put = createPut(1, false); - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); try { ht.put(put); if (bufferOn) { @@ -1039,7 +1065,7 @@ public class TestAsyncProcess { Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { @@ -1067,10 +1093,10 @@ public class TestAsyncProcess { @Test public 
void testHTableFailedPutAndNewPut() throws Exception { ClusterConnection conn = createHConnection(); - BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, - new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0)); + BufferedMutatorParams bp = new BufferedMutatorParams(DUMMY_TABLE) + .writeBufferSize(0); MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - mutator.ap = ap; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, bp , ap); Put p = createPut(1, false); mutator.mutate(p); @@ -1083,13 +1109,13 @@ public class TestAsyncProcess { // puts, we may raise an exception in the middle of the list. It's then up to the caller to // manage what was inserted, what was tried but failed, and what was not even tried. p = createPut(1, true); - Assert.assertEquals(0, mutator.writeAsyncBuffer.size()); + Assert.assertEquals(0, mutator.size()); try { mutator.mutate(p); Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size()); + Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); } @Test @@ -1303,7 +1329,10 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { ClusterConnection conn = new MyConnectionImpl(conf); - HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, pm, ap); + HTable ht = new HTable(conn, mutator); ht.multiAp = new MyAsyncProcess(conn, conf, false); List puts = new ArrayList(); @@ -1334,16 +1363,14 @@ public class TestAsyncProcess { public void testErrorsServers() throws IOException { Configuration configuration = new Configuration(conf); ClusterConnection conn = new MyConnectionImpl(configuration); - BufferedMutatorImpl mutator = - new 
BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); - configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); - MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); - mutator.ap = ap; - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); - mutator.ap.serverTrackerTimeout = 1; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new BufferedMutatorParams(DUMMY_TABLE), ap); + configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); + + Assert.assertNotNull(ap.createServerErrorTracker()); + Assert.assertTrue(ap.serverTrackerTimeout > 200); + ap.serverTrackerTimeout = 1; Put p = createPut(1, false); mutator.mutate(p); @@ -1366,9 +1393,10 @@ public class TestAsyncProcess { copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); ClusterConnection conn = createHConnection(); Mockito.when(conn.getConfiguration()).thenReturn(copyConf); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE); + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, pm, ap); + try (HTable ht = new HTable(conn, mutator)) { ht.multiAp = ap; List gets = new LinkedList<>(); gets.add(new Get(DUMMY_BYTES_1)); @@ -1400,11 +1428,10 @@ public class TestAsyncProcess { @Test public void testGlobalErrors() throws IOException { ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test")); - mutator.ap = ap; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new 
BufferedMutatorParams(DUMMY_TABLE), ap); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1422,12 +1449,9 @@ public class TestAsyncProcess { @Test public void testCallQueueTooLarge() throws IOException { ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException()); - mutator.ap = ap; - - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new BufferedMutatorParams(DUMMY_TABLE), ap); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1459,10 +1483,10 @@ public class TestAsyncProcess { } MyConnectionImpl2 con = new MyConnectionImpl2(hrls); - HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE)); MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(con, new BufferedMutatorParams(DUMMY_TABLE), ap); + HTable ht = new HTable(con, mutator); ht.multiAp = ap; - ht.batch(gets, null); Assert.assertEquals(ap.nbActions.get(), NB_REGS); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 53488ec..b278208 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -138,7 +138,7 @@ public class TestClientPushback { final AtomicLong endTime = new AtomicLong(); long startTime = EnvironmentEdgeManager.currentTime(); - ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback() { + ((HTable) 
table).mutator.getAsyncProcess().submit(null, tableName, ops, true, new Batch.Callback() { @Override public void update(byte[] region, byte[] row, Result result) { endTime.set(EnvironmentEdgeManager.currentTime());