diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index fcc9af7..0070a9e 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -120,6 +120,11 @@ public interface BufferedMutator extends Closeable { long getWriteBufferSize(); /** + * @return The number of valid bytes in the write buffer. + */ + long getCurrentWriteBufferSize(); + + /** * Set rpc timeout for this mutator instance */ void setRpcTimeout(int timeout); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index f7eb09d..f9c33f6 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -57,65 +57,75 @@ import java.util.concurrent.atomic.AtomicLong; @InterfaceAudience.Private @InterfaceStability.Evolving public class BufferedMutatorImpl implements BufferedMutator { - private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); - private final ExceptionListener listener; - - protected ClusterConnection connection; // non-final so can be overridden in test private final TableName tableName; - private volatile Configuration conf; - + private final Configuration conf; @VisibleForTesting - final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue<>(); @VisibleForTesting AtomicLong currentWriteBufferSize = new AtomicLong(0); - /** * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. 
*/ @VisibleForTesting AtomicInteger undealtMutationCount = new AtomicInteger(0); - private long writeBufferSize; + private volatile long writeBufferSize; private final int maxKeyValueSize; - private boolean closed = false; + /** + * Whether to shutdown the pool in close(). + */ + @VisibleForTesting + private final boolean cleanupPoolOnClose; + private volatile boolean closed = false; private final ExecutorService pool; - private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor - private int operationTimeout; @VisibleForTesting - protected AsyncProcess ap; // non-final so can be overridden in test + private final AsyncProcess ap; BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcFactory, BufferedMutatorParams params) { + this(conn, params, createAsyncProcess(conn, rpcCallerFactory, rpcFactory, params.getPool())); + } + + BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { if (conn == null || conn.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } - + assert params.getPool() != null : "The pool can not be null"; this.tableName = params.getTableName(); - this.connection = conn; - this.conf = connection.getConfiguration(); + this.conf = conn.getConfiguration(); this.pool = params.getPool(); this.listener = params.getListener(); - + this.cleanupPoolOnClose = params.getCleanupPoolOnClose(); ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + this.ap = ap; + } - this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + private static AsyncProcess createAsyncProcess(ClusterConnection conn, + RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcFactory, + ExecutorService pool) { + int writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.operationTimeout = conn.getConfiguration().getInt( + int operationTimeout = conn.getConfiguration().getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, + return new AsyncProcess(conn, conn.getConfiguration(), pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout, operationTimeout); } + @VisibleForTesting + AsyncProcess getAsyncProcess() { + return ap; + } + @Override public TableName getName() { return tableName; @@ -185,22 +195,22 @@ public class BufferedMutatorImpl implements BufferedMutator { // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. backgroundFlushCommits(true); - this.pool.shutdown(); - boolean terminated; - int loopCnt = 0; - do { - // wait until the pool has terminated - terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); - loopCnt += 1; - if (loopCnt >= 10) { - LOG.warn("close() failed to terminate pool after 10 minutes. 
Abandoning pool."); - break; - } - } while (!terminated); - + if (cleanupPoolOnClose) { + this.pool.shutdown(); + boolean terminated; + int loopCnt = 0; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + loopCnt += 1; + if (loopCnt >= 10) { + LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + break; + } + } while (!terminated); + } } catch (InterruptedException e) { LOG.warn("waitForTermination interrupted"); - } finally { this.closed = true; } @@ -232,7 +242,7 @@ public class BufferedMutatorImpl implements BufferedMutator { if (!synchronous) { QueueRowAccess taker = new QueueRowAccess(); try { - ap.submit(tableName, taker, true, null, false); + ap.submit(pool, tableName, taker, true, null, false); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); @@ -245,7 +255,7 @@ public class BufferedMutatorImpl implements BufferedMutator { QueueRowAccess taker = new QueueRowAccess(); try { while (!taker.isEmpty()) { - ap.submit(tableName, taker, true, null, false); + ap.submit(pool, tableName, taker, true, null, false); taker.reset(); } } finally { @@ -267,6 +277,12 @@ public class BufferedMutatorImpl implements BufferedMutator { /** * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought * not be called for production uses. + * If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()} + * will be called. + * @param writeBufferSize The max size of internal buffer where data is stored. + * @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException + * if an I/O error occurs and there are too many retries. + * @throws java.io.InterruptedIOException if the I/O task is interrupted. * @deprecated Going away when we drop public support for {@link HTable}. 
*/ @Deprecated @@ -288,14 +304,17 @@ public class BufferedMutatorImpl implements BufferedMutator { @Override public void setRpcTimeout(int timeout) { - this.writeRpcTimeout = timeout; ap.setRpcTimeout(timeout); } @Override public void setOperationTimeout(int timeout) { - this.operationTimeout = timeout; - ap.setOperationTimeout(operationTimeout); + ap.setOperationTimeout(timeout); + } + + @Override + public long getCurrentWriteBufferSize() { + return currentWriteBufferSize.get(); } private class QueueRowAccess implements RowAccess { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index d4cdead..3866628 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -38,13 +38,13 @@ public class BufferedMutatorParams { private long writeBufferSize = UNSET; private int maxKeyValueSize = UNSET; private ExecutorService pool = null; - private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { - @Override - public void onException(RetriesExhaustedWithDetailsException exception, - BufferedMutator bufferedMutator) - throws RetriesExhaustedWithDetailsException { - throw exception; - } + /** + * shutdown the pool in close(). 
+ */ + private boolean cleanupPoolOnClose = true; + private BufferedMutator.ExceptionListener listener + = (RetriesExhaustedWithDetailsException exception, BufferedMutator bufferedMutator) -> { + throw exception; }; public BufferedMutatorParams(TableName tableName) { @@ -59,6 +59,15 @@ public class BufferedMutatorParams { return writeBufferSize; } + public BufferedMutatorParams cleanupPoolOnClose(boolean cleanupPoolOnClose) { + this.cleanupPoolOnClose = cleanupPoolOnClose; + return this; + } + + public boolean getCleanupPoolOnClose() { + return cleanupPoolOnClose; + } + /** * Override the write buffer size specified by the provided {@link Connection}'s * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index b2c012d..8a928e9 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -103,27 +103,29 @@ import org.apache.hadoop.hbase.util.Threads; @InterfaceStability.Stable public class HTable implements Table { private static final Log LOG = LogFactory.getLog(HTable.class); - protected ClusterConnection connection; + private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; + private final ClusterConnection connection; private final TableName tableName; - private volatile Configuration configuration; - private ConnectionConfiguration connConfiguration; - protected BufferedMutatorImpl mutator; + private final Configuration configuration; + private final ConnectionConfiguration connConfiguration; + @VisibleForTesting + BufferedMutatorImpl mutator; private boolean closed = false; - protected int scannerCaching; - protected long scannerMaxResultSize; - private ExecutorService pool; // For Multi & Scan + private final int scannerCaching; + private final long 
scannerMaxResultSize; + private final ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc private int readRpcTimeout; // timeout for each read rpc request private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() - private Consistency defaultConsistency = Consistency.STRONG; - private HRegionLocator locator; + private final HRegionLocator locator; /** The Async process for batch */ - protected AsyncProcess multiAp; - private RpcRetryingCallerFactory rpcCallerFactory; - private RpcControllerFactory rpcControllerFactory; + @VisibleForTesting + AsyncProcess multiAp; + private final RpcRetryingCallerFactory rpcCallerFactory; + private final RpcControllerFactory rpcControllerFactory; // Marked Private @since 1.0 @InterfaceAudience.Private @@ -167,19 +169,44 @@ public class HTable implements Table { this.cleanupConnectionOnClose = false; this.connection = connection; this.configuration = connection.getConfiguration(); - this.connConfiguration = tableConfig; - this.pool = pool; + if (tableConfig == null) { + connConfiguration = new ConnectionConfiguration(configuration); + } else { + connConfiguration = tableConfig; + } if (pool == null) { this.pool = getDefaultExecutor(this.configuration); this.cleanupPoolOnClose = true; } else { + this.pool = pool; this.cleanupPoolOnClose = false; } + if (rpcCallerFactory == null) { + this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); + } else { + this.rpcCallerFactory = rpcCallerFactory; + } - this.rpcCallerFactory = rpcCallerFactory; - this.rpcControllerFactory = rpcControllerFactory; + if (rpcControllerFactory == null) { + this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); + } else { + this.rpcControllerFactory = rpcControllerFactory; 
+ } - this.finishSetup(); + this.operationTimeout = tableName.isSystemTable() ? + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); + + // puts need to track errors globally due to how the APIs currently work. + multiAp = this.connection.getAsyncProcess(); + this.locator = new HRegionLocator(tableName, connection); } /** @@ -187,20 +214,27 @@ public class HTable implements Table { * @throws IOException */ @VisibleForTesting - protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { + protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException { connection = conn; - tableName = params.getTableName(); + this.tableName = mutator.getName(); connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; // used from tests, don't trust the connection is real - this.mutator = new BufferedMutatorImpl(conn, null, null, params); + this.mutator = mutator; this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.scannerCaching = 
connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); + this.rpcControllerFactory = null; + this.rpcCallerFactory = null; + this.pool = null; + this.configuration = connection == null ? null : connection.getConfiguration(); + this.locator = null; } /** @@ -211,36 +245,6 @@ public class HTable implements Table { } /** - * setup this HTable's parameter based on the passed configuration - */ - private void finishSetup() throws IOException { - if (connConfiguration == null) { - connConfiguration = new ConnectionConfiguration(configuration); - } - - this.operationTimeout = tableName.isSystemTable() ? - connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.scannerCaching = connConfiguration.getScannerCaching(); - this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); - if (this.rpcCallerFactory == null) { - this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); - } - if (this.rpcControllerFactory == null) { - this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); - } - - // puts need to track errors globally due to how the APIs currently work. 
- multiAp = this.connection.getAsyncProcess(); - this.locator = new HRegionLocator(getName(), connection); - } - - /** * {@inheritDoc} */ @Override @@ -417,7 +421,7 @@ public class HTable implements Table { get = ReflectionUtils.newInstance(get.getClass(), get); get.setCheckExistenceOnly(checkExistenceOnly); if (get.getConsistency() == null){ - get.setConsistency(defaultConsistency); + get.setConsistency(DEFAULT_CONSISTENCY); } } @@ -701,7 +705,7 @@ public class HTable implements Table { qualifier, amount, durability, getNonceGroup(), getNonce()); MutateResponse response = doMutate(request); Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); - return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + return Bytes.toLong(result.getValue(family, qualifier)); } }; return rpcCallerFactory. newCaller(this.writeRpcTimeout). @@ -724,7 +728,7 @@ public class HTable implements Table { getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, put); MutateResponse response = doMutate(request); - return Boolean.valueOf(response.getProcessed()); + return response.getProcessed(); } }; return rpcCallerFactory. newCaller(this.writeRpcTimeout). @@ -749,7 +753,7 @@ public class HTable implements Table { getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), compareType, put); MutateResponse response = doMutate(request); - return Boolean.valueOf(response.getProcessed()); + return response.getProcessed(); } }; return rpcCallerFactory. newCaller(this.writeRpcTimeout). 
diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 5a21699..dec6f34 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -212,6 +212,14 @@ public class TestAsyncProcess { } @Override + public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, RowAccess rows, + boolean atLeastOne, Callback callback, boolean needResults) + throws InterruptedIOException { + // We use results in tests to check things, so override to always save them. + return super.submit(pool, DUMMY_TABLE, rows, atLeastOne, callback, true); + } + + @Override public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, CancellableRegionServerCallable callable, int curTimeout) { @@ -604,7 +612,6 @@ public class TestAsyncProcess { final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); // sn has two regions long putSizeSN = 0; @@ -630,10 +637,10 @@ public class TestAsyncProcess { + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:" + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new BufferedMutatorParams(DUMMY_TABLE), ap); + try (HTable ht = new HTable(conn, 
mutator)) { Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); ht.put(puts); List reqs = ap.allReqs; @@ -1005,14 +1012,34 @@ public class TestAsyncProcess { @Test public void testHTablePutSuccess() throws Exception { - BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); - ht.ap = new MyAsyncProcess(createHConnection(), conf, true); + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE) + .writeBufferSize(0); + BufferedMutatorImpl ht = createBufferedMutatorImpl(conn, pm, ap); Put put = createPut(1, true); Assert.assertEquals(0, ht.getWriteBufferSize()); ht.mutate(put); - Assert.assertEquals(0, ht.getWriteBufferSize()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + } + + private BufferedMutatorImpl createBufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, MyAsyncProcess ap) throws IOException { + params.pool(ap.getPool(null)); + return new BufferedMutatorImpl(conn, params, ap); + } + + @Test + public void testBufferedMutatorImplWithSharedPool() throws Exception { + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE); + pm.cleanupPoolOnClose(false); + BufferedMutatorImpl ht = createBufferedMutatorImpl(conn, pm, ap); + + ht.close(); + assertFalse(ap.getPool(null).isShutdown()); } private void doHTableFailedPut(boolean bufferOn) throws Exception { @@ -1023,10 +1050,9 @@ public class TestAsyncProcess { } else { bufferParam.writeBufferSize(0L); } - - HTable ht = new HTable(conn, bufferParam); MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, bufferParam, ap); + HTable ht = new HTable(conn, mutator); Put put = createPut(1, false); @@ -1067,10 +1093,10 @@ public class 
TestAsyncProcess { @Test public void testHTableFailedPutAndNewPut() throws Exception { ClusterConnection conn = createHConnection(); - BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, - new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0)); + BufferedMutatorParams bp = new BufferedMutatorParams(DUMMY_TABLE) + .writeBufferSize(0); MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - mutator.ap = ap; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, bp , ap); Put p = createPut(1, false); mutator.mutate(p); @@ -1303,7 +1329,10 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { ClusterConnection conn = new MyConnectionImpl(conf); - HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, pm, ap); + HTable ht = new HTable(conn, mutator); ht.multiAp = new MyAsyncProcess(conn, conf, false); List puts = new ArrayList(); @@ -1334,16 +1363,14 @@ public class TestAsyncProcess { public void testErrorsServers() throws IOException { Configuration configuration = new Configuration(conf); ClusterConnection conn = new MyConnectionImpl(configuration); - BufferedMutatorImpl mutator = - new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); - configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); - MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); - mutator.ap = ap; - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); - mutator.ap.serverTrackerTimeout = 1; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new BufferedMutatorParams(DUMMY_TABLE), ap); + configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, 
true); + + Assert.assertNotNull(ap.createServerErrorTracker()); + Assert.assertTrue(ap.serverTrackerTimeout > 200); + ap.serverTrackerTimeout = 1; Put p = createPut(1, false); mutator.mutate(p); @@ -1366,9 +1393,10 @@ public class TestAsyncProcess { copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); ClusterConnection conn = createHConnection(); Mockito.when(conn.getConfiguration()).thenReturn(copyConf); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + BufferedMutatorParams pm = new BufferedMutatorParams(DUMMY_TABLE); + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, pm, ap); + try (HTable ht = new HTable(conn, mutator)) { ht.multiAp = ap; List gets = new LinkedList<>(); gets.add(new Get(DUMMY_BYTES_1)); @@ -1400,11 +1428,10 @@ public class TestAsyncProcess { @Test public void testGlobalErrors() throws IOException { ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test")); - mutator.ap = ap; + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new BufferedMutatorParams(DUMMY_TABLE), ap); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1422,12 +1449,9 @@ public class TestAsyncProcess { @Test public void testCallQueueTooLarge() throws IOException { ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException()); - mutator.ap = ap; - - 
Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - + BufferedMutatorImpl mutator = createBufferedMutatorImpl(conn, new BufferedMutatorParams(DUMMY_TABLE), ap); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1459,10 +1483,10 @@ public class TestAsyncProcess { } MyConnectionImpl2 con = new MyConnectionImpl2(hrls); - HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE)); MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); + BufferedMutatorImpl mutator = createBufferedMutatorImpl(con, new BufferedMutatorParams(DUMMY_TABLE), ap); + HTable ht = new HTable(con, mutator); ht.multiAp = ap; - ht.batch(gets, null); Assert.assertEquals(ap.nbActions.get(), NB_REGS); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 53488ec..b278208 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -138,7 +138,7 @@ public class TestClientPushback { final AtomicLong endTime = new AtomicLong(); long startTime = EnvironmentEdgeManager.currentTime(); - ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback() { + ((HTable) table).mutator.getAsyncProcess().submit(null, tableName, ops, true, new Batch.Callback() { @Override public void update(byte[] region, byte[] row, Result result) { endTime.set(EnvironmentEdgeManager.currentTime());