diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 50a2a11..be65a61 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -33,12 +33,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; -import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; @@ -95,9 +95,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; *

*/ @InterfaceAudience.Private -class AsyncProcess { +public class AsyncProcess { private static final Log LOG = LogFactory.getLog(AsyncProcess.class); - protected static final AtomicLong COUNTER = new AtomicLong(); + private static final AtomicLong COUNTER = new AtomicLong(); public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; @@ -116,7 +116,7 @@ class AsyncProcess { */ public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; - protected final int thresholdToLogUndoneTaskDetails; + final int thresholdToLogUndoneTaskDetails; private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = "hbase.client.threshold.log.details"; private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; @@ -173,64 +173,66 @@ class AsyncProcess { }; // TODO: many of the fields should be made private - protected final long id; + final long id; - protected final ClusterConnection connection; - protected final RpcRetryingCallerFactory rpcCallerFactory; - protected final RpcControllerFactory rpcFactory; - protected final BatchErrors globalErrors; - protected final ExecutorService pool; + final ClusterConnection connection; + private final RpcRetryingCallerFactory rpcCallerFactory; + final RpcControllerFactory rpcFactory; + final BatchErrors globalErrors; - protected final AtomicLong tasksInProgress = new AtomicLong(0); - protected final ConcurrentMap taskCounterPerRegion = + @VisibleForTesting + final AtomicLong tasksInProgress = new AtomicLong(0); + @VisibleForTesting + final ConcurrentMap taskCounterPerRegion = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - protected final ConcurrentMap taskCounterPerServer = + @VisibleForTesting + final ConcurrentMap taskCounterPerServer = new ConcurrentHashMap(); // Start configuration settings. - protected final int startLogErrorsCnt; + final int startLogErrorsCnt; /** * The number of tasks simultaneously executed on the cluster. */ - protected final int maxTotalConcurrentTasks; + private final int maxTotalConcurrentTasks; /** * The max heap size of all tasks simultaneously executed on a server. */ - protected final long maxHeapSizePerRequest; - protected final long maxHeapSizeSubmit; + private final long maxHeapSizePerRequest; + private final long maxHeapSizeSubmit; /** * The number of tasks we run in parallel on a single region. * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start * a set of operations on a region before the previous one is done. As well, this limits * the pressure we put on the region server. */ - protected final int maxConcurrentTasksPerRegion; + @VisibleForTesting + final int maxConcurrentTasksPerRegion; /** * The number of task simultaneously executed on a single region server. */ - protected final int maxConcurrentTasksPerServer; - protected final long pause; - protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified - protected int numTries; - protected int serverTrackerTimeout; - protected int rpcTimeout; - protected int operationTimeout; - protected long primaryCallTimeoutMicroseconds; + @VisibleForTesting + final int maxConcurrentTasksPerServer; + final long pause; + final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified + final int numTries; + @VisibleForTesting + int serverTrackerTimeout; + final long primaryCallTimeoutMicroseconds; /** Whether to log details for batch errors */ - protected final boolean logBatchErrorDetails; + final boolean logBatchErrorDetails; // End configuration settings. - public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, + public AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) { + RpcControllerFactory rpcFactory) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } this.connection = hc; - this.pool = pool; this.globalErrors = useGlobalErrors ? new BatchErrors() : null; this.id = COUNTER.incrementAndGet(); @@ -249,8 +251,6 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.rpcTimeout = rpcTimeout; - this.operationTimeout = operationTimeout; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -307,37 +307,27 @@ class AsyncProcess { } /** - * @return pool if non null, otherwise returns this.pool if non null, otherwise throws - * RuntimeException + * The submitted task may be not accomplished at all if there are too many running tasks or + * other limits. + * @param The class to cast the result + * @param task The setting and data + * @return AsyncRequestFuture + * @throws java.io.InterruptedIOException */ - protected ExecutorService getPool(ExecutorService pool) { - if (pool != null) { - return pool; + public AsyncRequestFuture submit(AsyncProcessTask task) throws InterruptedIOException { + AsyncRequestFuture reqFuture = checkTask(task); + if (reqFuture != null) { + return reqFuture; + } + SubmittedRows submittedRows = task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows(); + switch (submittedRows) { + case ALL: + return submitAll(task); + case AT_LEAST_ONE: + return submit(task, true); + default: + return submit(task, false); } - if (this.pool != null) { - return this.pool; - } - throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); - } - - /** - * See #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean). - * Uses default ExecutorService for this AP (must have been created with one). - */ - public AsyncRequestFuture submit(TableName tableName, - final RowAccess rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { - return submit(null, tableName, rows, atLeastOne, callback, needResults); - } - /** - * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}. - * Uses the {@link ListRowAccess} to wrap the {@link List}. - */ - public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, - List rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { - return submit(pool, tableName, new ListRowAccess(rows), atLeastOne, - callback, needResults); } /** @@ -351,14 +341,13 @@ class AsyncProcess { * @param needResults Whether results are needed, or can be discarded. * @param rows - the submitted row. Modified by the method: we remove the rows we took. * @param atLeastOne true if we should submit at least a subset. + * @param rpcTimeout timeout for each rpc request + * @param operationTimeout timeout for each call */ - public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, - RowAccess rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { - if (rows.isEmpty()) { - return NO_REQS_RESULT; - } - + private AsyncRequestFuture submit(AsyncProcessTask task, + boolean atLeastOne) throws InterruptedIOException { + TableName tableName = task.getTableName(); + RowAccess rows = task.getRowAccess(); Map actionsByServer = new HashMap(); List retainedActions = new ArrayList(rows.size()); @@ -426,8 +415,8 @@ class AsyncProcess { if (retainedActions.isEmpty()) return NO_REQS_RESULT; - return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, - locationErrors, locationErrorRows, actionsByServer, pool); + return submitMultiActions(task, retainedActions, nonceGroup, + locationErrors, locationErrorRows, actionsByServer); } private RowCheckerHost createRowCheckerHost() { @@ -442,13 +431,10 @@ class AsyncProcess { , new SubmittedSizeChecker(maxHeapSizeSubmit) )); } - AsyncRequestFuture submitMultiActions(TableName tableName, - List retainedActions, long nonceGroup, Batch.Callback callback, - Object[] results, boolean needResults, List locationErrors, - List locationErrorRows, Map actionsByServer, - ExecutorService pool) { - AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1); + AsyncRequestFuture submitMultiActions(AsyncProcessTask task, + List retainedActions, long nonceGroup, List locationErrors, + List locationErrorRows, Map actionsByServer) { + AsyncRequestFutureImpl ars = createAsyncRequestFuture(task, retainedActions, nonceGroup); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -462,14 +448,6 @@ class AsyncProcess { return ars; } - public void setRpcTimeout(int rpcTimeout) { - this.rpcTimeout = rpcTimeout; - } - - public void setOperationTimeout(int operationTimeout) { - this.operationTimeout = operationTimeout; - } - /** * Helper that is used when grouping the actions per region server. * @@ -493,10 +471,6 @@ class AsyncProcess { multiAction.add(regionName, action); } - public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List rows, Batch.Callback callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, -1); - } /** * Submit immediately the list of rows, whatever the server status. Kept for backward * compatibility: it allows to be used with the batch interface that return an array of objects. @@ -506,11 +480,11 @@ class AsyncProcess { * @param rows the list of rows. * @param callback the callback. * @param results Optional array to return the results thru; backward compat. - * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting. + * @param rpcTimeout timeout for each rpc request + * @param operationTimeout timeout for each call */ - public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List rows, Batch.Callback callback, Object[] results, - CancellableRegionServerCallable callable, int rpcTimeout) { + private AsyncRequestFuture submitAll(AsyncProcessTask task) { + RowAccess rows = task.getRowAccess(); List actions = new ArrayList(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -528,26 +502,45 @@ class AsyncProcess { setNonce(ng, r, action); actions.add(action); } - AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, rpcTimeout); + AsyncRequestFutureImpl ars = createAsyncRequestFuture(task, actions, ng.getNonceGroup()); ars.groupAndSendMultiAction(actions, 1); return ars; } + private AsyncRequestFuture checkTask(AsyncProcessTask task) { + if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) { + return NO_REQS_RESULT; + } + Objects.requireNonNull(task.getPool(), "The pool can't be NULL"); + checkOperationTimeout(task.getOperationTimeout()); + checkRpcTimeout(task.getRpcTimeout()); + return null; + } + private void setNonce(NonceGenerator ng, Row r, Action action) { if (!(r instanceof Append) && !(r instanceof Increment)) return; action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. } - protected AsyncRequestFutureImpl createAsyncRequestFuture( - TableName tableName, List actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int rpcTimeout) { - return new AsyncRequestFutureImpl( - tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, operationTimeout, - rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this); + private int checkTimeout(String name, int timeout) { + if (timeout < 0) { + throw new RuntimeException("The " + name + " must be bigger than zero," + + "current value is" + timeout); + } + return timeout; + } + private int checkOperationTimeout(int operationTimeout) { + return checkTimeout("operation timeout", operationTimeout); + } + + private int checkRpcTimeout(int rpcTimeout) { + return checkTimeout("rpc timeout", rpcTimeout); + } + + @VisibleForTesting + AsyncRequestFutureImpl createAsyncRequestFuture( + AsyncProcessTask task, List actions, long nonceGroup) { + return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this); } /** Wait until the async does not have more than max tasks in progress. */ @@ -614,7 +607,7 @@ class AsyncProcess { * {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created. */ public boolean hasError() { - return globalErrors.hasErrors(); + return globalErrors != null && globalErrors.hasErrors(); } /** @@ -630,7 +623,7 @@ class AsyncProcess { public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( List failedRows, String tableName) throws InterruptedIOException { waitForMaximumCurrentTasks(0, tableName); - if (!globalErrors.hasErrors()) { + if (globalErrors == null || !globalErrors.hasErrors()) { return null; } if (failedRows != null) { @@ -644,7 +637,7 @@ class AsyncProcess { /** * increment the tasks counters for a given set of regions. MT safe. */ - protected void incTaskCounters(Collection regions, ServerName sn) { + void incTaskCounters(Collection regions, ServerName sn) { tasksInProgress.incrementAndGet(); computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); @@ -657,7 +650,7 @@ class AsyncProcess { /** * Decrements the counters for a given region and the region server. MT Safe. */ - protected void decTaskCounters(Collection regions, ServerName sn) { + void decTaskCounters(Collection regions, ServerName sn) { for (byte[] regBytes : regions) { AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); regionCnt.decrementAndGet(); @@ -676,7 +669,7 @@ class AsyncProcess { @VisibleForTesting protected RpcRetryingCaller createCaller( CancellableRegionServerCallable callable, int rpcTimeout) { - return rpcCallerFactory. newCaller(rpcTimeout); + return rpcCallerFactory. newCaller(checkRpcTimeout(rpcTimeout)); } @@ -687,7 +680,7 @@ class AsyncProcess { * We may benefit from connection-wide tracking of server errors. * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection */ - protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { + ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { return new ConnectionImplementation.ServerErrorTracker( this.serverTrackerTimeout, this.numTries); } @@ -953,26 +946,4 @@ class AsyncProcess { } } } - - public static class ListRowAccess implements RowAccess { - private final List data; - ListRowAccess(final List data) { - this.data = data; - } - - @Override - public int size() { - return data.size(); - } - - @Override - public boolean isEmpty() { - return data.isEmpty(); - } - - @Override - public Iterator iterator() { - return data.iterator(); - } - } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java new file mode 100644 index 0000000..eda1db2 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java @@ -0,0 +1,229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.coprocessor.Batch; + +/** + * Contains the attributes of a task which will be executed + * by {@link org.apache.hadoop.hbase.client.AsyncProcess}. + * The attributes will be validated by AsyncProcess. + * It's intended for advanced client applications. + * @param The type of response from server-side + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class AsyncProcessTask { + /** + * The number of processed rows. + * The AsyncProcess has traffic control which may reject some rows. + */ + public enum SubmittedRows { + ALL, + AT_LEAST_ONE, + NORMAL + } + public static Builder newBuilder(final Batch.Callback callback) { + return new Builder<>(callback); + } + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private ExecutorService pool; + private TableName tableName; + private RowAccess rows; + private SubmittedRows submittedRows = SubmittedRows.ALL; + private Batch.Callback callback; + private boolean needResults; + private int rpcTimeout; + private int operationTimeout; + private CancellableRegionServerCallable callable; + private Object[] results; + + private Builder() { + } + + private Builder(Batch.Callback callback) { + this.callback = callback; + } + + Builder setResults(Object[] results) { + this.results = results; + if (results != null && results.length != 0) { + setNeedResults(true); + } + return this; + } + + public Builder setPool(ExecutorService pool) { + this.pool = pool; + return this; + } + + public Builder setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + return this; + } + + public Builder setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + return this; + } + + public Builder setTableName(TableName tableName) { + this.tableName = tableName; + return this; + } + + public Builder setRowAccess(List rows) { + this.rows = new ListRowAccess<>(rows); + return this; + } + + public Builder setRowAccess(RowAccess rows) { + this.rows = rows; + return this; + } + + public Builder setSubmittedRows(SubmittedRows submittedRows) { + this.submittedRows = submittedRows; + return this; + } + + public Builder setNeedResults(boolean needResults) { + this.needResults = needResults; + return this; + } + + Builder setCallable(CancellableRegionServerCallable callable) { + this.callable = callable; + return this; + } + + public AsyncProcessTask build() { + return new AsyncProcessTask<>(pool, tableName, rows, submittedRows, + callback, callable, needResults, rpcTimeout, operationTimeout, results); + } + } + private final ExecutorService pool; + private final TableName tableName; + private final RowAccess rows; + private final SubmittedRows submittedRows; + private final Batch.Callback callback; + private final CancellableRegionServerCallable callable; + private final boolean needResults; + private final int rpcTimeout; + private final int operationTimeout; + private final Object[] results; + AsyncProcessTask(AsyncProcessTask task) { + this(task.getPool(), task.getTableName(), task.getRowAccess(), + task.getSubmittedRows(), task.getCallback(), task.getCallable(), + task.getNeedResults(), task.getRpcTimeout(), task.getOperationTimeout(), + task.getResults()); + } + AsyncProcessTask(ExecutorService pool, TableName tableName, + RowAccess rows, SubmittedRows size, Batch.Callback callback, + CancellableRegionServerCallable callable, boolean needResults, + int rpcTimeout, int operationTimeout, Object[] results) { + this.pool = pool; + this.tableName = tableName; + this.rows = rows; + this.submittedRows = size; + this.callback = callback; + this.callable = callable; + this.needResults = needResults; + this.rpcTimeout = rpcTimeout; + this.operationTimeout = operationTimeout; + this.results = results; + } + + public int getOperationTimeout() { + return operationTimeout; + } + + public ExecutorService getPool() { + return pool; + } + + public TableName getTableName() { + return tableName; + } + + public RowAccess getRowAccess() { + return rows; + } + + public SubmittedRows getSubmittedRows() { + return submittedRows; + } + + public Batch.Callback getCallback() { + return callback; + } + + CancellableRegionServerCallable getCallable() { + return callable; + } + + Object[] getResults() { + return results; + } + + public boolean getNeedResults() { + return needResults; + } + + public int getRpcTimeout() { + return rpcTimeout; + } + + static class ListRowAccess implements RowAccess { + + private final List data; + + ListRowAccess(final List data) { + this.data = data; + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public Iterator iterator() { + return data.iterator(); + } + } +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index d176ce1..98e04d9 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -300,11 +300,11 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; - private CancellableRegionServerCallable currentCallable; - private int operationTimeout; - private int rpcTimeout; + private final CancellableRegionServerCallable currentCallable; + private final int operationTimeout; + private final int rpcTimeout; private final Map> heapSizesByServer = new HashMap<>(); - protected AsyncProcess asyncProcess; + private final AsyncProcess asyncProcess; /** * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only @@ -339,32 +339,27 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } } - - - public AsyncRequestFutureImpl(TableName tableName, List actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, - CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout, - AsyncProcess asyncProcess) { - this.pool = pool; - this.callback = callback; + public AsyncRequestFutureImpl(AsyncProcessTask task, List actions, + long nonceGroup, AsyncProcess asyncProcess) { + this.pool = task.getPool(); + this.callback = task.getCallback(); this.nonceGroup = nonceGroup; - this.tableName = tableName; + this.tableName = task.getTableName(); this.actionsInProgress.set(actions.size()); - if (results != null) { - assert needResults; - if (results.length != actions.size()) { + if (task.getResults() == null) { + results = task.getNeedResults() ? new Object[actions.size()] : null; + } else { + if (task.getResults().length != actions.size()) { throw new AssertionError("results.length"); } - this.results = results; + this.results = task.getResults(); for (int i = 0; i != this.results.length; ++i) { results[i] = null; } - } else { - this.results = needResults ? new Object[actions.size()] : null; } List replicaGetIndices = null; boolean hasAnyReplicaGets = false; - if (needResults) { + if (results != null) { // Check to see if any requests might require replica calls. // We expect that many requests will consist of all or no multi-replica gets; in such // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will @@ -414,10 +409,10 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { this.errorsByServer = createServerErrorTracker(); this.errors = (asyncProcess.globalErrors != null) ? asyncProcess.globalErrors : new BatchErrors(); - this.operationTimeout = operationTimeout; - this.rpcTimeout = rpcTimeout; - this.currentCallable = callable; - if (callable == null) { + this.operationTimeout = task.getOperationTimeout(); + this.rpcTimeout = task.getRpcTimeout(); + this.currentCallable = task.getCallable(); + if (task.getCallable() == null) { tracker = new RetryingTimeTracker().start(); } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index f7eb09d..6735bf4 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; @@ -57,63 +56,63 @@ import java.util.concurrent.atomic.AtomicLong; @InterfaceAudience.Private @InterfaceStability.Evolving public class BufferedMutatorImpl implements BufferedMutator { - private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); - private final ExceptionListener listener; - - protected ClusterConnection connection; // non-final so can be overridden in test private final TableName tableName; - private volatile Configuration conf; - - @VisibleForTesting - final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue(); - @VisibleForTesting - AtomicLong currentWriteBufferSize = new AtomicLong(0); - + private final Configuration conf; + private final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue<>(); + private final AtomicLong currentWriteBufferSize = new AtomicLong(0); /** * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. */ - @VisibleForTesting - AtomicInteger undealtMutationCount = new AtomicInteger(0); - private long writeBufferSize; + private final AtomicInteger undealtMutationCount = new AtomicInteger(0); + private volatile long maxWriteBufferSize; + private final AtomicInteger rpcTimeout; + private final AtomicInteger operationTimeout; private final int maxKeyValueSize; - private boolean closed = false; + private final boolean cleanupPoolOnClose; + private volatile boolean closed = false; private final ExecutorService pool; - private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor - private int operationTimeout; - - @VisibleForTesting - protected AsyncProcess ap; // non-final so can be overridden in test + private final AsyncProcess ap; - BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, - RpcControllerFactory rpcFactory, BufferedMutatorParams params) { + BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params) { if (conn == null || conn.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } - this.tableName = params.getTableName(); - this.connection = conn; - this.conf = connection.getConfiguration(); - this.pool = params.getPool(); + this.conf = conn.getConfiguration(); + if (params.getPool() == null) { + this.pool = HTable.getDefaultExecutor(conf); + cleanupPoolOnClose = true; + } else { + this.pool = params.getPool(); + cleanupPoolOnClose = false; + } this.listener = params.getListener(); - - ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); - this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? - params.getWriteBufferSize() : tableConf.getWriteBufferSize(); + this.maxWriteBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? + params.getWriteBufferSize() : conn.getConnectionConfiguration().getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? - params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); - - this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.operationTimeout = conn.getConfiguration().getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, - writeRpcTimeout, operationTimeout); + params.getMaxKeyValueSize() : conn.getConnectionConfiguration().getMaxKeyValueSize(); + this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ? + params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout()); + this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ? + params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout()); + if (params.getAsyncProcess() == null) { + this.ap = conn.createAsyncProcess(); + } else { + this.ap = params.getAsyncProcess(); + } + } + + @VisibleForTesting + ExecutorService getPool() { + return pool; + } + + @VisibleForTesting + AsyncProcess getAsyncProcess() { + return ap; } @Override @@ -166,7 +165,7 @@ public class BufferedMutatorImpl implements BufferedMutator { // Now try and queue what needs to be queued. while (undealtMutationCount.get() != 0 - && currentWriteBufferSize.get() > writeBufferSize) { + && currentWriteBufferSize.get() > maxWriteBufferSize) { backgroundFlushCommits(false); } } @@ -185,22 +184,22 @@ public class BufferedMutatorImpl implements BufferedMutator { // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. backgroundFlushCommits(true); - this.pool.shutdown(); - boolean terminated; - int loopCnt = 0; - do { - // wait until the pool has terminated - terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); - loopCnt += 1; - if (loopCnt >= 10) { - LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); - break; - } - } while (!terminated); - + if (cleanupPoolOnClose) { + this.pool.shutdown(); + boolean terminated; + int loopCnt = 0; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + loopCnt += 1; + if (loopCnt >= 10) { + LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + break; + } + } while (!terminated); + } } catch (InterruptedException e) { LOG.warn("waitForTermination interrupted"); - } finally { this.closed = true; } @@ -231,8 +230,9 @@ public class BufferedMutatorImpl implements BufferedMutator { if (!synchronous) { QueueRowAccess taker = new QueueRowAccess(); + AsyncProcessTask task = wrapAsyncProcessTask(taker); try { - ap.submit(tableName, taker, true, null, false); + ap.submit(task); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); @@ -243,9 +243,10 @@ public class BufferedMutatorImpl implements BufferedMutator { } if (synchronous || ap.hasError()) { QueueRowAccess taker = new QueueRowAccess(); + AsyncProcessTask task = wrapAsyncProcessTask(taker); try { while (!taker.isEmpty()) { - ap.submit(tableName, taker, true, null, false); + ap.submit(task); taker.reset(); } } finally { @@ -265,14 +266,44 @@ public class BufferedMutatorImpl implements BufferedMutator { } /** + * Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}. + * @param taker access the inner buffer. + * @return An AsyncProcessTask which always returns the latest rpc and operation timeout. + */ + private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) { + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(taker) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) + .build(); + return new AsyncProcessTask(task) { + @Override + public int getRpcTimeout() { + return rpcTimeout.get(); + } + + @Override + public int getOperationTimeout() { + return operationTimeout.get(); + } + }; + } + /** * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought * not be called for production uses. + * If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()} + * will be called. + * @param writeBufferSize The max size of internal buffer where data is stored. + * @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException + * if an I/O error occurs and there are too many retries. + * @throws java.io.InterruptedIOException if the I/O task is interrupted. * @deprecated Going away when we drop public support for {@link HTable}. */ @Deprecated public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, InterruptedIOException { - this.writeBufferSize = writeBufferSize; + this.maxWriteBufferSize = writeBufferSize; if (currentWriteBufferSize.get() > writeBufferSize) { flush(); } @@ -283,19 +314,27 @@ public class BufferedMutatorImpl implements BufferedMutator { */ @Override public long getWriteBufferSize() { - return this.writeBufferSize; + return this.maxWriteBufferSize; } @Override - public void setRpcTimeout(int timeout) { - this.writeRpcTimeout = timeout; - ap.setRpcTimeout(timeout); + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout.set(rpcTimeout); } @Override - public void setOperationTimeout(int timeout) { - this.operationTimeout = timeout; - ap.setOperationTimeout(operationTimeout); + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout.set(operationTimeout); + } + + @VisibleForTesting + long getCurrentWriteBufferSize() { + return currentWriteBufferSize.get(); + } + + @VisibleForTesting + int size() { + return undealtMutationCount.get(); } private class QueueRowAccess implements RowAccess { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index d4cdead..dd46f8f 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -37,14 +37,13 @@ public class BufferedMutatorParams { private final TableName tableName; private long writeBufferSize = UNSET; private int maxKeyValueSize = UNSET; + private int rpcTimeout = UNSET; + private int operationTimeout = UNSET; private ExecutorService pool = null; - private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { - @Override - public void onException(RetriesExhaustedWithDetailsException exception, - BufferedMutator bufferedMutator) - throws RetriesExhaustedWithDetailsException { - throw exception; - } + private AsyncProcess ap = null; + private BufferedMutator.ExceptionListener listener + = (RetriesExhaustedWithDetailsException exception, BufferedMutator bufferedMutator) -> { + throw exception; }; public BufferedMutatorParams(TableName tableName) { @@ -59,6 +58,33 @@ public class BufferedMutatorParams { return writeBufferSize; } + public BufferedMutatorParams asyncProcess(final AsyncProcess ap) { + this.ap = ap; + return this; + } + + public AsyncProcess getAsyncProcess() { + return ap; + } + + public BufferedMutatorParams rpcTimeout(final int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + return this; + } + + public int getRpcTimeout() { + return rpcTimeout; + } + + public BufferedMutatorParams opertationTimeout(final int operationTimeout) { + this.operationTimeout = operationTimeout; + return this; + } + + public int getOperationTimeout() { + return operationTimeout; + } + /** * Override the write buffer size specified by the provided {@link Connection}'s * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index b979c6a..b58d0e2 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -136,6 +136,15 @@ public interface Connection extends Abortable, Closeable { BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException; /** + * Retrieve a {@link AsyncProcess} for submitting the requests from client-side to server directly. + * The {@link AsyncProcess} returned by this method is thread-safe. + * This object can be used for long lived table operations. + * + * @return a {@link AsyncProcess}. + */ + AsyncProcess createAsyncProcess(); + + /** * Retrieve a RegionLocator implementation to inspect region information on a table. The returned * RegionLocator is not thread-safe, so a new instance should be created for each using thread. * diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 35bebae..41f5baf 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -42,7 +42,8 @@ public class ConnectionConfiguration { private final int replicaCallTimeoutMicroSecondScan; private final int retries; private final int maxKeyValueSize; - + private final int readRpcTimeout; + private final int writeRpcTimeout; // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; @@ -80,6 +81,12 @@ public class ConnectionConfiguration { Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + + this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } /** @@ -99,6 +106,16 @@ public class ConnectionConfiguration { this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; + this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + } + + public int getReadRpcTimeout() { + return readRpcTimeout; + } + + public int getWriteRpcTimeout() { + return writeRpcTimeout; } public long getWriteBufferSize() { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index e75d9a5..10191a5 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -228,7 +228,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); - this.asyncProcess = createAsyncProcess(this.conf); + this.asyncProcess = createAsyncProcess(false); if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { this.metrics = new MetricsConnection(this); } else { @@ -306,16 +306,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (params.getTableName() == null) { throw new IllegalArgumentException("TableName cannot be null."); } - if (params.getPool() == null) { - params.pool(HTable.getDefaultExecutor(getConfiguration())); - } if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { params.writeBufferSize(connectionConfig.getWriteBufferSize()); } if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); } - return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); + return new BufferedMutatorImpl(this, params); } @Override @@ -1751,15 +1748,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { metaCache.clearCache(regionInfo); } - // For tests to override. - protected AsyncProcess createAsyncProcess(Configuration conf) { - // No default pool available. - int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, - rpcTimeout, operationTimeout); + @Override + public AsyncProcess createAsyncProcess() { + return createAsyncProcess(true); + } + + private AsyncProcess createAsyncProcess(boolean useGlobalErrors) { + return new AsyncProcess(this, conf, rpcCallerFactory, useGlobalErrors, rpcControllerFactory); } @Override diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index dd11abf..f79a55e 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -103,27 +103,28 @@ import org.apache.hadoop.hbase.util.Threads; @InterfaceStability.Stable public class HTable implements Table { private static final Log LOG = LogFactory.getLog(HTable.class); - protected ClusterConnection connection; + private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; + private final ClusterConnection connection; private final TableName tableName; - private volatile Configuration configuration; - private ConnectionConfiguration connConfiguration; - protected BufferedMutatorImpl mutator; + private final Configuration configuration; + private final ConnectionConfiguration connConfiguration; + @VisibleForTesting + BufferedMutatorImpl mutator; private boolean closed = false; - protected int scannerCaching; - protected long scannerMaxResultSize; - private ExecutorService pool; // For Multi & Scan + private final int scannerCaching; + private final long scannerMaxResultSize; + private final ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc private int readRpcTimeout; // timeout for each read rpc request private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() - private final boolean cleanupConnectionOnClose; // close the connection in close() - private Consistency defaultConsistency = Consistency.STRONG; - private HRegionLocator locator; + private final HRegionLocator locator; /** The Async process for batch */ - protected AsyncProcess multiAp; - private RpcRetryingCallerFactory rpcCallerFactory; - private RpcControllerFactory rpcControllerFactory; + @VisibleForTesting + AsyncProcess multiAp; + private final RpcRetryingCallerFactory rpcCallerFactory; + private final RpcControllerFactory rpcControllerFactory; // Marked Private @since 1.0 @InterfaceAudience.Private @@ -167,22 +168,42 @@ public class HTable implements Table { throw new IllegalArgumentException("Given table name is null"); } this.tableName = tableName; - this.cleanupConnectionOnClose = false; this.connection = connection; this.configuration = connection.getConfiguration(); - this.connConfiguration = tableConfig; - this.pool = pool; + if (tableConfig == null) { + connConfiguration = new ConnectionConfiguration(configuration); + } else { + connConfiguration = tableConfig; + } if (pool == null) { this.pool = getDefaultExecutor(this.configuration); this.cleanupPoolOnClose = true; } else { + this.pool = pool; this.cleanupPoolOnClose = false; } + if (rpcCallerFactory == null) { + this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); + } else { + this.rpcCallerFactory = rpcCallerFactory; + } - this.rpcCallerFactory = rpcCallerFactory; - this.rpcControllerFactory = rpcControllerFactory; + if (rpcControllerFactory == null) { + this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); + } else { + this.rpcControllerFactory = rpcControllerFactory; + } + + this.operationTimeout = tableName.isSystemTable() ? + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); + this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); - this.finishSetup(); + // puts need to track errors globally due to how the APIs currently work. + multiAp = this.connection.getAsyncProcess(); + this.locator = new HRegionLocator(tableName, connection); } /** @@ -190,20 +211,23 @@ public class HTable implements Table { * @throws IOException */ @VisibleForTesting - protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { + protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException { connection = conn; - tableName = params.getTableName(); - connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); + this.tableName = mutator.getName(); + this.configuration = connection.getConfiguration(); + connConfiguration = new ConnectionConfiguration(configuration); cleanupPoolOnClose = false; - cleanupConnectionOnClose = false; - // used from tests, don't trust the connection is real - this.mutator = new BufferedMutatorImpl(conn, null, null, params); - this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.mutator = mutator; + this.operationTimeout = tableName.isSystemTable() ? + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); + this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); + this.rpcControllerFactory = null; + this.rpcCallerFactory = null; + this.pool = mutator.getPool(); + this.locator = null; } /** @@ -214,36 +238,6 @@ public class HTable implements Table { } /** - * setup this HTable's parameter based on the passed configuration - */ - private void finishSetup() throws IOException { - if (connConfiguration == null) { - connConfiguration = new ConnectionConfiguration(configuration); - } - - this.operationTimeout = tableName.isSystemTable() ? - connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.scannerCaching = connConfiguration.getScannerCaching(); - this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); - if (this.rpcCallerFactory == null) { - this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); - } - if (this.rpcControllerFactory == null) { - this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); - } - - // puts need to track errors globally due to how the APIs currently work. - multiAp = this.connection.getAsyncProcess(); - this.locator = new HRegionLocator(getName(), connection); - } - - /** * {@inheritDoc} */ @Override @@ -423,7 +417,7 @@ public class HTable implements Table { get = ReflectionUtils.newInstance(get.getClass(), get); get.setCheckExistenceOnly(checkExistenceOnly); if (get.getConsistency() == null){ - get.setConsistency(defaultConsistency); + get.setConsistency(DEFAULT_CONSISTENCY); } } @@ -483,13 +477,37 @@ public class HTable implements Table { @Override public void batch(final List actions, final Object[] results) throws InterruptedException, IOException { - batch(actions, results, -1); + int rpcTimeout = writeRpcTimeout; + boolean hasRead = false; + boolean hasWrite = false; + for (Row action : actions) { + if (action instanceof Mutation) { + hasWrite = true; + } else { + hasRead = true; + } + if (hasRead && hasWrite) { + break; + } + } + if (hasRead && !hasWrite) { + rpcTimeout = readRpcTimeout; + } + batch(actions, results, rpcTimeout); } public void batch(final List actions, final Object[] results, int rpcTimeout) throws InterruptedException, IOException { - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, - rpcTimeout); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(actions) + .setResults(results) + .setRpcTimeout(rpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -509,8 +527,20 @@ public class HTable implements Table { public static void doBatchWithCallback(List actions, Object[] results, Callback callback, ClusterConnection connection, ExecutorService pool, TableName tableName) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - AsyncRequestFuture ars = connection.getAsyncProcess().submitAll( - pool, tableName, actions, callback, results); + int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); + int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) + .setPool(pool) + .setTableName(tableName) + .setRowAccess(actions) + .setResults(results) + .setOperationTimeout(operationTimeout) + .setRpcTimeout(writeTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -536,8 +566,16 @@ public class HTable implements Table { } }; List rows = Collections.singletonList(delete); - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, null, callable, writeRpcTimeout); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rows) + .setCallable(callable) + .setRpcTimeout(writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -615,8 +653,16 @@ public class HTable implements Table { return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); } }; - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, writeRpcTimeout); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rm.getMutations()) + .setCallable(callable) + .setRpcTimeout(writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -795,8 +841,18 @@ public class HTable implements Table { }; List rows = Collections.singletonList(delete); Object[] results = new Object[1]; - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, results, callable, -1); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rows) + .setCallable(callable) + // TODO any better timeout? + .setRpcTimeout(readRpcTimeout + writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .setResults(results) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -839,8 +895,18 @@ public class HTable implements Table { * It is excessive to send such a large array, but that is required by the framework right now * */ Object[] results = new Object[rm.getMutations().size()]; - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, -1); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rm.getMutations()) + .setResults(results) + .setCallable(callable) + // TODO any better timeout? + .setRpcTimeout(readRpcTimeout + writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -925,26 +991,27 @@ public class HTable implements Table { if (this.closed) { return; } - flushCommits(); - if (cleanupPoolOnClose) { - this.pool.shutdown(); - try { - boolean terminated = false; - do { - // wait until the pool has terminated - terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); - } while (!terminated); - } catch (InterruptedException e) { - this.pool.shutdownNow(); - LOG.warn("waitForTermination interrupted"); + try { + flushCommits(); + if (mutator != null) { + mutator.close(); } - } - if (cleanupConnectionOnClose) { - if (this.connection != null) { - this.connection.close(); + if (cleanupPoolOnClose) { + this.pool.shutdown(); + try { + boolean terminated = false; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + } while (!terminated); + } catch (InterruptedException e) { + this.pool.shutdownNow(); + LOG.warn("waitForTermination interrupted"); + } } + } finally { + this.closed = true; } - this.closed = true; } // validate for well-formedness @@ -1102,7 +1169,6 @@ public class HTable implements Table { if (mutator != null) { mutator.setOperationTimeout(operationTimeout); } - multiAp.setOperationTimeout(operationTimeout); } @Override @@ -1134,7 +1200,6 @@ public class HTable implements Table { if (mutator != null) { mutator.setRpcTimeout(writeRpcTimeout); } - multiAp.setRpcTimeout(writeRpcTimeout); } @Override @@ -1217,37 +1282,41 @@ public class HTable implements Table { Object[] results = new Object[execs.size()]; AsyncProcess asyncProcess = - new AsyncProcess(connection, configuration, pool, + new AsyncProcess(connection, configuration, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration), readRpcTimeout, - operationTimeout); - - AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs, - new Callback() { - @Override - public void update(byte[] region, byte[] row, - ClientProtos.CoprocessorServiceResult serviceResult) { - if (LOG.isTraceEnabled()) { - LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + - ": region=" + Bytes.toStringBinary(region) + - ", row=" + Bytes.toStringBinary(row) + - ", value=" + serviceResult.getValue().getValue()); - } - try { - Message.Builder builder = responsePrototype.newBuilderForType(); - org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, - serviceResult.getValue().getValue().toByteArray()); - callback.update(region, row, (R) builder.build()); - } catch (IOException e) { - LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), - e); - callbackErrorExceptions.add(e); - callbackErrorActions.add(execsByRow.get(row)); - callbackErrorServers.add("null"); - } - } - }, results); - + true, RpcControllerFactory.instantiate(configuration)); + + Callback resultsCallback + = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { + if (LOG.isTraceEnabled()) { + LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + + ": region=" + Bytes.toStringBinary(region) + + ", row=" + Bytes.toStringBinary(row) + + ", value=" + serviceResult.getValue().getValue()); + } + try { + Message.Builder builder = responsePrototype.newBuilderForType(); + org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, + serviceResult.getValue().getValue().toByteArray()); + callback.update(region, row, (R) builder.build()); + } catch (IOException e) { + LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), + e); + callbackErrorExceptions.add(e); + callbackErrorActions.add(execsByRow.get(row)); + callbackErrorServers.add("null"); + } + }; + AsyncProcessTask task = AsyncProcessTask.newBuilder(resultsCallback) + .setPool(pool) + .setTableName(tableName) + .setRowAccess(execs) + .setResults(results) + .setRpcTimeout(readRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture future = asyncProcess.submit(task); future.waitUntilDone(); if (future.hasError()) { @@ -1270,10 +1339,10 @@ public class HTable implements Table { .pool(pool) .writeBufferSize(connConfiguration.getWriteBufferSize()) .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) + .opertationTimeout(operationTimeout) + .rpcTimeout(writeRpcTimeout) ); } - mutator.setRpcTimeout(writeRpcTimeout); - mutator.setOperationTimeout(operationTimeout); return mutator; } } \ No newline at end of file diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 8ff64bf..c03b969 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -443,7 +443,7 @@ public class HTableMultiplexer { private final AtomicInteger retryInQueue = new AtomicInteger(0); private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor private final int operationTimeout; - + private final ExecutorService pool; public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, ExecutorService pool, ScheduledExecutorService executor) { @@ -457,10 +457,10 @@ public class HTableMultiplexer { HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, - writeRpcTimeout, operationTimeout); + this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); + this.pool = pool; } protected LinkedBlockingQueue getQueue() { @@ -594,9 +594,14 @@ public class HTableMultiplexer { Map actionsByServer = Collections.singletonMap(server, actions); try { + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setResults(results) + .setPool(pool) + .setRpcTimeout(writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .build(); AsyncRequestFuture arf = - ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null, - null, actionsByServer, null); + ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); arf.waitUntilDone(); if (arf.hasError()) { // We just log and ignore the exception here since failed Puts will be resubmit again. diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java index 788f1a4..85fd590 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java @@ -30,8 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Evolving -@VisibleForTesting -interface RowAccess extends Iterable { +public interface RowAccess extends Iterable { /** * @return true if there are no elements. */ diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index bb6cbb5..cfd9692 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -36,9 +36,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -59,15 +59,12 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess; +import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost; import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -80,8 +77,11 @@ import org.junit.rules.TestRule; import org.mockito.Mockito; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; +import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.ClientTests; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -92,46 +92,50 @@ import static org.junit.Assert.fail; public class TestAsyncProcess { @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). withLookingForStuckThread(true).build(); - private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class); + private static final Log LOG = LogFactory.getLog(TestAsyncProcess.class); private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes(); private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes(); private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes(); private static final byte[] FAILS = "FAILS".getBytes(); - private static final Configuration conf = new Configuration(); - - private static ServerName sn = ServerName.valueOf("s1:1,1"); - private static ServerName sn2 = ServerName.valueOf("s2:2,2"); - private static ServerName sn3 = ServerName.valueOf("s3:3,3"); - private static HRegionInfo hri1 = + private static final Configuration CONF = new Configuration(); + private static final ConnectionConfiguration CONNECTION_CONFIG = new ConnectionConfiguration(CONF); + private static final ServerName sn = ServerName.valueOf("s1:1,1"); + private static final ServerName sn2 = ServerName.valueOf("s2:2,2"); + private static final ServerName sn3 = ServerName.valueOf("s3:3,3"); + private static final HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); - private static HRegionInfo hri2 = + private static final HRegionInfo hri2 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); - private static HRegionInfo hri3 = + private static final HRegionInfo hri3 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); - private static HRegionLocation loc1 = new HRegionLocation(hri1, sn); - private static HRegionLocation loc2 = new HRegionLocation(hri2, sn); - private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2); + private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn); + private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn); + private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2); // Replica stuff - private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1), + private static final HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1), hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); - private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); - private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), + private static final HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); + private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); - private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), + private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), new HRegionLocation(hri2r1, sn3)); - private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null); + private static final RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null); private static final String success = "success"; private static Exception failure = new Exception("failure"); - private static int NB_RETRIES = 3; + private static final int NB_RETRIES = 3; + private static final int RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static final int OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @BeforeClass public static void beforeClass(){ - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); + CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); } static class CountingThreadFactory implements ThreadFactory { @@ -153,20 +157,21 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List allReqs = new ArrayList(); public AtomicInteger callsCt = new AtomicInteger(); - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + private long previousTimeout = -1; + final ExecutorService service; @Override - protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, - List actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int curTimeout) { + protected AsyncRequestFutureImpl createAsyncRequestFuture( + AsyncProcessTask task, List actions, long nonceGroup) { // Test HTable has tableName of null, so pass DUMMY_TABLE + AsyncProcessTask wrap = new AsyncProcessTask(task){ + @Override + public TableName getTableName() { + return DUMMY_TABLE; + } + }; AsyncRequestFutureImpl r = new MyAsyncRequestFutureImpl( - DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, operationTimeout, rpcTimeout, this); + wrap, actions, nonceGroup, this); allReqs.add(r); return r; } @@ -176,49 +181,54 @@ public class TestAsyncProcess { } public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { - super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout, - operationTimeout); + super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, + new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { - super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), - rpcTimeout, operationTimeout); + super(hc, conf, + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + service = Executors.newFixedThreadPool(5); } - public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, - @SuppressWarnings("unused") boolean dummy) { - super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) { - @Override - public void execute(Runnable command) { - throw new RejectedExecutionException("test under failure"); - } - }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), - rpcTimeout, operationTimeout); + public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, + List rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { + AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) + .setPool(pool == null ? service : pool) + .setTableName(tableName) + .setRowAccess(rows) + .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) + .setNeedResults(needResults) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .build(); + return submit(task); + } + + public AsyncRequestFuture submit(TableName tableName, + final List rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { + return submit(null, tableName, rows, atLeastOne, callback, needResults); } @Override - public AsyncRequestFuture submit(TableName tableName, RowAccess rows, - boolean atLeastOne, Callback callback, boolean needResults) + public AsyncRequestFuture submit(AsyncProcessTask task) throws InterruptedIOException { + previousTimeout = task.getRpcTimeout(); // We use results in tests to check things, so override to always save them. - return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); + AsyncProcessTask wrap = new AsyncProcessTask(task) { + @Override + public boolean getNeedResults() { + return true; + } + }; + return super.submit(wrap); } @Override - public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List rows, Batch.Callback callback, Object[] results, - CancellableRegionServerCallable callable, int curTimeout) { - previousTimeout = curTimeout; - return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout); - } - @Override protected RpcRetryingCaller createCaller( CancellableRegionServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); @@ -260,12 +270,9 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl { - public MyAsyncRequestFutureImpl(TableName tableName, List actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout, - int rpcTimeout, AsyncProcess asyncProcess) { - super(tableName, actions, nonceGroup, pool, needResults, - results, callback, callable, operationTimeout, rpcTimeout, asyncProcess); + public MyAsyncRequestFutureImpl(AsyncProcessTask task, List actions, + long nonceGroup, AsyncProcess asyncProcess) { + super(task, actions, nonceGroup, asyncProcess); } @Override @@ -483,7 +490,7 @@ public class TestAsyncProcess { final boolean usedRegions[]; protected MyConnectionImpl2(List hrl) throws IOException { - super(conf); + super(CONF); this.hrl = hrl; this.usedRegions = new boolean[hrl.size()]; } @@ -560,7 +567,7 @@ public class TestAsyncProcess { AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); try { - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); fail("The maxHeapSizePerRequest must be bigger than zero"); } catch (IllegalArgumentException e) { } @@ -604,7 +611,6 @@ public class TestAsyncProcess { final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); // sn has two regions long putSizeSN = 0; @@ -630,11 +636,12 @@ public class TestAsyncProcess { + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:" + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); + try (HTable ht = new HTable(conn, mutator)) { + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); ht.put(puts); List reqs = ap.allReqs; @@ -685,7 +692,7 @@ public class TestAsyncProcess { @Test public void testSubmit() throws Exception { ClusterConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess(hc, conf); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); List puts = new ArrayList(); puts.add(createPut(1, true)); @@ -704,7 +711,7 @@ public class TestAsyncProcess { updateCalled.incrementAndGet(); } }; - AsyncProcess ap = new MyAsyncProcess(hc, conf); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); List puts = new ArrayList(); puts.add(createPut(1, true)); @@ -718,7 +725,7 @@ public class TestAsyncProcess { @Test public void testSubmitBusyRegion() throws Exception { ClusterConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess(hc, conf); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); List puts = new ArrayList(); puts.add(createPut(1, true)); @@ -738,7 +745,7 @@ public class TestAsyncProcess { @Test public void testSubmitBusyRegionServer() throws Exception { ClusterConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess(hc, conf); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer)); @@ -758,7 +765,7 @@ public class TestAsyncProcess { @Test public void testFail() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); List puts = new ArrayList(); Put p = createPut(1, false); @@ -784,7 +791,7 @@ public class TestAsyncProcess { @Test public void testSubmitTrue() throws IOException { - final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + final MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); ap.tasksInProgress.incrementAndGet(); final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion); ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); @@ -823,7 +830,7 @@ public class TestAsyncProcess { @Test public void testFailAndSuccess() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); List puts = new ArrayList(); puts.add(createPut(1, false)); @@ -850,7 +857,7 @@ public class TestAsyncProcess { @Test public void testFlush() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); List puts = new ArrayList(); puts.add(createPut(1, false)); @@ -868,13 +875,13 @@ public class TestAsyncProcess { @Test public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { ClusterConnection hc = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false); testTaskCount(ap); } @Test public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { - Configuration copyConf = new Configuration(conf); + Configuration copyConf = new Configuration(CONF); copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); ClusterConnection hc = createHConnection(); @@ -885,7 +892,7 @@ public class TestAsyncProcess { testTaskCount(ap); } - private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException { + private void testTaskCount(MyAsyncProcess ap) throws InterruptedIOException, InterruptedException { List puts = new ArrayList<>(); for (int i = 0; i != 3; ++i) { puts.add(createPut(1, true)); @@ -907,7 +914,7 @@ public class TestAsyncProcess { @Test public void testMaxTask() throws Exception { - final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + final MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); for (int i = 0; i < 1000; i++) { ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn); @@ -999,38 +1006,53 @@ public class TestAsyncProcess { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); - Mockito.when(hc.getConfiguration()).thenReturn(conf); + Mockito.when(hc.getConfiguration()).thenReturn(CONF); + Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); return hc; } @Test public void testHTablePutSuccess() throws Exception { - BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); - ht.ap = new MyAsyncProcess(createHConnection(), conf, true); + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam); Put put = createPut(1, true); - Assert.assertEquals(0, ht.getWriteBufferSize()); + Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), ht.getWriteBufferSize()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); ht.mutate(put); - Assert.assertEquals(0, ht.getWriteBufferSize()); + ht.flush(); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + } + + @Test + public void testBufferedMutatorImplWithSharedPool() throws Exception { + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam); + + ht.close(); + assertFalse(ap.service.isShutdown()); } private void doHTableFailedPut(boolean bufferOn) throws Exception { ClusterConnection conn = createHConnection(); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); if (bufferOn) { bufferParam.writeBufferSize(1024L * 1024L); } else { bufferParam.writeBufferSize(0L); } - - HTable ht = new HTable(conn, bufferParam); - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); + HTable ht = new HTable(conn, mutator); Put put = createPut(1, false); - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); try { ht.put(put); if (bufferOn) { @@ -1039,7 +1061,7 @@ public class TestAsyncProcess { Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { @@ -1067,10 +1089,10 @@ public class TestAsyncProcess { @Test public void testHTableFailedPutAndNewPut() throws Exception { ClusterConnection conn = createHConnection(); - BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, - new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0)); - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - mutator.ap = ap; + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE) + .writeBufferSize(0); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); Put p = createPut(1, false); mutator.mutate(p); @@ -1083,13 +1105,13 @@ public class TestAsyncProcess { // puts, we may raise an exception in the middle of the list. It's then up to the caller to // manage what was inserted, what was tried but failed, and what was not even tried. p = createPut(1, true); - Assert.assertEquals(0, mutator.writeAsyncBuffer.size()); + Assert.assertEquals(0, mutator.size()); try { mutator.mutate(p); Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size()); + Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); } @Test @@ -1302,9 +1324,12 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { - ClusterConnection conn = new MyConnectionImpl(conf); - HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); - ht.multiAp = new MyAsyncProcess(conn, conf, false); + ClusterConnection conn = new MyConnectionImpl(CONF); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); + HTable ht = new HTable(conn, mutator); + ht.multiAp = new MyAsyncProcess(conn, CONF, false); List puts = new ArrayList(); puts.add(createPut(1, true)); @@ -1332,18 +1357,16 @@ public class TestAsyncProcess { } @Test public void testErrorsServers() throws IOException { - Configuration configuration = new Configuration(conf); + Configuration configuration = new Configuration(CONF); ClusterConnection conn = new MyConnectionImpl(configuration); - BufferedMutatorImpl mutator = - new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); - configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); - MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); - mutator.ap = ap; + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); + configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); - mutator.ap.serverTrackerTimeout = 1; + Assert.assertNotNull(ap.createServerErrorTracker()); + Assert.assertTrue(ap.serverTrackerTimeout > 200); + ap.serverTrackerTimeout = 1; Put p = createPut(1, false); mutator.mutate(p); @@ -1361,14 +1384,15 @@ public class TestAsyncProcess { public void testReadAndWriteTimeout() throws IOException { final long readTimeout = 10 * 1000; final long writeTimeout = 20 * 1000; - Configuration copyConf = new Configuration(conf); + Configuration copyConf = new Configuration(CONF); copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); ClusterConnection conn = createHConnection(); Mockito.when(conn.getConfiguration()).thenReturn(copyConf); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); + try (HTable ht = new HTable(conn, mutator)) { ht.multiAp = ap; List gets = new LinkedList<>(); gets.add(new Get(DUMMY_BYTES_1)); @@ -1399,12 +1423,12 @@ public class TestAsyncProcess { @Test public void testGlobalErrors() throws IOException { - ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test")); - mutator.ap = ap; + ClusterConnection conn = new MyConnectionImpl(CONF); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1421,13 +1445,11 @@ public class TestAsyncProcess { @Test public void testCallQueueTooLarge() throws IOException { - ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException()); - mutator.ap = ap; - - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - + ClusterConnection conn = new MyConnectionImpl(CONF); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1459,10 +1481,11 @@ public class TestAsyncProcess { } MyConnectionImpl2 con = new MyConnectionImpl2(hrls); - HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE)); - MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); + MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(con , bufferParam); + HTable ht = new HTable(con, mutator); ht.multiAp = ap; - ht.batch(gets, null); Assert.assertEquals(ap.nbActions.get(), NB_REGS); @@ -1482,7 +1505,16 @@ public class TestAsyncProcess { // One region has no replica, so the main call succeeds for it. MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); - AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[3]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); Assert.assertEquals(2, ap.getReplicaCallCount()); } @@ -1492,7 +1524,16 @@ public class TestAsyncProcess { // Main call succeeds before replica calls are kicked off. MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[3]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); Assert.assertEquals(0, ap.getReplicaCallCount()); } @@ -1502,7 +1543,16 @@ public class TestAsyncProcess { // Either main or replica can succeed. MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); long replicaCalls = ap.getReplicaCallCount(); Assert.assertTrue(replicaCalls >= 0); @@ -1517,7 +1567,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); ap.setPrimaryCallDelay(sn2, 2000); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FALSE, RR.TRUE); Assert.assertEquals(1, ap.getReplicaCallCount()); } @@ -1530,7 +1589,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); ap.addFailures(hri1, hri2); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FAILED, RR.FAILED); Assert.assertEquals(0, ap.getReplicaCallCount()); } @@ -1542,7 +1610,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); ap.addFailures(hri1, hri1r2, hri2); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.TRUE, RR.TRUE); Assert.assertEquals(2, ap.getReplicaCallCount()); } @@ -1554,7 +1631,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FAILED, RR.FALSE); // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); @@ -1583,6 +1669,14 @@ public class TestAsyncProcess { return ap; } + private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) { + return new BufferedMutatorParams(name) + .asyncProcess(ap) + .pool(ap.service) + .rpcTimeout(RPC_TIMEOUT) + .opertationTimeout(OPERATION_TIMEOUT); + } + private static List makeTimelineGets(byte[]... rows) { List result = new ArrayList(); for (byte[] row : rows) { @@ -1663,14 +1757,9 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, - ExecutorService pool) { - super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf), rpcTimeout, operationTimeout); + public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { + super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( + conf)); } } @@ -1681,12 +1770,19 @@ public class TestAsyncProcess { MyThreadPoolExecutor myPool = new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(200)); - AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool); + AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF); List puts = new ArrayList(); puts.add(createPut(1, true)); - - ap.submit(null, DUMMY_TABLE, puts, false, null, false); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(myPool) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(puts) + .setSubmittedRows(SubmittedRows.NORMAL) + .build(); + ap.submit(task); Assert.assertTrue(puts.isEmpty()); } @@ -1695,7 +1791,7 @@ public class TestAsyncProcess { final AtomicLong tasks = new AtomicLong(0); final AtomicInteger max = new AtomicInteger(0); final CyclicBarrier barrier = new CyclicBarrier(2); - final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf); + final AsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); Runnable runnable = new Runnable() { @Override public void run() { @@ -1738,18 +1834,18 @@ public class TestAsyncProcess { */ @Test public void testRetryPauseWithCallQueueTooBigException() throws Exception { - Configuration myConf = new Configuration(conf); + Configuration myConf = new Configuration(CONF); final long specialPause = 500L; final int retries = 1; myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); ClusterConnection conn = new MyConnectionImpl(myConf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); - mutator.ap = ap; + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1775,8 +1871,9 @@ public class TestAsyncProcess { final long normalPause = myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); - mutator.ap = ap; - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + mutator = new BufferedMutatorImpl(conn, bufferParam); + Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); mutator.mutate(p); startTime = System.currentTimeMillis(); try { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index ee89609..e5ab3e8 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -126,11 +126,8 @@ public class HConnectionTestingUtility { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( - new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, - RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))); + new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false, + RpcControllerFactory.instantiate(conf))); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 53488ec..2c5e89d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.AsyncProcessTask; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; @@ -137,14 +138,20 @@ public class TestClientPushback { final CountDownLatch latch = new CountDownLatch(1); final AtomicLong endTime = new AtomicLong(); long startTime = EnvironmentEdgeManager.currentTime(); - - ((HTable) table).mutator.ap.submit(null, tableName, ops, true, new Batch.Callback() { - @Override - public void update(byte[] region, byte[] row, Result result) { + BufferedMutatorImpl mutator = ((HTable) table).mutator; + Batch.Callback callback = (byte[] r, byte[] row, Result result) -> { endTime.set(EnvironmentEdgeManager.currentTime()); latch.countDown(); - } - }, true); + }; + AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) + .setPool(mutator.getPool()) + .setTableName(tableName) + .setRowAccess(ops) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) + .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout()) + .setRpcTimeout(60 * 1000) + .build(); + mutator.getAsyncProcess().submit(task); // Currently the ExponentialClientBackoffPolicy under these test conditions // produces a backoffTime of 151 milliseconds. This is long enough so the // wait and related checks below are reasonable. Revisit if the backoff diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 6d1e1f0..0f7f3d9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -563,9 +563,17 @@ public class TestReplicasClient { gets.add(g); Object[] results = new Object[2]; - AsyncRequestFuture reqs = ap.submitAll( - HTable.getDefaultExecutor(HTU.getConfiguration()), - table.getName(), gets, null, results); + int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout(); + int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout(); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(HTable.getDefaultExecutor(HTU.getConfiguration())) + .setTableName(table.getName()) + .setRowAccess(gets) + .setResults(results) + .setOperationTimeout(operationTimeout) + .setRpcTimeout(readTimeout) + .build(); + AsyncRequestFuture reqs = ap.submit(task); reqs.waitUntilDone(); // verify we got the right results back for (Object r : results) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 5157868..09c7561 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -492,7 +492,6 @@ public class TestPerColumnFamilyFlush { Thread.sleep(100); } } - table.close(); assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion)); assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound); assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java index 68fffb1..380c252 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java @@ -171,22 +171,35 @@ public class TestTablePermissions { } } + /** + * The AccessControlLists.addUserPermission may throw exception before closing the table. + */ + private void addUserPermission(Configuration conf, UserPermission userPerm, Table t) throws IOException { + try { + AccessControlLists.addUserPermission(conf, userPerm, t); + } finally { + t.close(); + } + } + @Test public void testBasicWrite() throws Exception { Configuration conf = UTIL.getConfiguration(); - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { + try (Connection connection = ConnectionFactory.createConnection(conf)) { // add some permissions - AccessControlLists.addUserPermission(conf, + addUserPermission(conf, new UserPermission(Bytes.toBytes("george"), TEST_TABLE, null, (byte[])null, - UserPermission.Action.READ, UserPermission.Action.WRITE), table); - AccessControlLists.addUserPermission(conf, + UserPermission.Action.READ, UserPermission.Action.WRITE), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE, null, (byte[])null, - UserPermission.Action.READ), table); - AccessControlLists.addUserPermission(conf, + UserPermission.Action.READ), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("humphrey"), TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER, - UserPermission.Action.READ), table); + UserPermission.Action.READ), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); } // retrieve the same ListMultimap perms = @@ -274,23 +287,22 @@ public class TestTablePermissions { @Test public void testPersistence() throws Exception { Configuration conf = UTIL.getConfiguration(); - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { - AccessControlLists.addUserPermission(conf, + try (Connection connection = ConnectionFactory.createConnection(conf)) { + addUserPermission(conf, new UserPermission(Bytes.toBytes("albert"), TEST_TABLE, null, - (byte[])null, TablePermission.Action.READ), table); - AccessControlLists.addUserPermission(conf, + (byte[])null, TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("betty"), TEST_TABLE, null, (byte[])null, TablePermission.Action.READ, - TablePermission.Action.WRITE), table); - AccessControlLists.addUserPermission(conf, + TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("clark"), TEST_TABLE, TEST_FAMILY, - TablePermission.Action.READ), table); - AccessControlLists.addUserPermission(conf, + TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("dwight"), TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER, - TablePermission.Action.WRITE), table); + TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); } // verify permissions survive changes in table metadata ListMultimap preperms = @@ -404,17 +416,17 @@ public class TestTablePermissions { Configuration conf = UTIL.getConfiguration(); // add some permissions - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { - AccessControlLists.addUserPermission(conf, + try (Connection connection = ConnectionFactory.createConnection(conf)) { + addUserPermission(conf, new UserPermission(Bytes.toBytes("user1"), - Permission.Action.READ, Permission.Action.WRITE), table); - AccessControlLists.addUserPermission(conf, + Permission.Action.READ, Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("user2"), - Permission.Action.CREATE), table); - AccessControlLists.addUserPermission(conf, + Permission.Action.CREATE), connection.getTable(AccessControlLists.ACL_TABLE_NAME)); + addUserPermission(conf, new UserPermission(Bytes.toBytes("user3"), - Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), table); + Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); } ListMultimap perms = AccessControlLists.getTablePermissions(conf, null); List user1Perms = perms.get("user1"); @@ -448,11 +460,11 @@ public class TestTablePermissions { // currently running user is the system user and should have global admin perms User currentUser = User.getCurrent(); assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN)); - try (Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) { + try (Connection connection = ConnectionFactory.createConnection(conf)) { for (int i=1; i<=50; i++) { - AccessControlLists.addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i), - Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), table); + addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i), + Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE), + connection.getTable(AccessControlLists.ACL_TABLE_NAME)); // make sure the system user still shows as authorized assertTrue("Failed current user auth check on iter "+i, authManager.authorize(currentUser, Permission.Action.ADMIN)); diff --git hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala index 6ebf044..249eb28 100644 --- hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala +++ hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala @@ -21,7 +21,7 @@ import java.util.concurrent.ExecutorService import scala.util.Random import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator, - Connection, BufferedMutatorParams, Admin} + Connection, BufferedMutatorParams, Admin, AsyncProcess} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.TableName import org.apache.spark.Logging @@ -49,6 +49,7 @@ class ConnectionMocker extends Connection { def getTable(tableName: TableName, pool: ExecutorService): Table = null def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null def getBufferedMutator (tableName: TableName): BufferedMutator = null + def createAsyncProcess: AsyncProcess = null def getAdmin: Admin = null def close(): Unit = {