diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 50a2a11..be65a61 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -33,12 +33,12 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -95,9 +95,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*
*/
@InterfaceAudience.Private
-class AsyncProcess {
+public class AsyncProcess {
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
- protected static final AtomicLong COUNTER = new AtomicLong();
+ private static final AtomicLong COUNTER = new AtomicLong();
public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
@@ -116,7 +116,7 @@ class AsyncProcess {
*/
public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
- protected final int thresholdToLogUndoneTaskDetails;
+ final int thresholdToLogUndoneTaskDetails;
private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
"hbase.client.threshold.log.details";
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
@@ -173,64 +173,66 @@ class AsyncProcess {
};
// TODO: many of the fields should be made private
- protected final long id;
+ final long id;
- protected final ClusterConnection connection;
- protected final RpcRetryingCallerFactory rpcCallerFactory;
- protected final RpcControllerFactory rpcFactory;
- protected final BatchErrors globalErrors;
- protected final ExecutorService pool;
+ final ClusterConnection connection;
+ private final RpcRetryingCallerFactory rpcCallerFactory;
+ final RpcControllerFactory rpcFactory;
+ final BatchErrors globalErrors;
- protected final AtomicLong tasksInProgress = new AtomicLong(0);
- protected final ConcurrentMap taskCounterPerRegion =
+ @VisibleForTesting
+ final AtomicLong tasksInProgress = new AtomicLong(0);
+ @VisibleForTesting
+ final ConcurrentMap taskCounterPerRegion =
new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR);
- protected final ConcurrentMap taskCounterPerServer =
+ @VisibleForTesting
+ final ConcurrentMap taskCounterPerServer =
new ConcurrentHashMap();
// Start configuration settings.
- protected final int startLogErrorsCnt;
+ final int startLogErrorsCnt;
/**
* The number of tasks simultaneously executed on the cluster.
*/
- protected final int maxTotalConcurrentTasks;
+ private final int maxTotalConcurrentTasks;
/**
* The max heap size of all tasks simultaneously executed on a server.
*/
- protected final long maxHeapSizePerRequest;
- protected final long maxHeapSizeSubmit;
+ private final long maxHeapSizePerRequest;
+ private final long maxHeapSizeSubmit;
/**
* The number of tasks we run in parallel on a single region.
* With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
* a set of operations on a region before the previous one is done. As well, this limits
* the pressure we put on the region server.
*/
- protected final int maxConcurrentTasksPerRegion;
+ @VisibleForTesting
+ final int maxConcurrentTasksPerRegion;
/**
* The number of task simultaneously executed on a single region server.
*/
- protected final int maxConcurrentTasksPerServer;
- protected final long pause;
- protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
- protected int numTries;
- protected int serverTrackerTimeout;
- protected int rpcTimeout;
- protected int operationTimeout;
- protected long primaryCallTimeoutMicroseconds;
+ @VisibleForTesting
+ final int maxConcurrentTasksPerServer;
+ final long pause;
+ final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
+ final int numTries;
+ @VisibleForTesting
+ int serverTrackerTimeout;
+ final long primaryCallTimeoutMicroseconds;
/** Whether to log details for batch errors */
- protected final boolean logBatchErrorDetails;
+ final boolean logBatchErrorDetails;
// End configuration settings.
- public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
+ public AsyncProcess(ClusterConnection hc, Configuration conf,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
- RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) {
+ RpcControllerFactory rpcFactory) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
this.connection = hc;
- this.pool = pool;
this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
this.id = COUNTER.incrementAndGet();
@@ -249,8 +251,6 @@ class AsyncProcess {
// how many times we could try in total, one more than retry number
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
- this.rpcTimeout = rpcTimeout;
- this.operationTimeout = operationTimeout;
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
@@ -307,37 +307,27 @@ class AsyncProcess {
}
/**
- * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
- * RuntimeException
+ * The submitted task may be not accomplished at all if there are too many running tasks or
+ * other limits.
+ * @param <CResult> The class to cast the result
+ * @param task The setting and data
+ * @return AsyncRequestFuture
+ * @throws java.io.InterruptedIOException
*/
- protected ExecutorService getPool(ExecutorService pool) {
- if (pool != null) {
- return pool;
+ public AsyncRequestFuture submit(AsyncProcessTask task) throws InterruptedIOException {
+ AsyncRequestFuture reqFuture = checkTask(task);
+ if (reqFuture != null) {
+ return reqFuture;
+ }
+ SubmittedRows submittedRows = task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
+ switch (submittedRows) {
+ case ALL:
+ return submitAll(task);
+ case AT_LEAST_ONE:
+ return submit(task, true);
+ default:
+ return submit(task, false);
}
- if (this.pool != null) {
- return this.pool;
- }
- throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
- }
-
- /**
- * See #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean).
- * Uses default ExecutorService for this AP (must have been created with one).
- */
- public AsyncRequestFuture submit(TableName tableName,
- final RowAccess extends Row> rows, boolean atLeastOne, Batch.Callback callback,
- boolean needResults) throws InterruptedIOException {
- return submit(null, tableName, rows, atLeastOne, callback, needResults);
- }
- /**
- * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
- * Uses the {@link ListRowAccess} to wrap the {@link List}.
- */
- public AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
- List extends Row> rows, boolean atLeastOne, Batch.Callback callback,
- boolean needResults) throws InterruptedIOException {
- return submit(pool, tableName, new ListRowAccess(rows), atLeastOne,
- callback, needResults);
}
/**
@@ -351,14 +341,13 @@ class AsyncProcess {
* @param needResults Whether results are needed, or can be discarded.
* @param rows - the submitted row. Modified by the method: we remove the rows we took.
* @param atLeastOne true if we should submit at least a subset.
+ * @param rpcTimeout timeout for each rpc request
+ * @param operationTimeout timeout for each call
*/
- public AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
- RowAccess extends Row> rows, boolean atLeastOne, Batch.Callback callback,
- boolean needResults) throws InterruptedIOException {
- if (rows.isEmpty()) {
- return NO_REQS_RESULT;
- }
-
+ private AsyncRequestFuture submit(AsyncProcessTask task,
+ boolean atLeastOne) throws InterruptedIOException {
+ TableName tableName = task.getTableName();
+ RowAccess<? extends Row> rows = task.getRowAccess();
Map actionsByServer =
new HashMap();
List retainedActions = new ArrayList(rows.size());
@@ -426,8 +415,8 @@ class AsyncProcess {
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
- return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
- locationErrors, locationErrorRows, actionsByServer, pool);
+ return submitMultiActions(task, retainedActions, nonceGroup,
+ locationErrors, locationErrorRows, actionsByServer);
}
private RowCheckerHost createRowCheckerHost() {
@@ -442,13 +431,10 @@ class AsyncProcess {
, new SubmittedSizeChecker(maxHeapSizeSubmit)
));
}
- AsyncRequestFuture submitMultiActions(TableName tableName,
- List retainedActions, long nonceGroup, Batch.Callback callback,
- Object[] results, boolean needResults, List locationErrors,
- List locationErrorRows, Map actionsByServer,
- ExecutorService pool) {
- AsyncRequestFutureImpl ars = createAsyncRequestFuture(
- tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
+ AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
+ List retainedActions, long nonceGroup, List locationErrors,
+ List locationErrorRows, Map actionsByServer) {
+ AsyncRequestFutureImpl ars = createAsyncRequestFuture(task, retainedActions, nonceGroup);
// Add location errors if any
if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) {
@@ -462,14 +448,6 @@ class AsyncProcess {
return ars;
}
- public void setRpcTimeout(int rpcTimeout) {
- this.rpcTimeout = rpcTimeout;
- }
-
- public void setOperationTimeout(int operationTimeout) {
- this.operationTimeout = operationTimeout;
- }
-
/**
* Helper that is used when grouping the actions per region server.
*
@@ -493,10 +471,6 @@ class AsyncProcess {
multiAction.add(regionName, action);
}
- public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
- List extends Row> rows, Batch.Callback callback, Object[] results) {
- return submitAll(pool, tableName, rows, callback, results, null, -1);
- }
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
* compatibility: it allows to be used with the batch interface that return an array of objects.
@@ -506,11 +480,11 @@ class AsyncProcess {
* @param rows the list of rows.
* @param callback the callback.
* @param results Optional array to return the results thru; backward compat.
- * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting.
+ * @param rpcTimeout timeout for each rpc request
+ * @param operationTimeout timeout for each call
*/
- public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
- List extends Row> rows, Batch.Callback callback, Object[] results,
- CancellableRegionServerCallable callable, int rpcTimeout) {
+ private AsyncRequestFuture submitAll(AsyncProcessTask task) {
+ RowAccess<? extends Row> rows = task.getRowAccess();
List actions = new ArrayList(rows.size());
// The position will be used by the processBatch to match the object array returned.
@@ -528,26 +502,45 @@ class AsyncProcess {
setNonce(ng, r, action);
actions.add(action);
}
- AsyncRequestFutureImpl ars = createAsyncRequestFuture(
- tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
- callable, rpcTimeout);
+ AsyncRequestFutureImpl ars = createAsyncRequestFuture(task, actions, ng.getNonceGroup());
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
+ private AsyncRequestFuture checkTask(AsyncProcessTask task) {
+ if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
+ return NO_REQS_RESULT;
+ }
+ Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
+ checkOperationTimeout(task.getOperationTimeout());
+ checkRpcTimeout(task.getRpcTimeout());
+ return null;
+ }
+
private void setNonce(NonceGenerator ng, Row r, Action action) {
if (!(r instanceof Append) && !(r instanceof Increment)) return;
action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
}
- protected AsyncRequestFutureImpl createAsyncRequestFuture(
- TableName tableName, List actions, long nonceGroup, ExecutorService pool,
- Batch.Callback callback, Object[] results, boolean needResults,
- CancellableRegionServerCallable callable, int rpcTimeout) {
- return new AsyncRequestFutureImpl(
- tableName, actions, nonceGroup, getPool(pool), needResults,
- results, callback, callable, operationTimeout,
- rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this);
+ private int checkTimeout(String name, int timeout) {
+ if (timeout < 0) {
+ throw new RuntimeException("The " + name + " must not be negative,"
+ + " current value is " + timeout);
+ }
+ return timeout;
+ }
+ private int checkOperationTimeout(int operationTimeout) {
+ return checkTimeout("operation timeout", operationTimeout);
+ }
+
+ private int checkRpcTimeout(int rpcTimeout) {
+ return checkTimeout("rpc timeout", rpcTimeout);
+ }
+
+ @VisibleForTesting
+ AsyncRequestFutureImpl createAsyncRequestFuture(
+ AsyncProcessTask task, List actions, long nonceGroup) {
+ return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
}
/** Wait until the async does not have more than max tasks in progress. */
@@ -614,7 +607,7 @@ class AsyncProcess {
* {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created.
*/
public boolean hasError() {
- return globalErrors.hasErrors();
+ return globalErrors != null && globalErrors.hasErrors();
}
/**
@@ -630,7 +623,7 @@ class AsyncProcess {
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
List failedRows, String tableName) throws InterruptedIOException {
waitForMaximumCurrentTasks(0, tableName);
- if (!globalErrors.hasErrors()) {
+ if (globalErrors == null || !globalErrors.hasErrors()) {
return null;
}
if (failedRows != null) {
@@ -644,7 +637,7 @@ class AsyncProcess {
/**
* increment the tasks counters for a given set of regions. MT safe.
*/
- protected void incTaskCounters(Collection regions, ServerName sn) {
+ void incTaskCounters(Collection regions, ServerName sn) {
tasksInProgress.incrementAndGet();
computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
@@ -657,7 +650,7 @@ class AsyncProcess {
/**
* Decrements the counters for a given region and the region server. MT Safe.
*/
- protected void decTaskCounters(Collection regions, ServerName sn) {
+ void decTaskCounters(Collection regions, ServerName sn) {
for (byte[] regBytes : regions) {
AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
regionCnt.decrementAndGet();
@@ -676,7 +669,7 @@ class AsyncProcess {
@VisibleForTesting
protected RpcRetryingCaller createCaller(
CancellableRegionServerCallable callable, int rpcTimeout) {
- return rpcCallerFactory. newCaller(rpcTimeout);
+ return rpcCallerFactory. newCaller(checkRpcTimeout(rpcTimeout));
}
@@ -687,7 +680,7 @@ class AsyncProcess {
* We may benefit from connection-wide tracking of server errors.
* @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
*/
- protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
+ ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
return new ConnectionImplementation.ServerErrorTracker(
this.serverTrackerTimeout, this.numTries);
}
@@ -953,26 +946,4 @@ class AsyncProcess {
}
}
}
-
- public static class ListRowAccess implements RowAccess {
- private final List data;
- ListRowAccess(final List data) {
- this.data = data;
- }
-
- @Override
- public int size() {
- return data.size();
- }
-
- @Override
- public boolean isEmpty() {
- return data.isEmpty();
- }
-
- @Override
- public Iterator iterator() {
- return data.iterator();
- }
- }
}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
new file mode 100644
index 0000000..eda1db2
--- /dev/null
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+
+/**
+ * Contains the attributes of a task which will be executed
+ * by {@link org.apache.hadoop.hbase.client.AsyncProcess}.
+ * The attributes will be validated by AsyncProcess.
+ * It's intended for advanced client applications.
+ * @param <T> The type of response from server-side
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AsyncProcessTask<T> {
+ /**
+ * The number of processed rows.
+ * The AsyncProcess has traffic control which may reject some rows.
+ */
+ public enum SubmittedRows {
+ ALL,
+ AT_LEAST_ONE,
+ NORMAL
+ }
+ public static Builder newBuilder(final Batch.Callback callback) {
+ return new Builder<>(callback);
+ }
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private ExecutorService pool;
+ private TableName tableName;
+ private RowAccess<? extends Row> rows;
+ private SubmittedRows submittedRows = SubmittedRows.ALL;
+ private Batch.Callback callback;
+ private boolean needResults;
+ private int rpcTimeout;
+ private int operationTimeout;
+ private CancellableRegionServerCallable callable;
+ private Object[] results;
+
+ private Builder() {
+ }
+
+ private Builder(Batch.Callback callback) {
+ this.callback = callback;
+ }
+
+ Builder setResults(Object[] results) {
+ this.results = results;
+ if (results != null && results.length != 0) {
+ setNeedResults(true);
+ }
+ return this;
+ }
+
+ public Builder setPool(ExecutorService pool) {
+ this.pool = pool;
+ return this;
+ }
+
+ public Builder setRpcTimeout(int rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public Builder setOperationTimeout(int operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ return this;
+ }
+
+ public Builder setTableName(TableName tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public Builder setRowAccess(List<? extends Row> rows) {
+ this.rows = new ListRowAccess<>(rows);
+ return this;
+ }
+
+ public Builder setRowAccess(RowAccess<? extends Row> rows) {
+ this.rows = rows;
+ return this;
+ }
+
+ public Builder setSubmittedRows(SubmittedRows submittedRows) {
+ this.submittedRows = submittedRows;
+ return this;
+ }
+
+ public Builder setNeedResults(boolean needResults) {
+ this.needResults = needResults;
+ return this;
+ }
+
+ Builder setCallable(CancellableRegionServerCallable callable) {
+ this.callable = callable;
+ return this;
+ }
+
+ public AsyncProcessTask build() {
+ return new AsyncProcessTask<>(pool, tableName, rows, submittedRows,
+ callback, callable, needResults, rpcTimeout, operationTimeout, results);
+ }
+ }
+ private final ExecutorService pool;
+ private final TableName tableName;
+ private final RowAccess<? extends Row> rows;
+ private final SubmittedRows submittedRows;
+ private final Batch.Callback callback;
+ private final CancellableRegionServerCallable callable;
+ private final boolean needResults;
+ private final int rpcTimeout;
+ private final int operationTimeout;
+ private final Object[] results;
+ AsyncProcessTask(AsyncProcessTask task) {
+ this(task.getPool(), task.getTableName(), task.getRowAccess(),
+ task.getSubmittedRows(), task.getCallback(), task.getCallable(),
+ task.getNeedResults(), task.getRpcTimeout(), task.getOperationTimeout(),
+ task.getResults());
+ }
+ AsyncProcessTask(ExecutorService pool, TableName tableName,
+ RowAccess<? extends Row> rows, SubmittedRows size, Batch.Callback callback,
+ CancellableRegionServerCallable callable, boolean needResults,
+ int rpcTimeout, int operationTimeout, Object[] results) {
+ this.pool = pool;
+ this.tableName = tableName;
+ this.rows = rows;
+ this.submittedRows = size;
+ this.callback = callback;
+ this.callable = callable;
+ this.needResults = needResults;
+ this.rpcTimeout = rpcTimeout;
+ this.operationTimeout = operationTimeout;
+ this.results = results;
+ }
+
+ public int getOperationTimeout() {
+ return operationTimeout;
+ }
+
+ public ExecutorService getPool() {
+ return pool;
+ }
+
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ public RowAccess<? extends Row> getRowAccess() {
+ return rows;
+ }
+
+ public SubmittedRows getSubmittedRows() {
+ return submittedRows;
+ }
+
+ public Batch.Callback getCallback() {
+ return callback;
+ }
+
+ CancellableRegionServerCallable getCallable() {
+ return callable;
+ }
+
+ Object[] getResults() {
+ return results;
+ }
+
+ public boolean getNeedResults() {
+ return needResults;
+ }
+
+ public int getRpcTimeout() {
+ return rpcTimeout;
+ }
+
+ static class ListRowAccess implements RowAccess {
+
+ private final List data;
+
+ ListRowAccess(final List data) {
+ this.data = data;
+ }
+
+ @Override
+ public int size() {
+ return data.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return data.isEmpty();
+ }
+
+ @Override
+ public Iterator iterator() {
+ return data.iterator();
+ }
+ }
+}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index d176ce1..98e04d9 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -300,11 +300,11 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
private final int[] replicaGetIndices;
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
- private CancellableRegionServerCallable currentCallable;
- private int operationTimeout;
- private int rpcTimeout;
+ private final CancellableRegionServerCallable currentCallable;
+ private final int operationTimeout;
+ private final int rpcTimeout;
private final Map> heapSizesByServer = new HashMap<>();
- protected AsyncProcess asyncProcess;
+ private final AsyncProcess asyncProcess;
/**
* For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
@@ -339,32 +339,27 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
}
}
-
-
- public AsyncRequestFutureImpl(TableName tableName, List actions, long nonceGroup,
- ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback,
- CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
- AsyncProcess asyncProcess) {
- this.pool = pool;
- this.callback = callback;
+ public AsyncRequestFutureImpl(AsyncProcessTask task, List actions,
+ long nonceGroup, AsyncProcess asyncProcess) {
+ this.pool = task.getPool();
+ this.callback = task.getCallback();
this.nonceGroup = nonceGroup;
- this.tableName = tableName;
+ this.tableName = task.getTableName();
this.actionsInProgress.set(actions.size());
- if (results != null) {
- assert needResults;
- if (results.length != actions.size()) {
+ if (task.getResults() == null) {
+ results = task.getNeedResults() ? new Object[actions.size()] : null;
+ } else {
+ if (task.getResults().length != actions.size()) {
throw new AssertionError("results.length");
}
- this.results = results;
+ this.results = task.getResults();
for (int i = 0; i != this.results.length; ++i) {
results[i] = null;
}
- } else {
- this.results = needResults ? new Object[actions.size()] : null;
}
List replicaGetIndices = null;
boolean hasAnyReplicaGets = false;
- if (needResults) {
+ if (results != null) {
// Check to see if any requests might require replica calls.
// We expect that many requests will consist of all or no multi-replica gets; in such
// cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
@@ -414,10 +409,10 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
this.errorsByServer = createServerErrorTracker();
this.errors = (asyncProcess.globalErrors != null)
? asyncProcess.globalErrors : new BatchErrors();
- this.operationTimeout = operationTimeout;
- this.rpcTimeout = rpcTimeout;
- this.currentCallable = callable;
- if (callable == null) {
+ this.operationTimeout = task.getOperationTimeout();
+ this.rpcTimeout = task.getRpcTimeout();
+ this.currentCallable = task.getCallable();
+ if (task.getCallable() == null) {
tracker = new RetryingTimeTracker().start();
}
}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index f7eb09d..6735bf4 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
@@ -57,63 +56,63 @@ import java.util.concurrent.atomic.AtomicLong;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BufferedMutatorImpl implements BufferedMutator {
-
private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
-
private final ExceptionListener listener;
-
- protected ClusterConnection connection; // non-final so can be overridden in test
private final TableName tableName;
- private volatile Configuration conf;
-
- @VisibleForTesting
- final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue();
- @VisibleForTesting
- AtomicLong currentWriteBufferSize = new AtomicLong(0);
-
+ private final Configuration conf;
+ private final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue<>();
+ private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
/**
* Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
* The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
*/
- @VisibleForTesting
- AtomicInteger undealtMutationCount = new AtomicInteger(0);
- private long writeBufferSize;
+ private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
+ private volatile long maxWriteBufferSize;
+ private final AtomicInteger rpcTimeout;
+ private final AtomicInteger operationTimeout;
private final int maxKeyValueSize;
- private boolean closed = false;
+ private final boolean cleanupPoolOnClose;
+ private volatile boolean closed = false;
private final ExecutorService pool;
- private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
- private int operationTimeout;
-
- @VisibleForTesting
- protected AsyncProcess ap; // non-final so can be overridden in test
+ private final AsyncProcess ap;
- BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
- RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+ BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params) {
if (conn == null || conn.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}
-
this.tableName = params.getTableName();
- this.connection = conn;
- this.conf = connection.getConfiguration();
- this.pool = params.getPool();
+ this.conf = conn.getConfiguration();
+ if (params.getPool() == null) {
+ this.pool = HTable.getDefaultExecutor(conf);
+ cleanupPoolOnClose = true;
+ } else {
+ this.pool = params.getPool();
+ cleanupPoolOnClose = false;
+ }
this.listener = params.getListener();
-
- ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
- this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
- params.getWriteBufferSize() : tableConf.getWriteBufferSize();
+ this.maxWriteBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
+ params.getWriteBufferSize() : conn.getConnectionConfiguration().getWriteBufferSize();
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
- params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
-
- this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.operationTimeout = conn.getConfiguration().getInt(
- HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- // puts need to track errors globally due to how the APIs currently work.
- ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory,
- writeRpcTimeout, operationTimeout);
+ params.getMaxKeyValueSize() : conn.getConnectionConfiguration().getMaxKeyValueSize();
+ this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ?
+ params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
+ this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != BufferedMutatorParams.UNSET ?
+ params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
+ if (params.getAsyncProcess() == null) {
+ this.ap = conn.createAsyncProcess();
+ } else {
+ this.ap = params.getAsyncProcess();
+ }
+ }
+
+ @VisibleForTesting
+ ExecutorService getPool() {
+ return pool;
+ }
+
+ @VisibleForTesting
+ AsyncProcess getAsyncProcess() {
+ return ap;
}
@Override
@@ -166,7 +165,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
// Now try and queue what needs to be queued.
while (undealtMutationCount.get() != 0
- && currentWriteBufferSize.get() > writeBufferSize) {
+ && currentWriteBufferSize.get() > maxWriteBufferSize) {
backgroundFlushCommits(false);
}
}
@@ -185,22 +184,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
- this.pool.shutdown();
- boolean terminated;
- int loopCnt = 0;
- do {
- // wait until the pool has terminated
- terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
- loopCnt += 1;
- if (loopCnt >= 10) {
- LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
- break;
- }
- } while (!terminated);
-
+ if (cleanupPoolOnClose) {
+ this.pool.shutdown();
+ boolean terminated;
+ int loopCnt = 0;
+ do {
+ // wait until the pool has terminated
+ terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+ loopCnt += 1;
+ if (loopCnt >= 10) {
+ LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
+ break;
+ }
+ } while (!terminated);
+ }
} catch (InterruptedException e) {
LOG.warn("waitForTermination interrupted");
-
} finally {
this.closed = true;
}
@@ -231,8 +230,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (!synchronous) {
QueueRowAccess taker = new QueueRowAccess();
+ AsyncProcessTask task = wrapAsyncProcessTask(taker);
try {
- ap.submit(tableName, taker, true, null, false);
+ ap.submit(task);
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -"
+ " waiting for all operation in progress to finish (successfully or not)");
@@ -243,9 +243,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
if (synchronous || ap.hasError()) {
QueueRowAccess taker = new QueueRowAccess();
+ AsyncProcessTask task = wrapAsyncProcessTask(taker);
try {
while (!taker.isEmpty()) {
- ap.submit(tableName, taker, true, null, false);
+ ap.submit(task);
taker.reset();
}
} finally {
@@ -265,14 +266,44 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
/**
+ * Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
+ * @param taker access the inner buffer.
+ * @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
+ */
+ private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) {
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(taker)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+ .build();
+ return new AsyncProcessTask(task) {
+ @Override
+ public int getRpcTimeout() {
+ return rpcTimeout.get();
+ }
+
+ @Override
+ public int getOperationTimeout() {
+ return operationTimeout.get();
+ }
+ };
+ }
+ /**
* This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
* not be called for production uses.
+ * If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()}
+ * will be called.
+ * @param writeBufferSize The max size of internal buffer where data is stored.
+ * @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException
+ * if an I/O error occurs and there are too many retries.
+ * @throws java.io.InterruptedIOException if the I/O task is interrupted.
* @deprecated Going away when we drop public support for {@link HTable}.
*/
@Deprecated
public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
InterruptedIOException {
- this.writeBufferSize = writeBufferSize;
+ this.maxWriteBufferSize = writeBufferSize;
if (currentWriteBufferSize.get() > writeBufferSize) {
flush();
}
@@ -283,19 +314,27 @@ public class BufferedMutatorImpl implements BufferedMutator {
*/
@Override
public long getWriteBufferSize() {
- return this.writeBufferSize;
+ return this.maxWriteBufferSize;
}
@Override
- public void setRpcTimeout(int timeout) {
- this.writeRpcTimeout = timeout;
- ap.setRpcTimeout(timeout);
+ public void setRpcTimeout(int rpcTimeout) {
+ this.rpcTimeout.set(rpcTimeout);
}
@Override
- public void setOperationTimeout(int timeout) {
- this.operationTimeout = timeout;
- ap.setOperationTimeout(operationTimeout);
+ public void setOperationTimeout(int operationTimeout) {
+ this.operationTimeout.set(operationTimeout);
+ }
+
+ @VisibleForTesting
+ long getCurrentWriteBufferSize() {
+ return currentWriteBufferSize.get();
+ }
+
+ @VisibleForTesting
+ int size() {
+ return undealtMutationCount.get();
}
private class QueueRowAccess implements RowAccess<Row> {
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index d4cdead..dd46f8f 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -37,14 +37,13 @@ public class BufferedMutatorParams {
private final TableName tableName;
private long writeBufferSize = UNSET;
private int maxKeyValueSize = UNSET;
+ private int rpcTimeout = UNSET;
+ private int operationTimeout = UNSET;
private ExecutorService pool = null;
- private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
- @Override
- public void onException(RetriesExhaustedWithDetailsException exception,
- BufferedMutator bufferedMutator)
- throws RetriesExhaustedWithDetailsException {
- throw exception;
- }
+ private AsyncProcess ap = null;
+ private BufferedMutator.ExceptionListener listener
+ = (RetriesExhaustedWithDetailsException exception, BufferedMutator bufferedMutator) -> {
+ throw exception;
};
public BufferedMutatorParams(TableName tableName) {
@@ -59,6 +58,33 @@ public class BufferedMutatorParams {
return writeBufferSize;
}
+ public BufferedMutatorParams asyncProcess(final AsyncProcess ap) {
+ this.ap = ap;
+ return this;
+ }
+
+ public AsyncProcess getAsyncProcess() {
+ return ap;
+ }
+
+ public BufferedMutatorParams rpcTimeout(final int rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public int getRpcTimeout() {
+ return rpcTimeout;
+ }
+
+ public BufferedMutatorParams opertationTimeout(final int operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ return this;
+ }
+
+ public int getOperationTimeout() {
+ return operationTimeout;
+ }
+
/**
* Override the write buffer size specified by the provided {@link Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index b979c6a..b58d0e2 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -136,6 +136,15 @@ public interface Connection extends Abortable, Closeable {
BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException;
/**
+ * Retrieve an {@link AsyncProcess} for submitting the requests from client-side to server directly.
+ * The {@link AsyncProcess} returned by this method is thread-safe.
+ * This object can be used for long-lived table operations.
+ *
+ * @return an {@link AsyncProcess}.
+ */
+ AsyncProcess createAsyncProcess();
+
+ /**
* Retrieve a RegionLocator implementation to inspect region information on a table. The returned
* RegionLocator is not thread-safe, so a new instance should be created for each using thread.
*
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 35bebae..41f5baf 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -42,7 +42,8 @@ public class ConnectionConfiguration {
private final int replicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
-
+ private final int readRpcTimeout;
+ private final int writeRpcTimeout;
// toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch;
@@ -80,6 +81,12 @@ public class ConnectionConfiguration {
Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH);
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+
+ this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
+ this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
}
/**
@@ -99,6 +106,16 @@ public class ConnectionConfiguration {
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+ this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ }
+
+ public int getReadRpcTimeout() {
+ return readRpcTimeout;
+ }
+
+ public int getWriteRpcTimeout() {
+ return writeRpcTimeout;
}
public long getWriteBufferSize() {
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index e75d9a5..10191a5 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -228,7 +228,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
- this.asyncProcess = createAsyncProcess(this.conf);
+ this.asyncProcess = createAsyncProcess(false);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = new MetricsConnection(this);
} else {
@@ -306,16 +306,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (params.getTableName() == null) {
throw new IllegalArgumentException("TableName cannot be null.");
}
- if (params.getPool() == null) {
- params.pool(HTable.getDefaultExecutor(getConfiguration()));
- }
if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
params.writeBufferSize(connectionConfig.getWriteBufferSize());
}
if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
}
- return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+ return new BufferedMutatorImpl(this, params);
}
@Override
@@ -1751,15 +1748,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache(regionInfo);
}
- // For tests to override.
- protected AsyncProcess createAsyncProcess(Configuration conf) {
- // No default pool available.
- int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
- rpcTimeout, operationTimeout);
+ @Override
+ public AsyncProcess createAsyncProcess() {
+ return createAsyncProcess(true);
+ }
+
+ private AsyncProcess createAsyncProcess(boolean useGlobalErrors) {
+ return new AsyncProcess(this, conf, rpcCallerFactory, useGlobalErrors, rpcControllerFactory);
}
@Override
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index dd11abf..f79a55e 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -103,27 +103,28 @@ import org.apache.hadoop.hbase.util.Threads;
@InterfaceStability.Stable
public class HTable implements Table {
private static final Log LOG = LogFactory.getLog(HTable.class);
- protected ClusterConnection connection;
+ private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
+ private final ClusterConnection connection;
private final TableName tableName;
- private volatile Configuration configuration;
- private ConnectionConfiguration connConfiguration;
- protected BufferedMutatorImpl mutator;
+ private final Configuration configuration;
+ private final ConnectionConfiguration connConfiguration;
+ @VisibleForTesting
+ BufferedMutatorImpl mutator;
private boolean closed = false;
- protected int scannerCaching;
- protected long scannerMaxResultSize;
- private ExecutorService pool; // For Multi & Scan
+ private final int scannerCaching;
+ private final long scannerMaxResultSize;
+ private final ExecutorService pool; // For Multi & Scan
private int operationTimeout; // global timeout for each blocking method with retrying rpc
private int readRpcTimeout; // timeout for each read rpc request
private int writeRpcTimeout; // timeout for each write rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
- private final boolean cleanupConnectionOnClose; // close the connection in close()
- private Consistency defaultConsistency = Consistency.STRONG;
- private HRegionLocator locator;
+ private final HRegionLocator locator;
/** The Async process for batch */
- protected AsyncProcess multiAp;
- private RpcRetryingCallerFactory rpcCallerFactory;
- private RpcControllerFactory rpcControllerFactory;
+ @VisibleForTesting
+ AsyncProcess multiAp;
+ private final RpcRetryingCallerFactory rpcCallerFactory;
+ private final RpcControllerFactory rpcControllerFactory;
// Marked Private @since 1.0
@InterfaceAudience.Private
@@ -167,22 +168,42 @@ public class HTable implements Table {
throw new IllegalArgumentException("Given table name is null");
}
this.tableName = tableName;
- this.cleanupConnectionOnClose = false;
this.connection = connection;
this.configuration = connection.getConfiguration();
- this.connConfiguration = tableConfig;
- this.pool = pool;
+ if (tableConfig == null) {
+ connConfiguration = new ConnectionConfiguration(configuration);
+ } else {
+ connConfiguration = tableConfig;
+ }
if (pool == null) {
this.pool = getDefaultExecutor(this.configuration);
this.cleanupPoolOnClose = true;
} else {
+ this.pool = pool;
this.cleanupPoolOnClose = false;
}
+ if (rpcCallerFactory == null) {
+ this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
+ } else {
+ this.rpcCallerFactory = rpcCallerFactory;
+ }
- this.rpcCallerFactory = rpcCallerFactory;
- this.rpcControllerFactory = rpcControllerFactory;
+ if (rpcControllerFactory == null) {
+ this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
+ } else {
+ this.rpcControllerFactory = rpcControllerFactory;
+ }
+
+ this.operationTimeout = tableName.isSystemTable() ?
+ connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
+ this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+ this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
+ this.scannerCaching = connConfiguration.getScannerCaching();
+ this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
- this.finishSetup();
+ // puts need to track errors globally due to how the APIs currently work.
+ multiAp = this.connection.getAsyncProcess();
+ this.locator = new HRegionLocator(tableName, connection);
}
/**
@@ -190,20 +211,23 @@ public class HTable implements Table {
* @throws IOException
*/
@VisibleForTesting
- protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
+ protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException {
connection = conn;
- tableName = params.getTableName();
- connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
+ this.tableName = mutator.getName();
+ this.configuration = connection.getConfiguration();
+ connConfiguration = new ConnectionConfiguration(configuration);
cleanupPoolOnClose = false;
- cleanupConnectionOnClose = false;
- // used from tests, don't trust the connection is real
- this.mutator = new BufferedMutatorImpl(conn, null, null, params);
- this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
- conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ this.mutator = mutator;
+ this.operationTimeout = tableName.isSystemTable() ?
+ connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
+ this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+ this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
+ this.scannerCaching = connConfiguration.getScannerCaching();
+ this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
+ this.rpcControllerFactory = null;
+ this.rpcCallerFactory = null;
+ this.pool = mutator.getPool();
+ this.locator = null;
}
/**
@@ -214,36 +238,6 @@ public class HTable implements Table {
}
/**
- * setup this HTable's parameter based on the passed configuration
- */
- private void finishSetup() throws IOException {
- if (connConfiguration == null) {
- connConfiguration = new ConnectionConfiguration(configuration);
- }
-
- this.operationTimeout = tableName.isSystemTable() ?
- connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
- this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
- configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.scannerCaching = connConfiguration.getScannerCaching();
- this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
- if (this.rpcCallerFactory == null) {
- this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
- }
- if (this.rpcControllerFactory == null) {
- this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
- }
-
- // puts need to track errors globally due to how the APIs currently work.
- multiAp = this.connection.getAsyncProcess();
- this.locator = new HRegionLocator(getName(), connection);
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -423,7 +417,7 @@ public class HTable implements Table {
get = ReflectionUtils.newInstance(get.getClass(), get);
get.setCheckExistenceOnly(checkExistenceOnly);
if (get.getConsistency() == null){
- get.setConsistency(defaultConsistency);
+ get.setConsistency(DEFAULT_CONSISTENCY);
}
}
@@ -483,13 +477,37 @@ public class HTable implements Table {
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
- batch(actions, results, -1);
+ int rpcTimeout = writeRpcTimeout;
+ boolean hasRead = false;
+ boolean hasWrite = false;
+ for (Row action : actions) {
+ if (action instanceof Mutation) {
+ hasWrite = true;
+ } else {
+ hasRead = true;
+ }
+ if (hasRead && hasWrite) {
+ break;
+ }
+ }
+ if (hasRead && !hasWrite) {
+ rpcTimeout = readRpcTimeout;
+ }
+ batch(actions, results, rpcTimeout);
}
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
throws InterruptedException, IOException {
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
- rpcTimeout);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(actions)
+ .setResults(results)
+ .setRpcTimeout(rpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -509,8 +527,20 @@ public class HTable implements Table {
public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
- pool, tableName, actions, callback, results);
+ int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
+ int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(actions)
+ .setResults(results)
+ .setOperationTimeout(operationTimeout)
+ .setRpcTimeout(writeTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -536,8 +566,16 @@ public class HTable implements Table {
}
};
List<Delete> rows = Collections.singletonList(delete);
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
- null, null, callable, writeRpcTimeout);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rows)
+ .setCallable(callable)
+ .setRpcTimeout(writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -615,8 +653,16 @@ public class HTable implements Table {
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
}
};
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
- null, null, callable, writeRpcTimeout);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rm.getMutations())
+ .setCallable(callable)
+ .setRpcTimeout(writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -795,8 +841,18 @@ public class HTable implements Table {
};
List<Delete> rows = Collections.singletonList(delete);
Object[] results = new Object[1];
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
- null, results, callable, -1);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rows)
+ .setCallable(callable)
+ // TODO any better timeout?
+ .setRpcTimeout(readRpcTimeout + writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .setResults(results)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -839,8 +895,18 @@ public class HTable implements Table {
* It is excessive to send such a large array, but that is required by the framework right now
* */
Object[] results = new Object[rm.getMutations().size()];
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
- null, results, callable, -1);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rm.getMutations())
+ .setResults(results)
+ .setCallable(callable)
+ // TODO any better timeout?
+ .setRpcTimeout(readRpcTimeout + writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -925,26 +991,27 @@ public class HTable implements Table {
if (this.closed) {
return;
}
- flushCommits();
- if (cleanupPoolOnClose) {
- this.pool.shutdown();
- try {
- boolean terminated = false;
- do {
- // wait until the pool has terminated
- terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
- } while (!terminated);
- } catch (InterruptedException e) {
- this.pool.shutdownNow();
- LOG.warn("waitForTermination interrupted");
+ try {
+ flushCommits();
+ if (mutator != null) {
+ mutator.close();
}
- }
- if (cleanupConnectionOnClose) {
- if (this.connection != null) {
- this.connection.close();
+ if (cleanupPoolOnClose) {
+ this.pool.shutdown();
+ try {
+ boolean terminated = false;
+ do {
+ // wait until the pool has terminated
+ terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+ } while (!terminated);
+ } catch (InterruptedException e) {
+ this.pool.shutdownNow();
+ LOG.warn("waitForTermination interrupted");
+ }
}
+ } finally {
+ this.closed = true;
}
- this.closed = true;
}
// validate for well-formedness
@@ -1102,7 +1169,6 @@ public class HTable implements Table {
if (mutator != null) {
mutator.setOperationTimeout(operationTimeout);
}
- multiAp.setOperationTimeout(operationTimeout);
}
@Override
@@ -1134,7 +1200,6 @@ public class HTable implements Table {
if (mutator != null) {
mutator.setRpcTimeout(writeRpcTimeout);
}
- multiAp.setRpcTimeout(writeRpcTimeout);
}
@Override
@@ -1217,37 +1282,41 @@ public class HTable implements Table {
Object[] results = new Object[execs.size()];
AsyncProcess asyncProcess =
- new AsyncProcess(connection, configuration, pool,
+ new AsyncProcess(connection, configuration,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
- true, RpcControllerFactory.instantiate(configuration), readRpcTimeout,
- operationTimeout);
-
- AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
- new Callback<ClientProtos.CoprocessorServiceResult>() {
- @Override
- public void update(byte[] region, byte[] row,
- ClientProtos.CoprocessorServiceResult serviceResult) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
- ": region=" + Bytes.toStringBinary(region) +
- ", row=" + Bytes.toStringBinary(row) +
- ", value=" + serviceResult.getValue().getValue());
- }
- try {
- Message.Builder builder = responsePrototype.newBuilderForType();
- org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
- serviceResult.getValue().getValue().toByteArray());
- callback.update(region, row, (R) builder.build());
- } catch (IOException e) {
- LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
- e);
- callbackErrorExceptions.add(e);
- callbackErrorActions.add(execsByRow.get(row));
- callbackErrorServers.add("null");
- }
- }
- }, results);
-
+ true, RpcControllerFactory.instantiate(configuration));
+
+ Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
+ = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
+ ": region=" + Bytes.toStringBinary(region) +
+ ", row=" + Bytes.toStringBinary(row) +
+ ", value=" + serviceResult.getValue().getValue());
+ }
+ try {
+ Message.Builder builder = responsePrototype.newBuilderForType();
+ org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
+ serviceResult.getValue().getValue().toByteArray());
+ callback.update(region, row, (R) builder.build());
+ } catch (IOException e) {
+ LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
+ e);
+ callbackErrorExceptions.add(e);
+ callbackErrorActions.add(execsByRow.get(row));
+ callbackErrorServers.add("null");
+ }
+ };
+ AsyncProcessTask task = AsyncProcessTask.newBuilder(resultsCallback)
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(execs)
+ .setResults(results)
+ .setRpcTimeout(readRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture future = asyncProcess.submit(task);
future.waitUntilDone();
if (future.hasError()) {
@@ -1270,10 +1339,10 @@ public class HTable implements Table {
.pool(pool)
.writeBufferSize(connConfiguration.getWriteBufferSize())
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
+ .opertationTimeout(operationTimeout)
+ .rpcTimeout(writeRpcTimeout)
);
}
- mutator.setRpcTimeout(writeRpcTimeout);
- mutator.setOperationTimeout(operationTimeout);
return mutator;
}
}
\ No newline at end of file
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 8ff64bf..c03b969 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -443,7 +443,7 @@ public class HTableMultiplexer {
private final AtomicInteger retryInQueue = new AtomicInteger(0);
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
private final int operationTimeout;
-
+ private final ExecutorService pool;
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
ExecutorService pool, ScheduledExecutorService executor) {
@@ -457,10 +457,10 @@ public class HTableMultiplexer {
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
- writeRpcTimeout, operationTimeout);
+ this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+ this.pool = pool;
}
protected LinkedBlockingQueue<PutStatus> getQueue() {
@@ -594,9 +594,14 @@ public class HTableMultiplexer {
Map<ServerName, MultiAction<Row>> actionsByServer =
Collections.singletonMap(server, actions);
try {
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setResults(results)
+ .setPool(pool)
+ .setRpcTimeout(writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .build();
AsyncRequestFuture arf =
- ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
- null, actionsByServer, null);
+ ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
arf.waitUntilDone();
if (arf.hasError()) {
// We just log and ignore the exception here since failed Puts will be resubmit again.
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
index 788f1a4..85fd590 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-@VisibleForTesting
-interface RowAccess<T> extends Iterable<T> {
+public interface RowAccess<T> extends Iterable<T> {
/**
* @return true if there are no elements.
*/
diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index bb6cbb5..cfd9692 100644
--- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -36,9 +36,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -59,15 +59,12 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost;
import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -80,8 +77,11 @@ import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker;
import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -92,46 +92,50 @@ import static org.junit.Assert.fail;
public class TestAsyncProcess {
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
withLookingForStuckThread(true).build();
- private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class);
+ private static final Log LOG = LogFactory.getLog(TestAsyncProcess.class);
private static final TableName DUMMY_TABLE =
TableName.valueOf("DUMMY_TABLE");
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
private static final byte[] FAILS = "FAILS".getBytes();
- private static final Configuration conf = new Configuration();
-
- private static ServerName sn = ServerName.valueOf("s1:1,1");
- private static ServerName sn2 = ServerName.valueOf("s2:2,2");
- private static ServerName sn3 = ServerName.valueOf("s3:3,3");
- private static HRegionInfo hri1 =
+ private static final Configuration CONF = new Configuration();
+ private static final ConnectionConfiguration CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
+ private static final ServerName sn = ServerName.valueOf("s1:1,1");
+ private static final ServerName sn2 = ServerName.valueOf("s2:2,2");
+ private static final ServerName sn3 = ServerName.valueOf("s3:3,3");
+ private static final HRegionInfo hri1 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
- private static HRegionInfo hri2 =
+ private static final HRegionInfo hri2 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
- private static HRegionInfo hri3 =
+ private static final HRegionInfo hri3 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
- private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
- private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
- private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
+ private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
+ private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
+ private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
// Replica stuff
- private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
+ private static final HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
- private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
- private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
+ private static final HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
+ private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
- private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
+ private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
new HRegionLocation(hri2r1, sn3));
- private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
+ private static final RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
private static final String success = "success";
private static Exception failure = new Exception("failure");
- private static int NB_RETRIES = 3;
+ private static final int NB_RETRIES = 3;
+ private static final int RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ private static final int OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@BeforeClass
public static void beforeClass(){
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
+ CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
}
static class CountingThreadFactory implements ThreadFactory {
@@ -153,20 +157,21 @@ public class TestAsyncProcess {
final AtomicInteger nbActions = new AtomicInteger();
public List allReqs = new ArrayList();
public AtomicInteger callsCt = new AtomicInteger();
- private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+
private long previousTimeout = -1;
+ final ExecutorService service;
@Override
- protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName,
- List actions, long nonceGroup, ExecutorService pool,
- Batch.Callback callback, Object[] results, boolean needResults,
- CancellableRegionServerCallable callable, int curTimeout) {
+ protected AsyncRequestFutureImpl createAsyncRequestFuture(
+ AsyncProcessTask task, List actions, long nonceGroup) {
// Test HTable has tableName of null, so pass DUMMY_TABLE
+ AsyncProcessTask wrap = new AsyncProcessTask(task){
+ @Override
+ public TableName getTableName() {
+ return DUMMY_TABLE;
+ }
+ };
AsyncRequestFutureImpl r = new MyAsyncRequestFutureImpl(
- DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults,
- results, callback, callable, operationTimeout, rpcTimeout, this);
+ wrap, actions, nonceGroup, this);
allReqs.add(r);
return r;
}
@@ -176,49 +181,54 @@ public class TestAsyncProcess {
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
- super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
- new SynchronousQueue(), new CountingThreadFactory(nbThreads)),
- new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout,
- operationTimeout);
+ super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+ service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
- super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
- new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())),
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
- rpcTimeout, operationTimeout);
+ super(hc, conf,
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+ service = Executors.newFixedThreadPool(5);
}
- public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
- @SuppressWarnings("unused") boolean dummy) {
- super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
- new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) {
- @Override
- public void execute(Runnable command) {
- throw new RejectedExecutionException("test under failure");
- }
- },
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
- rpcTimeout, operationTimeout);
+ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
+ List<? extends Row> rows, boolean atLeastOne, Batch.Callback callback,
+ boolean needResults) throws InterruptedIOException {
+ AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
+ .setPool(pool == null ? service : pool)
+ .setTableName(tableName)
+ .setRowAccess(rows)
+ .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
+ .setNeedResults(needResults)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .build();
+ return submit(task);
+ }
+
+ public AsyncRequestFuture submit(TableName tableName,
+ final List<? extends Row> rows, boolean atLeastOne, Batch.Callback callback,
+ boolean needResults) throws InterruptedIOException {
+ return submit(null, tableName, rows, atLeastOne, callback, needResults);
}
@Override
- public AsyncRequestFuture submit(TableName tableName, RowAccess<? extends Row> rows,
- boolean atLeastOne, Callback callback, boolean needResults)
+ public AsyncRequestFuture submit(AsyncProcessTask task)
throws InterruptedIOException {
+ previousTimeout = task.getRpcTimeout();
// We use results in tests to check things, so override to always save them.
- return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
+ AsyncProcessTask wrap = new AsyncProcessTask(task) {
+ @Override
+ public boolean getNeedResults() {
+ return true;
+ }
+ };
+ return super.submit(wrap);
}
@Override
- public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
- List<? extends Row> rows, Batch.Callback callback, Object[] results,
- CancellableRegionServerCallable callable, int curTimeout) {
- previousTimeout = curTimeout;
- return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
- }
- @Override
protected RpcRetryingCaller createCaller(
CancellableRegionServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
@@ -260,12 +270,9 @@ public class TestAsyncProcess {
static class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl {
- public MyAsyncRequestFutureImpl(TableName tableName, List actions, long nonceGroup,
- ExecutorService pool, boolean needResults, Object[] results,
- Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout,
- int rpcTimeout, AsyncProcess asyncProcess) {
- super(tableName, actions, nonceGroup, pool, needResults,
- results, callback, callable, operationTimeout, rpcTimeout, asyncProcess);
+ public MyAsyncRequestFutureImpl(AsyncProcessTask task, List actions,
+ long nonceGroup, AsyncProcess asyncProcess) {
+ super(task, actions, nonceGroup, asyncProcess);
}
@Override
@@ -483,7 +490,7 @@ public class TestAsyncProcess {
final boolean usedRegions[];
protected MyConnectionImpl2(List hrl) throws IOException {
- super(conf);
+ super(CONF);
this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()];
}
@@ -560,7 +567,7 @@ public class TestAsyncProcess {
AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
try {
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
fail("The maxHeapSizePerRequest must be bigger than zero");
} catch (IllegalArgumentException e) {
}
@@ -604,7 +611,6 @@ public class TestAsyncProcess {
final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
- BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
// sn has two regions
long putSizeSN = 0;
@@ -630,11 +636,12 @@ public class TestAsyncProcess {
+ ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
+ ", minCountSnRequest:" + minCountSnRequest
+ ", minCountSn2Request:" + minCountSn2Request);
- try (HTable ht = new HTable(conn, bufferParam)) {
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
- ht.mutator.ap = ap;
- Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
+ try (HTable ht = new HTable(conn, mutator)) {
+ Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
ht.put(puts);
List reqs = ap.allReqs;
@@ -685,7 +692,7 @@ public class TestAsyncProcess {
@Test
public void testSubmit() throws Exception {
ClusterConnection hc = createHConnection();
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
List puts = new ArrayList();
puts.add(createPut(1, true));
@@ -704,7 +711,7 @@ public class TestAsyncProcess {
updateCalled.incrementAndGet();
}
};
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
List puts = new ArrayList();
puts.add(createPut(1, true));
@@ -718,7 +725,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitBusyRegion() throws Exception {
ClusterConnection hc = createHConnection();
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
List puts = new ArrayList();
puts.add(createPut(1, true));
@@ -738,7 +745,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitBusyRegionServer() throws Exception {
ClusterConnection hc = createHConnection();
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
@@ -758,7 +765,7 @@ public class TestAsyncProcess {
@Test
public void testFail() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
List puts = new ArrayList();
Put p = createPut(1, false);
@@ -784,7 +791,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitTrue() throws IOException {
- final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ final MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
ap.tasksInProgress.incrementAndGet();
final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion);
ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
@@ -823,7 +830,7 @@ public class TestAsyncProcess {
@Test
public void testFailAndSuccess() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
List puts = new ArrayList();
puts.add(createPut(1, false));
@@ -850,7 +857,7 @@ public class TestAsyncProcess {
@Test
public void testFlush() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
List puts = new ArrayList();
puts.add(createPut(1, false));
@@ -868,13 +875,13 @@ public class TestAsyncProcess {
@Test
public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
ClusterConnection hc = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false);
testTaskCount(ap);
}
@Test
public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
- Configuration copyConf = new Configuration(conf);
+ Configuration copyConf = new Configuration(CONF);
copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
ClusterConnection hc = createHConnection();
@@ -885,7 +892,7 @@ public class TestAsyncProcess {
testTaskCount(ap);
}
- private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException {
+ private void testTaskCount(MyAsyncProcess ap) throws InterruptedIOException, InterruptedException {
List puts = new ArrayList<>();
for (int i = 0; i != 3; ++i) {
puts.add(createPut(1, true));
@@ -907,7 +914,7 @@ public class TestAsyncProcess {
@Test
public void testMaxTask() throws Exception {
- final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ final MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
for (int i = 0; i < 1000; i++) {
ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
@@ -999,38 +1006,53 @@ public class TestAsyncProcess {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
- Mockito.when(hc.getConfiguration()).thenReturn(conf);
+ Mockito.when(hc.getConfiguration()).thenReturn(CONF);
+ Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
return hc;
}
@Test
public void testHTablePutSuccess() throws Exception {
- BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
- ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
+ ClusterConnection conn = createHConnection();
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam);
Put put = createPut(1, true);
- Assert.assertEquals(0, ht.getWriteBufferSize());
+ Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), ht.getWriteBufferSize());
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
ht.mutate(put);
- Assert.assertEquals(0, ht.getWriteBufferSize());
+ ht.flush();
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+ }
+
+ @Test
+ public void testBufferedMutatorImplWithSharedPool() throws Exception {
+ ClusterConnection conn = createHConnection();
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam);
+
+ ht.close();
+ assertFalse(ap.service.isShutdown());
}
private void doHTableFailedPut(boolean bufferOn) throws Exception {
ClusterConnection conn = createHConnection();
- BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
if (bufferOn) {
bufferParam.writeBufferSize(1024L * 1024L);
} else {
bufferParam.writeBufferSize(0L);
}
-
- HTable ht = new HTable(conn, bufferParam);
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
- ht.mutator.ap = ap;
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
+ HTable ht = new HTable(conn, mutator);
Put put = createPut(1, false);
- Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+ Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
try {
ht.put(put);
if (bufferOn) {
@@ -1039,7 +1061,7 @@ public class TestAsyncProcess {
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+ Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
// The table should have sent one request, maybe after multiple attempts
AsyncRequestFuture ars = null;
for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -1067,10 +1089,10 @@ public class TestAsyncProcess {
@Test
public void testHTableFailedPutAndNewPut() throws Exception {
ClusterConnection conn = createHConnection();
- BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
- new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
- mutator.ap = ap;
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
+ .writeBufferSize(0);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
Put p = createPut(1, false);
mutator.mutate(p);
@@ -1083,13 +1105,13 @@ public class TestAsyncProcess {
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
// manage what was inserted, what was tried but failed, and what was not even tried.
p = createPut(1, true);
- Assert.assertEquals(0, mutator.writeAsyncBuffer.size());
+ Assert.assertEquals(0, mutator.size());
try {
mutator.mutate(p);
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size());
+ Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
}
@Test
@@ -1302,9 +1324,12 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
- ClusterConnection conn = new MyConnectionImpl(conf);
- HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
- ht.multiAp = new MyAsyncProcess(conn, conf, false);
+ ClusterConnection conn = new MyConnectionImpl(CONF);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
+ HTable ht = new HTable(conn, mutator);
+ ht.multiAp = new MyAsyncProcess(conn, CONF, false);
List puts = new ArrayList();
puts.add(createPut(1, true));
@@ -1332,18 +1357,16 @@ public class TestAsyncProcess {
}
@Test
public void testErrorsServers() throws IOException {
- Configuration configuration = new Configuration(conf);
+ Configuration configuration = new Configuration(CONF);
ClusterConnection conn = new MyConnectionImpl(configuration);
- BufferedMutatorImpl mutator =
- new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
- configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
-
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
- mutator.ap = ap;
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
+ configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
- Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
- mutator.ap.serverTrackerTimeout = 1;
+ Assert.assertNotNull(ap.createServerErrorTracker());
+ Assert.assertTrue(ap.serverTrackerTimeout > 200);
+ ap.serverTrackerTimeout = 1;
Put p = createPut(1, false);
mutator.mutate(p);
@@ -1361,14 +1384,15 @@ public class TestAsyncProcess {
public void testReadAndWriteTimeout() throws IOException {
final long readTimeout = 10 * 1000;
final long writeTimeout = 20 * 1000;
- Configuration copyConf = new Configuration(conf);
+ Configuration copyConf = new Configuration(CONF);
copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
ClusterConnection conn = createHConnection();
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
- BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
- try (HTable ht = new HTable(conn, bufferParam)) {
- MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
+ try (HTable ht = new HTable(conn, mutator)) {
ht.multiAp = ap;
List gets = new LinkedList<>();
gets.add(new Get(DUMMY_BYTES_1));
@@ -1399,12 +1423,12 @@ public class TestAsyncProcess {
@Test
public void testGlobalErrors() throws IOException {
- ClusterConnection conn = new MyConnectionImpl(conf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
- AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
- mutator.ap = ap;
+ ClusterConnection conn = new MyConnectionImpl(CONF);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+ Assert.assertNotNull(ap.createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
@@ -1421,13 +1445,11 @@ public class TestAsyncProcess {
@Test
public void testCallQueueTooLarge() throws IOException {
- ClusterConnection conn = new MyConnectionImpl(conf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
- AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
- mutator.ap = ap;
-
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
-
+ ClusterConnection conn = new MyConnectionImpl(CONF);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
+ Assert.assertNotNull(ap.createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
@@ -1459,10 +1481,11 @@ public class TestAsyncProcess {
}
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
- HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
- MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
+ MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(con, bufferParam);
+ HTable ht = new HTable(con, mutator);
ht.multiAp = ap;
-
ht.batch(gets, null);
Assert.assertEquals(ap.nbActions.get(), NB_REGS);
@@ -1482,7 +1505,16 @@ public class TestAsyncProcess {
// One region has no replica, so the main call succeeds for it.
MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
- AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[3])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
Assert.assertEquals(2, ap.getReplicaCallCount());
}
@@ -1492,7 +1524,16 @@ public class TestAsyncProcess {
// Main call succeeds before replica calls are kicked off.
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[3])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
Assert.assertEquals(0, ap.getReplicaCallCount());
}
@@ -1502,7 +1543,16 @@ public class TestAsyncProcess {
// Either main or replica can succeed.
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
long replicaCalls = ap.getReplicaCallCount();
Assert.assertTrue(replicaCalls >= 0);
@@ -1517,7 +1567,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
ap.setPrimaryCallDelay(sn2, 2000);
List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
Assert.assertEquals(1, ap.getReplicaCallCount());
}
@@ -1530,7 +1589,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
ap.addFailures(hri1, hri2);
List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
Assert.assertEquals(0, ap.getReplicaCallCount());
}
@@ -1542,7 +1610,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
ap.addFailures(hri1, hri1r2, hri2);
List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
Assert.assertEquals(2, ap.getReplicaCallCount());
}
@@ -1554,7 +1631,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
// We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
Assert.assertEquals(3, ars.getErrors().getNumExceptions());
@@ -1583,6 +1669,14 @@ public class TestAsyncProcess {
return ap;
}
+ private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) {
+ return new BufferedMutatorParams(name)
+ .asyncProcess(ap)
+ .pool(ap.service)
+ .rpcTimeout(RPC_TIMEOUT)
+ .opertationTimeout(OPERATION_TIMEOUT);
+ }
+
private static List makeTimelineGets(byte[]... rows) {
List result = new ArrayList();
for (byte[] row : rows) {
@@ -1663,14 +1757,9 @@ public class TestAsyncProcess {
}
static class AsyncProcessForThrowableCheck extends AsyncProcess {
- private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
- ExecutorService pool) {
- super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
- conf), rpcTimeout, operationTimeout);
+ public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
+ super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
+ conf));
}
}
@@ -1681,12 +1770,19 @@ public class TestAsyncProcess {
MyThreadPoolExecutor myPool =
new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue(200));
- AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool);
+ AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
List puts = new ArrayList();
puts.add(createPut(1, true));
-
- ap.submit(null, DUMMY_TABLE, puts, false, null, false);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(myPool)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(puts)
+ .setSubmittedRows(SubmittedRows.NORMAL)
+ .build();
+ ap.submit(task);
Assert.assertTrue(puts.isEmpty());
}
@@ -1695,7 +1791,7 @@ public class TestAsyncProcess {
final AtomicLong tasks = new AtomicLong(0);
final AtomicInteger max = new AtomicInteger(0);
final CyclicBarrier barrier = new CyclicBarrier(2);
- final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
+ final AsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
Runnable runnable = new Runnable() {
@Override
public void run() {
@@ -1738,18 +1834,18 @@ public class TestAsyncProcess {
*/
@Test
public void testRetryPauseWithCallQueueTooBigException() throws Exception {
- Configuration myConf = new Configuration(conf);
+ Configuration myConf = new Configuration(CONF);
final long specialPause = 500L;
final int retries = 1;
myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
ClusterConnection conn = new MyConnectionImpl(myConf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
AsyncProcessWithFailure ap =
new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
- mutator.ap = ap;
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam);
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+ Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
@@ -1775,8 +1871,9 @@ public class TestAsyncProcess {
final long normalPause =
myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
- mutator.ap = ap;
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+ bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ mutator = new BufferedMutatorImpl(conn, bufferParam);
+ Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
mutator.mutate(p);
startTime = System.currentTimeMillis();
try {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index ee89609..e5ab3e8 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -126,11 +126,8 @@ public class HConnectionTestingUtility {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(
- new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
- RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt(
- HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)));
+ new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false,
+ RpcControllerFactory.instantiate(conf)));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index 53488ec..2c5e89d 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.AsyncProcessTask;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
@@ -137,14 +138,20 @@ public class TestClientPushback {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong endTime = new AtomicLong();
long startTime = EnvironmentEdgeManager.currentTime();
-
- ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback() {
- @Override
- public void update(byte[] region, byte[] row, Result result) {
+ BufferedMutatorImpl mutator = ((HTable) table).mutator;
+ Batch.Callback callback = (byte[] r, byte[] row, Result result) -> {
endTime.set(EnvironmentEdgeManager.currentTime());
latch.countDown();
- }
- }, true);
+ };
+ AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
+ .setPool(mutator.getPool())
+ .setTableName(tableName)
+ .setRowAccess(ops)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+ .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
+ .setRpcTimeout(60 * 1000)
+ .build();
+ mutator.getAsyncProcess().submit(task);
// Currently the ExponentialClientBackoffPolicy under these test conditions
// produces a backoffTime of 151 milliseconds. This is long enough so the
// wait and related checks below are reasonable. Revisit if the backoff
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 6d1e1f0..0f7f3d9 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -563,9 +563,17 @@ public class TestReplicasClient {
gets.add(g);
Object[] results = new Object[2];
- AsyncRequestFuture reqs = ap.submitAll(
- HTable.getDefaultExecutor(HTU.getConfiguration()),
- table.getName(), gets, null, results);
+ int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
+ int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
+ .setTableName(table.getName())
+ .setRowAccess(gets)
+ .setResults(results)
+ .setOperationTimeout(operationTimeout)
+ .setRpcTimeout(readTimeout)
+ .build();
+ AsyncRequestFuture reqs = ap.submit(task);
reqs.waitUntilDone();
// verify we got the right results back
for (Object r : results) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 5157868..09c7561 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -492,7 +492,6 @@ public class TestPerColumnFamilyFlush {
Thread.sleep(100);
}
}
- table.close();
assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
index 68fffb1..380c252 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
@@ -171,22 +171,35 @@ public class TestTablePermissions {
}
}
+ /**
+   * The AccessControlLists.addUserPermission may throw an exception before closing the table.
+ */
+ private void addUserPermission(Configuration conf, UserPermission userPerm, Table t) throws IOException {
+ try {
+ AccessControlLists.addUserPermission(conf, userPerm, t);
+ } finally {
+ t.close();
+ }
+ }
+
@Test
public void testBasicWrite() throws Exception {
Configuration conf = UTIL.getConfiguration();
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
// add some permissions
- AccessControlLists.addUserPermission(conf,
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("george"), TEST_TABLE, null, (byte[])null,
- UserPermission.Action.READ, UserPermission.Action.WRITE), table);
- AccessControlLists.addUserPermission(conf,
+ UserPermission.Action.READ, UserPermission.Action.WRITE),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE, null, (byte[])null,
- UserPermission.Action.READ), table);
- AccessControlLists.addUserPermission(conf,
+ UserPermission.Action.READ),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("humphrey"),
TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
- UserPermission.Action.READ), table);
+ UserPermission.Action.READ),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
}
// retrieve the same
ListMultimap perms =
@@ -274,23 +287,22 @@ public class TestTablePermissions {
@Test
public void testPersistence() throws Exception {
Configuration conf = UTIL.getConfiguration();
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
- AccessControlLists.addUserPermission(conf,
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("albert"), TEST_TABLE, null,
- (byte[])null, TablePermission.Action.READ), table);
- AccessControlLists.addUserPermission(conf,
+ (byte[])null, TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("betty"), TEST_TABLE, null,
(byte[])null, TablePermission.Action.READ,
- TablePermission.Action.WRITE), table);
- AccessControlLists.addUserPermission(conf,
+ TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("clark"),
TEST_TABLE, TEST_FAMILY,
- TablePermission.Action.READ), table);
- AccessControlLists.addUserPermission(conf,
+ TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("dwight"),
TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
- TablePermission.Action.WRITE), table);
+ TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
}
// verify permissions survive changes in table metadata
ListMultimap preperms =
@@ -404,17 +416,17 @@ public class TestTablePermissions {
Configuration conf = UTIL.getConfiguration();
// add some permissions
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
- AccessControlLists.addUserPermission(conf,
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("user1"),
- Permission.Action.READ, Permission.Action.WRITE), table);
- AccessControlLists.addUserPermission(conf,
+ Permission.Action.READ, Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("user2"),
- Permission.Action.CREATE), table);
- AccessControlLists.addUserPermission(conf,
+ Permission.Action.CREATE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+ addUserPermission(conf,
new UserPermission(Bytes.toBytes("user3"),
- Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), table);
+ Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
}
ListMultimap perms = AccessControlLists.getTablePermissions(conf, null);
List user1Perms = perms.get("user1");
@@ -448,11 +460,11 @@ public class TestTablePermissions {
// currently running user is the system user and should have global admin perms
User currentUser = User.getCurrent();
assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN));
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
for (int i=1; i<=50; i++) {
- AccessControlLists.addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
- Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), table);
+ addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
+ Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE),
+ connection.getTable(AccessControlLists.ACL_TABLE_NAME));
// make sure the system user still shows as authorized
assertTrue("Failed current user auth check on iter "+i,
authManager.authorize(currentUser, Permission.Action.ADMIN));
diff --git hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
index 6ebf044..249eb28 100644
--- hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
+++ hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.ExecutorService
import scala.util.Random
import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
- Connection, BufferedMutatorParams, Admin}
+ Connection, BufferedMutatorParams, Admin, AsyncProcess}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.spark.Logging
@@ -49,6 +49,7 @@ class ConnectionMocker extends Connection {
def getTable(tableName: TableName, pool: ExecutorService): Table = null
def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null
def getBufferedMutator (tableName: TableName): BufferedMutator = null
+ def createAsyncProcess: AsyncProcess = null
def getAdmin: Admin = null
def close(): Unit = {