From da0874884f7543ca329a664d310fc785c3abbf6b Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Tue, 31 May 2016 08:26:35 +0200 Subject: [PATCH] HBASE-15921 Add first AsyncTable impl and create TableImpl based on it --- .../hbase/client/AbstractRegionServerCallable.java | 2 +- .../hbase/client/AsyncRegionServerCallable.java | 71 ++ .../hadoop/hbase/client/AsyncRetryingCallable.java | 42 ++ .../org/apache/hadoop/hbase/client/AsyncTable.java | 240 +++++++ .../apache/hadoop/hbase/client/AsyncTableImpl.java | 733 +++++++++++++++++++++ .../hadoop/hbase/client/ClusterConnection.java | 18 +- .../org/apache/hadoop/hbase/client/Connection.java | 17 + .../hbase/client/ConnectionImplementation.java | 53 +- .../apache/hadoop/hbase/client/FailedFuture.java | 74 +++ .../org/apache/hadoop/hbase/client/HTable.java | 17 +- .../hadoop/hbase/client/KeyedPromiseKeeper.java | 106 +++ .../apache/hadoop/hbase/client/PromiseKeeper.java | 109 +++ .../hadoop/hbase/client/RetryingPromise.java | 111 ++++ .../hadoop/hbase/client/RpcRetryingCaller.java | 20 +- .../hadoop/hbase/client/RpcRetryingCallerImpl.java | 126 +++- .../client/RpcRetryingCallerWithReadReplicas.java | 173 +++-- .../hadoop/hbase/client/SinglePromiseKeeper.java | 110 ++++ .../hadoop/hbase/client/SuccessfulFuture.java | 68 ++ .../org/apache/hadoop/hbase/client/TableImpl.java | 612 +++++++++++++++++ .../hadoop/hbase/client/VoidPromiseKeeper.java | 105 +++ .../hadoop/hbase/client/coprocessor/Batch.java | 25 + .../hadoop/hbase/client/TestClientScanner.java | 15 +- .../apache/hadoop/hbase/HBaseTestingUtility.java | 51 +- .../hadoop/hbase/client/TestFromClientSide.java | 8 +- .../hbase/client/TestFromClientSideAsync.java | 227 +++++++ .../org/apache/hadoop/hbase/client/TestHCM.java | 26 +- .../hbase/client/TestShortCircuitConnection.java | 4 +- 27 files changed, 2996 insertions(+), 167 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedFuture.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/KeyedPromiseKeeper.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/PromiseKeeper.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingPromise.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/SinglePromiseKeeper.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/SuccessfulFuture.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/VoidPromiseKeeper.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java index 7279d81..af0d5a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.util.Bytes; * @param the class that the ServerCallable handles */ @InterfaceAudience.Private -abstract class AbstractRegionServerCallable implements RetryingCallable { +abstract class AbstractRegionServerCallable implements RetryingCallableBase { // Public because used outside of this package over in ipc. private static final Log LOG = LogFactory.getLog(AbstractRegionServerCallable.class); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java new file mode 100644 index 0000000..97f105f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.util.concurrent.EventExecutor; + +import java.io.IOException; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.AsyncRpcChannel; + +/** + * Implementations call a RegionServer and implement {@link #call(int)}. + * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * TODO: this class is actually tied to one region, because most of the paths make use of + * the regioninfo part of location when building requests. The only reason it works for + * multi-region requests (e.g. batch) is that they happen to not use the region parts. + * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, + * RegionCallable and actual RegionServerCallable with ServerName. + * @param the class that the ServerCallable handles + */ +@InterfaceAudience.Private +public abstract class AsyncRegionServerCallable extends AbstractRegionServerCallable + implements AsyncRetryingCallable { + private AsyncRpcChannel channel; + // Public because used outside of this package over in ipc. + + /** + * @param connection Connection to use. + * @param tableName Table name to which row belongs. + * @param row The row we want in tableName. 
+ */ + public AsyncRegionServerCallable(Connection connection, TableName tableName, byte[] row) { + super(connection, tableName, row); + } + + @Override + void setClientByServiceName(ServerName service) throws IOException { + this.channel = getConnection().getAsyncClientChannel(service); + } + + /** + * Get the Async RPC channel for this Callable + * @return AsyncRpcChannel + */ + public AsyncRpcChannel getChannel() { + return channel; + } + + @Override + public EventExecutor getEventExecutor() { + return ((ClusterConnection)this.connection).getEventExecutor(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java new file mode 100644 index 0000000..ecaf1fd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.util.concurrent.EventExecutor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A Callable that will be retried async. + * @param Type of object returned + */ +@InterfaceAudience.Private +public interface AsyncRetryingCallable extends RetryingCallableBase { + /** + * Computes a result, or throws an exception if unable to do so. + * + * @param callTimeout - the time available for this call. 0 for infinite. + * @return Future which handles the Result + */ + Future call(int callTimeout); + + /** + * Get EventLoop to operate async operations on + * @return AsyncRpcChannel + */ + EventExecutor getEventExecutor(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java new file mode 100644 index 0000000..08af6e2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import java.io.Closeable; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +/** + * Used to communicate with a single HBase table. + * Obtain an instance from a {@link Connection} and call {@link #close()} afterwards. + * + *
<p>
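+ * Every operation returns a {@link Future}; a caller can block on {@link Future#get()} or
+ * register a listener to be notified on completion. An illustrative sketch (not normative;
+ * the variable names are placeholders):
+ * <blockquote><pre>
+ * table.get(get).addListener(new ResponseFutureListener&lt;Result&gt;() {
+ *   public void operationComplete(Future&lt;Result&gt; future) {
+ *     if (future.isSuccess()) {
+ *       Result result = future.getNow();
+ *     }
+ *   }
+ * });
+ * </pre></blockquote>
+ * <p>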
AsyncTable can be used to get, put, delete or scan data from a table. + * @since 2.0.0 + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface AsyncTable extends Closeable { + /** + * Gets the fully qualified table name instance of this table. + */ + TableName getName(); + + /** + * Returns the {@link Configuration} object used by this instance. + *
<p>
+ * The reference returned is not a copy, so any change made to it will + * affect this instance. + */ + Configuration getConfiguration(); + + /** + * Extracts certain cells from a given row. + * @param get The object that specifies what data to fetch and from which row. + * @return Future with the data coming from the specified row, if it exists. If the row + * specified doesn't exist, the {@link Result} instance returned won't + * contain any {@link org.apache.hadoop.hbase.KeyValue}, as indicated by + * {@link Result#isEmpty()}. + */ + Future get(Get get); + + /** + * Extracts certain cells from the given rows, in batch. + * + * @param gets The objects that specify what data to fetch and from which rows. + * @return Future with the data coming from the specified rows, if it exists. If the row + * specified doesn't exist, the {@link Result} instance returned won't contain any {@link + * org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. If there are any + * failures even after retries, there will be a null in the results array for those Gets, AND an + * exception will be thrown. + */ + Future get(List gets); + + /** + * Mutates a Row with a Mutation. Can be a Put, Append, Delete or Increment + * @param mutation to be committed + * @return Future with Result if Append or Increment + */ + Future mutate(Mutation mutation); + + /** + * Mutates a Row with multiple mutations on one row. + * @param mutation to be committed + * @return Future with Result if Append or Increment + */ + Future mutate(RowMutations mutation); + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. + * If it does, it performs the row mutations. If the passed value is null, the check + * is for the lack of column (ie: non-existence) + * + * It accepts a single Mutation like Put/Delete/Append/Increment + * + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @return Future with true if the new put was executed, false otherwise + */ + Future checkAndMutate(byte[] family, byte[] qualifier, + CompareFilter.CompareOp compareOp, byte[] value, Mutation mutation); + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. + * If it does, it performs the row mutations. If the passed value is null, the check + * is for the lack of column (ie: non-existence) + * + * It accepts a RowMutations object which contains multiple mutations on a single row. + * + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutations mutations to perform if check succeeds + * @return Future with true if the new put was executed, false otherwise + */ + Future checkAndMutate(byte[] family, byte[] qualifier, + CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutations); + + + /** + * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the + * table region containing the specified row. The row given does not actually have + * to exist. Whichever region would contain the row based on start and end keys will + * be used. Note that the {@code row} parameter is also not passed to the + * coprocessor handler registered for this protocol, unless the {@code row} + * is separately passed as an argument in the service request. 
The parameter + * here is only used to locate the region used to handle the call. + * + *
<p>
+ * The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published + * coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations: + *
+   * <div style="background-color: #cccccc; padding: 2px">
+   * <blockquote><pre>
+   * CoprocessorRpcChannel channel = myTable.coprocessorService(rowkey);
+   * MyService.BlockingInterface service = MyService.newBlockingStub(channel);
+   * MyCallRequest request = MyCallRequest.newBuilder()
+   *     ...
+   *     .build();
+   * MyCallResponse response = service.myCall(null, request);
+   * </pre></blockquote></div>
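+   *
+   * <p>The channel also implements the non-blocking {@link com.google.protobuf.RpcChannel}
+   * interface, so an asynchronous stub can be used in the same way (an illustrative sketch;
+   * MyService and the message types are placeholders, as above):
+   * <blockquote><pre>
+   * MyService.Interface asyncService = MyService.newStub(channel);
+   * asyncService.myCall(null, request, new RpcCallback&lt;MyCallResponse&gt;() {
+   *   public void run(MyCallResponse response) {
+   *     // invoked when the call completes
+   *   }
+   * });
+   * </pre></blockquote>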
+ * + * @param row The row key used to identify the remote region location + * @return A CoprocessorRpcChannel instance + */ + CoprocessorRpcChannel coprocessorService(byte[] row); + + /** + * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table + * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), and + * invokes the passed {@link Batch.AsyncCall#call} method with each + * {@link com.google.protobuf.Service} instance. + * + * @param service the protocol buffer {@code Service} implementation to call + * @param startKey start region selection with region containing this row. If {@code null}, the + * selection will start with the first table region. + * @param endKey select regions up to and including the region containing this row. If {@code + * null}, selection will continue through the last table region. + * @param callable this instance's {@link org.apache.hadoop.hbase.client.coprocessor.Batch + * .AsyncCall#call} + * method will be invoked once per table region, using the {@link com.google.protobuf.Service} + * instance connected to that region. + * @param the {@link com.google.protobuf.Service} subclass to connect to + * @param Return type for the {@code callable} parameter's {@link + * org.apache.hadoop.hbase.client.coprocessor.Batch.AsyncCall#call} method + * @return Future with a map of result values keyed by region name + */ + Future> coprocessorService(final Class service, + byte[] startKey, byte[] endKey, final Batch.AsyncCall callable); + + + /** + * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table + * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all + * the invocations to the same region server will be batched into one call. The coprocessor + * service is invoked according to the service instance, method name and parameters. + * + * @param methodDescriptor + * the descriptor for the protobuf service method to call. + * @param request + * the method call parameters + * @param startKey + * start region selection with region containing this row. If {@code null}, the + * selection will start with the first table region. + * @param endKey + * select regions up to and including the region containing this row. If {@code null}, + * selection will continue through the last table region. + * @param responsePrototype + * the proto type of the response of the method in Service. + * @param + * the response type for the coprocessor Service method + * @return Future with a map of result values keyed by region name + */ + Future> batchCoprocessorService( + Descriptors.MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype); + + /** + * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table + * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all + * the invocations to the same region server will be batched into one call. The coprocessor + * service is invoked according to the service instance, method name and parameters. + * + *
<p>
+ * The given + * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[], + * Object)} method will be called with the return value from each region's invocation. + *
<p>
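+   * For example, a callback that reacts to each region's response could look like this
+   * (an illustrative sketch; MyResponse is a placeholder message type):
+   * <blockquote><pre>
+   * Batch.Callback&lt;MyResponse&gt; callback = new Batch.Callback&lt;MyResponse&gt;() {
+   *   public void update(byte[] region, byte[] row, MyResponse response) {
+   *     // invoked once per region, with that region's response
+   *   }
+   * };
+   * </pre></blockquote>
+   *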
+ * + * @param methodDescriptor + * the descriptor for the protobuf service method to call. + * @param request + * the method call parameters + * @param startKey + * start region selection with region containing this row. If {@code null}, the + * selection will start with the first table region. + * @param endKey + * select regions up to and including the region containing this row. If {@code null}, + * selection will continue through the last table region. + * @param responsePrototype + * the proto type of the response of the method in Service. + * @param callback + * callback to invoke with the response for each region + * @param + * the response type for the coprocessor Service method + * @return Future with a map of result values keyed by region name + */ + Future batchCoprocessorService( + Descriptors.MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback callback); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java new file mode 100644 index 0000000..85a6023 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -0,0 +1,733 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import io.netty.util.concurrent.GenericFutureListener; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.MessageConverter; +import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * An implementation of {@link AsyncTable}. Used to communicate with a single HBase table. + * Lightweight. Get as needed and just close when done. + * Instances of this class SHOULD NOT be constructed directly. + * Obtain an instance via {@link Connection}. See {@link ConnectionFactory} + * class comment for an example of how. + * + *
<p>
AsyncTableImpl is not a client API. Use {@link AsyncTable} instead. It is marked + * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in + * Hadoop + * Interface Classification + * There are no guarantees for backwards source / binary compatibility and methods or class can + * change or go away without deprecation. + * + * @see AsyncTable + * @see Connection + * @see ConnectionFactory + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class AsyncTableImpl implements AsyncTable { + private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class); + + protected static final Descriptors.MethodDescriptor GET_DESCRIPTOR = ClientProtos.ClientService + .getDescriptor().findMethodByName("Get"); + + protected static final Descriptors.MethodDescriptor MUTATE_DESCRIPTOR = ClientProtos.ClientService + .getDescriptor().findMethodByName("Mutate"); + + protected static final Descriptors.MethodDescriptor MULTI_DESCRIPTOR = ClientProtos.ClientService + .getDescriptor().findMethodByName("Multi"); + + public static final Descriptors.MethodDescriptor EXEC_SERVICE_DESCRIPTOR = ClientProtos + .ClientService.getDescriptor().findMethodByName("ExecService"); + + private final ClusterConnection connection; + private final Configuration configuration; + private final TableName tableName; + private final ConnectionConfiguration ConnectionConfiguration; + private final RpcRetryingCallerFactory rpcCallerFactory; + + private final int operationTimeout; + + private final Consistency defaultConsistency = Consistency.STRONG; + private final int scannerBatching; + private final long scannerMaxResultSize; + + /** + * Creates an object to access a HBase table. + * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to + * get a {@link Table} instance (use {@link Table} instead of {@link AsyncTableImpl}). + * @param tableName Name of the table. + * @param connection HConnection to be used. + * @param tableConfig config for the table + * @param rpcCallerFactory factory for RPC callers + * @throws IOException if a remote or network exception occurs + */ + @InterfaceAudience.Private + protected AsyncTableImpl(TableName tableName, final ClusterConnection connection, + final ConnectionConfiguration tableConfig, final RpcRetryingCallerFactory rpcCallerFactory) + throws IOException { + if (connection == null || connection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + this.tableName = tableName; + this.connection = connection; + this.configuration = connection.getConfiguration(); + + if (rpcCallerFactory == null) { + this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); + } else { + this.rpcCallerFactory = rpcCallerFactory; + } + + if (tableConfig == null) { + ConnectionConfiguration = new ConnectionConfiguration(configuration); + } else { + this.ConnectionConfiguration = tableConfig; + } + + this.scannerBatching = ConnectionConfiguration.getScannerCaching(); + this.scannerMaxResultSize = ConnectionConfiguration.getScannerMaxResultSize(); + + this.operationTimeout = tableName.isSystemTable() ? 
+ ConnectionConfiguration.getMetaOperationTimeout() : + ConnectionConfiguration.getOperationTimeout(); + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public TableName getName() { + return tableName; + } + + static final MessageConverter GET_RESPONSE_CONVERTER = + new MessageConverter() { + @Override + public Result convert(ClientProtos.GetResponse msg, CellScanner cellScanner) throws IOException { + assert msg != null; + return ProtobufUtil.toResult(msg.getResult(), cellScanner); + } + }; + + static final MessageConverter MUTATE_CONVERTER = + new MessageConverter() { + @Override + public Result convert(ClientProtos.MutateResponse msg, CellScanner cellScanner) throws IOException { + assert msg != null; + ClientProtos.Result r = msg.getResult(); + if(r == null){ + return null; + }else{ + return ProtobufUtil.toResult(r, cellScanner); + } + } + }; + + static final MessageConverter MUTATE_CHECK_CONVERTER = + new MessageConverter() { + @Override + public Boolean convert(ClientProtos.MutateResponse msg, CellScanner cellScanner) { + assert msg != null; + return msg.getProcessed(); + } + }; + + static final MessageConverter MULTI_CONVERTER = + new MessageConverter() { + @Override + public Void convert(ClientProtos.MultiResponse msg, CellScanner cellScanner) { + assert msg != null; + return null; + } + }; + + /** + * {@inheritDoc} + */ + @Override + public Future get(Get get) { + return this.get(get, GET_RESPONSE_CONVERTER); + } + + /** + * {@inheritDoc} + */ + @Override + public Future get(List gets) { + PromiseKeeper pk = new PromiseKeeper<>(connection.getEventExecutor(), + gets.size(), new Result[gets.size()]); + for(Get get : gets) { + pk.addFuture(this.get(get)); + } + return pk; + } + + /** + * Handles Get with optional response converter + * + * @param get The Get Request + * @param converter for the GetResponse + * @param Type of Response object expected + * @return ResponsePromise with type of value expected + * @see Get + */ + private Future get(final Get get, + final MessageConverter converter) { + if (get.getConsistency() == null){ + get.setConsistency(defaultConsistency); + } + + if (get.getConsistency() == Consistency.STRONG) { + final AsyncRegionServerCallable callable = new AsyncRegionServerCallable( + this.connection, getName(), get.getRow()) { + @Override + public Future call(int callTimeout) { + ClientProtos.GetRequest request; + try { + request = RequestConverter + .buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); + } catch (IOException e) { + return new FailedFuture<>(getEventExecutor(),e); + } + + return getChannel().callMethod( + GET_DESCRIPTOR, + request, + null, + ClientProtos.GetResponse.getDefaultInstance(), + converter, + null, + callTimeout, + getPriority(tableName)); + } + }; + return rpcCallerFactory.newCaller().callAsyncWithRetries(callable, + this.operationTimeout); + } + + // Call that takes into account the replica + RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas<>( + rpcCallerFactory, + tableName, this.connection, get, converter, + operationTimeout, ConnectionConfiguration.getPrimaryCallTimeoutMicroSecond()); + return callable.call(); + } + + /** + * Handles Mutate (Put/Delete/Append/Increment) + * + * @param mutation The Mutate Request + * @return ResponsePromise with Result if Append or Increment + */ + @Override + public Future mutate(final Mutation mutation) { + final AsyncRegionServerCallable callable = new 
AsyncRegionServerCallable( + this.connection, getName(), mutation.getRow()) { + @Override + public Future call(int callTimeout) { + ClientProtos.MutateRequest request; + try { + if(mutation instanceof Put) { + validatePut((Put) mutation); + + request = RequestConverter + .buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), (Put) mutation); + } else if(mutation instanceof Delete) { + request = RequestConverter + .buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), (Delete) mutation); + } else if(mutation instanceof Increment) { + NonceGenerator ng = getConnection().getNonceGenerator(); + request = RequestConverter + .buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), + (Increment) mutation, + ng.getNonceGroup(), + ng.newNonce() + ); + } else if(mutation instanceof Append) { + NonceGenerator ng = getConnection().getNonceGenerator(); + request = RequestConverter + .buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), + (Append) mutation, + ng.getNonceGroup(), + ng.newNonce()); + }else{ + throw new NotImplementedException("Unknown mutation "+mutation.getClass().getName()); + } + } catch (IOException e) { + return new FailedFuture<>(getEventExecutor(),e); + } + + return getChannel().callMethod( + MUTATE_DESCRIPTOR, + request, + null, + ClientProtos.MutateResponse.getDefaultInstance(), MUTATE_CONVERTER, null, + callTimeout, + getPriority(tableName)); + } + }; + return rpcCallerFactory.newCaller().callAsyncWithRetries(callable, this + .operationTimeout); + + } + + @Override + public Future mutate(final RowMutations mutation) { + final AsyncRegionServerCallable callable = new AsyncRegionServerCallable( + this.connection, getName(), mutation.getRow()) { + @Override + public Future call(int callTimeout) { + ClientProtos.RegionAction.Builder regionMutationBuilder; + try { + regionMutationBuilder = RequestConverter + .buildRegionAction(getLocation().getRegionInfo().getRegionName(), mutation); + } catch (IOException e) { + return new FailedFuture<>(getEventExecutor(),e); + } + regionMutationBuilder.setAtomic(true); + ClientProtos.MultiRequest request = + ClientProtos.MultiRequest + .newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + + return getChannel().callMethod( + MULTI_DESCRIPTOR, + request, + null, + ClientProtos.MultiResponse.getDefaultInstance(), MULTI_CONVERTER, null, + callTimeout, + getPriority(tableName)); + } + }; + return rpcCallerFactory.newCaller().callAsyncWithRetries(callable, this + .operationTimeout); + } + + @Override + public Future checkAndMutate(final byte[] family, + final byte[] qualifier, final CompareFilter.CompareOp compareOp, final byte[] value, + final Mutation mutation) { + final AsyncRegionServerCallable callable = new AsyncRegionServerCallable( + this.connection, getName(), mutation.getRow()) { + @Override + public Future call(int callTimeout) { + ClientProtos.MutateRequest request; + HBaseProtos.CompareType compareType = HBaseProtos.CompareType.valueOf(compareOp.name()); + BinaryComparator comparator = new BinaryComparator(value); + try { + if(mutation instanceof Put) { + validatePut((Put) mutation); + + request = RequestConverter + .buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), + row, family, qualifier, comparator, compareType, (Put) mutation + ); + } else if(mutation instanceof Delete) { + request = RequestConverter + .buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), + row, family, qualifier, comparator, compareType, (Delete) mutation + 
); + } else if(mutation instanceof Increment) { + throw new RuntimeException( + "Mutation needs to be a Put or Delete for a checked mutation"); + } else if(mutation instanceof Append) { + throw new RuntimeException( + "Mutation needs to be a Put or Delete for a checked mutation"); + }else{ + throw new NotImplementedException("Unknown mutation "+mutation.getClass().getName()); + } + } catch (IOException e) { + return new FailedFuture<>(getEventExecutor(),e); + } + + return getChannel().callMethod( + MUTATE_DESCRIPTOR, + request, + null, + ClientProtos.MutateResponse.getDefaultInstance(), MUTATE_CHECK_CONVERTER, null, + callTimeout, + getPriority(tableName)); + } + }; + return rpcCallerFactory.newCaller().callAsyncWithRetries(callable, + this.operationTimeout); + } + + @Override + public Future checkAndMutate(final byte[] family, + final byte[] qualifier, final CompareFilter.CompareOp compareOp, final byte[] value, + final RowMutations mutations) { + final AsyncRegionServerCallable callable = new AsyncRegionServerCallable( + this.connection, getName(), mutations.getRow()) { + @Override + public Future call(int callTimeout) { + ClientProtos.MultiRequest request; + HBaseProtos.CompareType compareType = HBaseProtos.CompareType.valueOf(compareOp.name()); + BinaryComparator comparator = new BinaryComparator(value); + try { + request = RequestConverter + .buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), + row, family, qualifier, comparator, compareType, mutations + ); + } catch (IOException e) { + return new FailedFuture<>(getEventExecutor(),e); + } + + return getChannel().callMethod( + MUTATE_DESCRIPTOR, + request, + null, + ClientProtos.MutateResponse.getDefaultInstance(), MUTATE_CHECK_CONVERTER, null, + callTimeout, + getPriority(tableName)); + } + }; + return rpcCallerFactory.newCaller().callAsyncWithRetries(callable, + this.operationTimeout); + } + + /** + * {@inheritDoc} + */ + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return new RegionCoprocessorRpcChannel(connection, tableName, row); + } + @Override + public Future> coprocessorService(final Class + service, byte[] startKey, byte[] endKey, final Batch.AsyncCall callable) { + // get regions covered by the row range + List keys; + try { + keys = getStartKeysInRange(startKey, endKey); + } catch (IOException e) { + return new FailedFuture<>(connection.getEventExecutor(), e); + } + + KeyedPromiseKeeper keyedPromiseKeeper = + new KeyedPromiseKeeper<>(connection.getEventExecutor(),Bytes.BYTES_COMPARATOR); + + for (final KeyAndRegion r : keys) { + final RegionCoprocessorRpcChannel channel = + new RegionCoprocessorRpcChannel(connection, tableName, r.getKey()); + + try { + T instance = ProtobufUtil.newServiceStub(service, channel); + keyedPromiseKeeper.addPromise( + channel.getLastRegion(), + callable.call(instance) + ); + } catch (Exception e) { + return new FailedFuture<>(connection.getEventExecutor(), new IOException(e)); + } + } + + return keyedPromiseKeeper; + } + + /** + * Get the start keys within range + * @param start start key + * @param end end key + * @return List of region names + * @throws IOException if fetching fails + */ + private List getStartKeysInRange(byte[] start, byte[] end) throws IOException { + if (start == null) { + start = HConstants.EMPTY_START_ROW; + } + if (end == null) { + end = HConstants.EMPTY_END_ROW; + } + return getKeysAndRegionsInRange(connection, tableName, start, end, true, false); + } + + @Override + public Future> batchCoprocessorService( + 
Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, + byte[] endKey, R responsePrototype) { + + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + if (endKey == null) { + endKey = HConstants.EMPTY_END_ROW; + } + // get regions covered by the row range + List keysAndRegions; + try { + keysAndRegions = getKeysAndRegionsInRange(connection, tableName, startKey, endKey, true, + false); + } catch (IOException e) { + return new FailedFuture<>(connection.getEventExecutor(),e); + } + + KeyedPromiseKeeper keyedPromiseKeeper = + new KeyedPromiseKeeper<>(connection.getEventExecutor(),Bytes.BYTES_COMPARATOR); + + // check if we have any calls to make + if (keysAndRegions.isEmpty()) { + LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + + ", end=" + Bytes.toStringBinary(endKey)); + keyedPromiseKeeper.setSuccess(new HashMap()); + return keyedPromiseKeeper; + } + + final MessageConverter converter = new MessageConverter() { + @Override + public R convert(R msg, CellScanner cellScanner) throws IOException { + return msg; + } + }; + + for (KeyAndRegion keyAndRegion : keysAndRegions) { + final byte[] region = keyAndRegion.getRegion().getRegionInfo().getRegionName(); + RegionCoprocessorServiceExec exec = + new RegionCoprocessorServiceExec( + region, keyAndRegion.getKey(), methodDescriptor, request); + + keyedPromiseKeeper.addPromise( + keyAndRegion.getKey(), + callCoprocessorService(exec, responsePrototype, converter)); + } + + return keyedPromiseKeeper; + } + + @Override + public Future batchCoprocessorService( + Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, + byte[] endKey, R responsePrototype, final Batch.Callback callback) { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + if (endKey == null) { + endKey = HConstants.EMPTY_END_ROW; + } + // get regions covered by the row range + List keysAndRegions; + try { + keysAndRegions = getKeysAndRegionsInRange(connection, tableName, startKey, endKey, true, + false); + } catch (IOException e) { + return new FailedFuture<>(connection.getEventExecutor(),e); + } + + // check if we have any calls to make + if (keysAndRegions.isEmpty()) { + LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + + ", end=" + Bytes.toStringBinary(endKey)); + return new SuccessfulFuture<>(connection.getEventExecutor(), null); + } + + VoidPromiseKeeper promise = new VoidPromiseKeeper( + connection.getEventExecutor(), keysAndRegions.size()); + + final MessageConverter converter = new MessageConverter() { + @Override + public R convert(R msg, CellScanner cellScanner) throws IOException { + return msg; + } + }; + + for (final KeyAndRegion keyAndRegion : keysAndRegions) { + final byte[] region = keyAndRegion.getRegion().getRegionInfo().getRegionName(); + RegionCoprocessorServiceExec exec = + new RegionCoprocessorServiceExec( + region, keyAndRegion.getKey(), methodDescriptor, request); + + Future future = callCoprocessorService(exec, responsePrototype, converter); + future.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + callback.update(region, keyAndRegion.getKey(), future.getNow()); + } + }); + promise.addFuture(future); + } + + return promise; + } + + /** + * Call a single coprocessor call + * @param exec command to execute + * @param responsePrototype prototype of response + * @param converter to convert the Message + * @param Type of 
Response expected + * @return ResponsePromise for a single call + */ + private Future callCoprocessorService( + final RegionCoprocessorServiceExec exec, final R responsePrototype, + final MessageConverter converter) { + final AsyncRegionServerCallable callable = new AsyncRegionServerCallable( + this.connection, getName(), exec.getRow()) { + @Override + public Future call(int callTimeout) { + Message request = exec.getRequest(); + + return getChannel().callMethod(EXEC_SERVICE_DESCRIPTOR, request, null, responsePrototype, + converter, null, callTimeout, getPriority(tableName)); + } + }; + return rpcCallerFactory.newCaller().callAsyncWithRetries(callable, + this.operationTimeout); + } + + /** + * Get the corresponding start keys and regions for an arbitrary range of + * keys. + *
<p>
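+   * For example, given regions [a,b), [b,c) and [c, end), a call with startKey=a, endKey=c
+   * and includeEndKey=true yields three entries keyed a, b and c, while includeEndKey=false
+   * stops before the region starting at c.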
+ * @param connection to use to connect + * @param name of table to connect to + * @param startKey Starting row in range, inclusive + * @param endKey Ending row in range + * @param includeEndKey true if endRow is inclusive, false if exclusive + * @param reload true to reload information or false to use cached information + * @return A pair of list of start keys and list of HRegionLocations that + * contain the specified range + * @throws IOException if a remote or network exception occurs + */ + protected static List getKeysAndRegionsInRange(ClusterConnection connection, + TableName name, + final byte[] startKey, final byte[] endKey, final boolean includeEndKey, + final boolean reload) throws IOException { + final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); + if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { + throw new IllegalArgumentException( + "Invalid range: " + Bytes.toStringBinary(startKey) + + " > " + Bytes.toStringBinary(endKey)); + } + List regionsInRange = new ArrayList<>(); + byte[] currentKey = startKey; + + try(RegionLocator regionLocator = connection.getRegionLocator(name)){ + HRegionLocation regionLocation; + do { + regionLocation = regionLocator.getRegionLocation(currentKey, reload); + regionsInRange.add(new KeyAndRegion(currentKey, regionLocation)); + currentKey = regionLocation.getRegionInfo().getEndKey(); + } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) && (endKeyIsEndOfTable + || Bytes.compareTo(currentKey, endKey) < 0 + || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); + return regionsInRange; + } + } + + /** + * Object to store start key and region together + */ + protected static final class KeyAndRegion { + private final byte[] key; + private final HRegionLocation region; + + /** + * Constructor + * @param key where scan should start + * @param region to operate on + */ + private KeyAndRegion(byte[] key, HRegionLocation region) { + this.key = key; + this.region = region; + } + + /** + * Get the key to start region walk + * @return key + */ + public byte[] getKey() { + return key; + } + + /** + * Get region to walk + * @return region location + */ + public HRegionLocation getRegion() { + return region; + } + } + + /** + * Validates the Put + * @param put to validate + * @throws IllegalArgumentException if Put is invalid + */ + private void validatePut(final Put put) throws IllegalArgumentException { + HTable.validatePut(put, ConnectionConfiguration.getMaxKeyValueSize()); + } + + /** + * Get priority for the tables + * @param tn TableName of table to get priority of + * @return priority + */ + public static int getPriority(TableName tn) { + return (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS; + } + + @Override + public String toString() { + return tableName + ";" + connection; + } + + @Override + public void close() throws IOException { + // Nothing to close at this moment + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 5c70b77..f45b68e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -1,6 +1,4 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import io.netty.util.concurrent.EventExecutor;
 import java.io.IOException;
 import java.util.List;
 
@@ -31,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.ipc.AsyncRpcChannel;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@@ -247,6 +247,15 @@
   ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
 
   /**
+   * Get an async RPC channel for client communications
+   *
+   * @param serverName to connect to
+   * @return AsyncRpcChannel to communicate with the server
+   * @throws IOException if a remote or network exception occurs
+   */
+  AsyncRpcChannel getAsyncClientChannel(ServerName serverName) throws IOException;
+
+  /**
    * Find region location hosting passed row
    * @param tableName table name
    * @param row Row to find.
@@ -342,4 +351,9 @@ public interface ClusterConnection extends Connection {
    * @throws IOException if a remote or network exception occurs
    */
   int getCurrentNrHRS() throws IOException;
+
+  /**
+   * @return the EventExecutor for this ClusterConnection
+   */
+  EventExecutor getEventExecutor();
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index b979c6a..b946873 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -105,6 +105,23 @@ public interface Connection extends Abortable, Closeable {
   Table getTable(TableName tableName, ExecutorService pool) throws IOException;
 
   /**
+   * Retrieve an AsyncTable implementation for accessing a table.
+   * The returned AsyncTable is not thread-safe; a new instance should be created for each
+   * thread that uses it.
+   * This is a lightweight operation; pooling or caching of the returned AsyncTable
+   * is neither required nor desired.
+   * <p>
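+   * A minimal usage sketch (table and row names here are placeholders):
+   * <blockquote><pre>
+   * try (AsyncTable table = connection.getAsyncTable(TableName.valueOf("mytable"))) {
+   *   Future&lt;Result&gt; future = table.get(new Get(Bytes.toBytes("myrow")));
+   *   Result result = future.get(); // or register a listener instead of blocking
+   * }
+   * </pre></blockquote>
+   * <p>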
+   * The caller is responsible for calling {@link AsyncTable#close()} on the returned
+   * table instance.
+   * <p>
+   * This method does not check table existence. If the table does not exist, an exception
+   * will be thrown only when the first operation is attempted.
+   * @param tableName the name of the table
+   * @return an AsyncTable to use for interactions with this table
+   * @throws IOException if a remote or network exception occurs
+   */
+  AsyncTable getAsyncTable(TableName tableName) throws IOException;
+
   /**
   * <p>
* Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The * {@link BufferedMutator} returned by this method is thread-safe. This BufferedMutator will diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index d93a8b4..98c4f5f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -25,6 +25,7 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import io.netty.util.concurrent.EventExecutor; import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.ipc.AsyncRpcChannel; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -275,13 +277,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } @Override + public AsyncTable getAsyncTable(TableName tableName) throws IOException { + return new AsyncTableImpl(tableName, this, connectionConfig, rpcCallerFactory); + } + + @Override public Table getTable(TableName tableName) throws IOException { return getTable(tableName, getBatchPool()); } @Override public Table getTable(TableName tableName, ExecutorService pool) throws IOException { - return new HTable(tableName, this, connectionConfig, + return new TableImpl(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory, pool); } @@ -1207,6 +1214,37 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return stub; } + @Override + public AsyncRpcChannel getAsyncClientChannel(final ServerName sn) throws IOException { + return this.getAsyncRpcChannel(ClientProtos.ClientService.getDescriptor().getName(),sn); + } + + /** + * Get an async rpc channel + * @param serviceName to get channel for + * @param serverName to connect to + * @return AsyncRpcChannel to communicate with + * @throws IOException if it fails to set up channel + */ + private AsyncRpcChannel getAsyncRpcChannel(String serviceName, final ServerName serverName) + throws IOException { + if (isDeadServer(serverName)) { + throw new RegionServerStoppedException(serverName + " is dead."); + } + String key = getStubKey(AsyncRpcChannel.class.getName(), serverName.getHostname(), + serverName.getPort(), this.hostnamesCanChange); + this.connectionLock.putIfAbsent(key, key); + AsyncRpcChannel channel; + synchronized (this.connectionLock.get(key)) { + channel = (AsyncRpcChannel)this.stubs.get(key); + if (channel == null) { + channel = this.rpcClient.createRpcChannel(serviceName, serverName, user); + this.stubs.put(key, channel); + } + } + return channel; + } + static String getStubKey(final String serviceName, final String rsHostname, int port, @@ -1745,6 +1783,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { metaCache.clearCache(location); } + @Override + public void updateCachedLocations(final TableName tableName, byte[] 
rowkey,
+      final Object exception, final HRegionLocation source) {
+    assert source != null;
+    updateCachedLocations(tableName, source.getRegionInfo().getRegionName(),
+        rowkey, exception, source.getServerName());
+  }
+
   /**
    * Update the location with the new value (if the exception is a RegionMovedException)
    * or delete it from the cache. Does nothing if we can be sure from the exception that
@@ -1841,6 +1887,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return this.backoffPolicy;
   }
 
+  @Override
+  public EventExecutor getEventExecutor() {
+    return this.rpcClient.getEventExecutor();
+  }
+
   /*
    * Return the number of cached region for a table. It will only be called
    * from a unit test.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedFuture.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedFuture.java
new file mode 100644
index 0000000..3c306c7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedFuture.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.CompleteFuture;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A failed response future, always completed with the supplied cause
+ * @param <V> Value type of the Future
+ */
+@InterfaceAudience.Private
+public final class FailedFuture<V> extends CompleteFuture<V> implements Future<V> {
+
+  private final Throwable cause;
+
+  /**
+   * Creates a new instance.
+   *
+   * @param executor the {@link EventExecutor} associated with this future
+   * @param cause the cause of failure
+   */
+  public FailedFuture(EventExecutor executor, Throwable cause) {
+    super(executor);
+    if (cause == null) {
+      throw new NullPointerException("cause");
+    }
+    this.cause = cause;
+  }
+
+  @Override
+  public Throwable cause() {
+    return cause;
+  }
+
+  @Override
+  public boolean isSuccess() {
+    return false;
+  }
+
+  @Override
+  public Future<V> sync() {
+    PlatformDependent.throwException(cause);
+    return this;
+  }
+
+  @Override
+  public Future<V> syncUninterruptibly() {
+    PlatformDependent.throwException(cause);
+    return this;
+  }
+
+  @Override
+  public V getNow() {
+    return null;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 54fbfe9..33467f9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -435,15 +435,18 @@ public class HTable implements Table {
     }
 
     // Call that takes into account the replica
-    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
-      rpcControllerFactory, tableName, this.connection, get, pool,
-      connConfiguration.getRetriesNumber(),
-      operationTimeout,
-      connConfiguration.getPrimaryCallTimeoutMicroSecond());
-    return callable.call();
+    RpcRetryingCallerWithReadReplicas<Result> callable = new RpcRetryingCallerWithReadReplicas<>(
+      rpcCallerFactory, tableName, this.connection, get, AsyncTableImpl.GET_RESPONSE_CONVERTER,
+      operationTimeout, connConfiguration.getPrimaryCallTimeoutMicroSecond());
+    try {
+      return callable.call().get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      // The cause is not guaranteed to be an IOException, so wrap rather than cast blindly
+      Throwable cause = e.getCause();
+      throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
+    }
   }
 
-
   /**
    * {@inheritDoc}
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/KeyedPromiseKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/KeyedPromiseKeeper.java
new file mode 100644
index 0000000..5423fca
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/KeyedPromiseKeeper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Collects the results of all keyed promises added to it into a single promise
+ * @param <K> Type of key each result is stored under
+ * @param <R> Type of result returned by each promise
+ */
+@InterfaceAudience.Private
+public class KeyedPromiseKeeper<K, R> extends DefaultPromise<Map<K, R>>
+    implements Future<Map<K, R>> {
+
+  private final Map<K, Future<R>> promises;
+  private final Map<K, R> responses;
+
+  /**
+   * Constructor
+   * @param executor to run promise keeper on
+   * @param comparator to order the response keys with
+   */
+  public KeyedPromiseKeeper(EventExecutor executor, Comparator<K> comparator) {
+    super(executor);
+
+    this.promises = new HashMap<>();
+    this.responses = new TreeMap<>(comparator);
+  }
+
+  /**
+   * Add a promise to listen to
+   * @param key the key to store the promise's result under
+   * @param promise to listen to
+   */
+  public void addPromise(K key, Future<R> promise) {
+    this.promises.put(key, promise);
+    promise.addListener(new KeyedResponseFuture(key));
+  }
+
+  /**
+   * Cancel all open promises
+   */
+  private void cancelAllPromises() {
+    for (Future<R> p : promises.values()) {
+      if (p != null) {
+        p.cancel(true);
+      }
+    }
+  }
+
+  /**
+   * Keyed response promise listener
+   */
+  private class KeyedResponseFuture implements ResponseFutureListener<R> {
+    private final K key;
+
+    /**
+     * Constructor
+     * @param key the key for the response
+     */
+    public KeyedResponseFuture(K key) {
+      this.key = key;
+    }
+
+    @Override
+    public void operationComplete(Future<R> future) {
+      if (!isDone()) {
+        // Remove the promise ref to indicate it has been fulfilled
+        promises.remove(this.key);
+        try {
+          responses.put(this.key, future.get());
+          if (promises.isEmpty()) {
+            setSuccess(responses);
+          }
+        } catch (InterruptedException e) {
+          cancel(true);
+          cancelAllPromises();
+        } catch (ExecutionException e) {
+          setFailure(e.getCause());
+          cancelAllPromises();
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PromiseKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PromiseKeeper.java
new file mode 100644
index 0000000..18d10ea
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PromiseKeeper.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Keeps all promises added to it and completes with the array of their responses once every
+ * promise is done.
+ * @param <R> Type of result returned by each promise
+ */
+@InterfaceAudience.Private
+public class PromiseKeeper<R> extends DefaultPromise<R[]> implements Future<R[]> {
+
+  private int indexCounter = 0;
+  private int totalCount = 0;
+  private int doneCount = 0;
+  private final Future<R>[] promises;
+  private final R[] responses;
+
+  /**
+   * Constructor
+   * @param executor to run promise keeper on
+   * @param maxPromises max number of promises to be able to listen to
+   * @param responses the empty responses array
+   */
+  @SuppressWarnings("unchecked")
+  public PromiseKeeper(EventExecutor executor, int maxPromises, R[] responses) {
+    super(executor);
+    this.promises = new Future[maxPromises];
+    this.responses = responses;
+  }
+
+  /**
+   * Add a promise for this keeper to track
+   * @param promise to listen to
+   */
+  public void addFuture(Future<R> promise) {
+    totalCount++;
+    int i = indexCounter++;
+    this.promises[i] = promise;
+    promise.addListener(new IndexedResponseFuture(i));
+  }
+
+  /**
+   * Cancel all open promises
+   */
+  private void cancelAllPromises() {
+    for (Future<R> p : promises) {
+      if (p != null) {
+        p.cancel(true);
+      }
+    }
+  }
+
+  /**
+   * Indexed response promise listener
+   */
+  private class IndexedResponseFuture implements ResponseFutureListener<R> {
+    private final int index;
+
+    /**
+     * Constructor
+     * @param i index for the response
+     */
+    public IndexedResponseFuture(int i) {
+      this.index = i;
+    }
+
+    @Override
+    public void operationComplete(Future<R> future) {
+      if (!isDone()) {
+        // Set the promise ref to null to indicate it is fulfilled
+        promises[index] = null;
+        try {
+          responses[index] = future.get();
+          doneCount++;
+          if (doneCount == totalCount) {
+            setSuccess(responses);
+          }
+        } catch (InterruptedException e) {
+          cancel(true);
+          cancelAllPromises();
+        } catch (ExecutionException e) {
+          setFailure(e.getCause());
+          cancelAllPromises();
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingPromise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingPromise.java
new file mode 100644
index 0000000..686b97b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingPromise.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
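(Aside: PromiseKeeper is the positional sibling of the keyed keeper above, slotting results into a caller-supplied array by index. A CompletableFuture rendering of the same idea, illustrative only:)

import java.util.concurrent.CompletableFuture;

final class IndexedAggregation {
  /** Complete with the result array once all positional inputs complete. */
  static <R> CompletableFuture<R[]> all(R[] out, CompletableFuture<R>[] inputs) {
    CompletableFuture<?>[] writes = new CompletableFuture<?>[inputs.length];
    for (int i = 0; i < inputs.length; i++) {
      final int index = i;
      writes[i] = inputs[i].thenAccept(r -> out[index] = r); // slot results by index
    }
    return CompletableFuture.allOf(writes).thenApply(ignored -> out);
  }
}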
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.EventExecutor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.Promise;
+
+/**
+ * Retrying response future
+ * @param <T> Type of object expected in the Future
+ */
+@InterfaceAudience.Private
+public class RetryingPromise<T> extends Promise<T> {
+  private final TryHandler<T> tryHandler;
+  private int tries = 0;
+  private Future<T> currentPromise;
+
+  /**
+   * Constructor
+   * @param executor for the Future
+   * @param tryHandler handles the call and its failures
+   */
+  public RetryingPromise(EventExecutor executor, TryHandler<T> tryHandler) {
+    super(executor);
+    this.tryHandler = tryHandler;
+  }
+
+  @Override
+  public boolean cancel(boolean mayInterrupt) {
+    if (this.currentPromise != null && !currentPromise.isCancelled()) {
+      this.currentPromise.cancel(mayInterrupt);
+    }
+    return super.cancel(mayInterrupt);
+  }
+
+  /**
+   * Do the actual call for the try.
+   */
+  public void call() {
+    if (this.isCancelled()) {
+      return;
+    }
+
+    this.currentPromise = this.tryHandler.call(tries);
+
+    currentPromise.addListener(new ResponseFutureListener<T>() {
+      @Override
+      public void operationComplete(Future<T> future) throws Exception {
+        currentPromise = null;
+        if (future.isSuccess()) {
+          setSuccess(future.getNow());
+        } else if (future.isCancelled()) {
+          if (!isCancelled()) {
+            cancel(true);
+          }
+        } else {
+          try {
+            if (tryHandler.handleFail(tries, future.cause())) {
+              // Returned true, so we should try again
+              tries++;
+              call();
+            } else {
+              // Returned false without an exception, so return an empty result
+              setSuccess(null);
+            }
+          } catch (Throwable e) {
+            setFailure(e);
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Handles the try
+   * @param <T> Type of response from the try
+   */
+  public interface TryHandler<T> {
+    /**
+     * Call method
+     * @param tries number of tries so far
+     * @return Response future
+     */
+    Future<T> call(int tries);
+
+    /**
+     * Handles failures
+     * @param tries Current try number
+     * @param e exception thrown
+     * @return true if the call should be retried, false if it should stop
+     * @throws Throwable if the failure cannot be recovered from
+     */
+    boolean handleFail(int tries, Throwable e) throws Throwable;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index b4cd2ef..b29fd16 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 import java.io.IOException;
 
-/**
- *
- */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RpcRetryingCaller<T> {
@@ -52,4 +49,21 @@ public interface RpcRetryingCaller<T> {
    */
   T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
       throws IOException, RuntimeException;
+
+  /**
+   * Call the method asynchronously with retries.
+   * @param callable the async RegionServer callable
+   * @param operationTimeout timeout for the operation
+   * @return Future with the result
+   */
+  Future<T> callAsyncWithRetries(AsyncRetryingCallable<T> callable, int operationTimeout);
+
+  /**
+   * Call the server once only.
+   * {@link RetryingCallable} has a strange shape so we can do retries. Use this invocation if you
+   * want to do a single call only (a call to {@link RetryingCallable#call(int)} will not likely
+   * succeed).
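(Aside: callAsyncWithRetries below drives exactly the listener-recursion loop RetryingPromise implements: fire an attempt, and on failure ask a handler whether to go around again. Stripped of HBase's pause and interceptor logic, the pattern looks like this CompletableFuture sketch, which is not the patch's code:)

import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

final class Retry {
  /** Retry an async call until it succeeds or the try budget is exhausted. */
  static <T> CompletableFuture<T> withRetries(IntFunction<CompletableFuture<T>> call,
      int maxTries) {
    CompletableFuture<T> result = new CompletableFuture<>();
    attempt(call, 0, maxTries, result);
    return result;
  }

  private static <T> void attempt(IntFunction<CompletableFuture<T>> call, int tries,
      int maxTries, CompletableFuture<T> result) {
    call.apply(tries).whenComplete((value, error) -> {
      if (error == null) {
        result.complete(value);
      } else if (tries + 1 < maxTries) {
        attempt(call, tries + 1, maxTries, result); // schedule the next try, no blocking
      } else {
        result.completeExceptionally(error);
      }
    });
  }
}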
+ * @return an object of type T + */ + Future callAsyncWithoutRetries(AsyncRetryingCallable callable, int callTimeout); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 8f28796..88cbc92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +18,8 @@ package org.apache.hadoop.hbase.client; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; @@ -36,8 +37,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; -import com.google.protobuf.ServiceException; - /** * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client * threadlocal outstanding timeouts as so we don't persist too much. @@ -89,8 +88,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { @Override public T callWithRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { - List exceptions = - new ArrayList(); + List exceptions = new ArrayList<>(); tracker.start(); context.clear(); for (int tries = 0;; tries++) { @@ -140,11 +138,15 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { try { if (expectedSleep > 0) { synchronized (cancelled) { - if (cancelled.get()) return null; + if (cancelled.get()) { + return null; + } cancelled.wait(expectedSleep); } } - if (cancelled.get()) return null; + if (cancelled.get()) { + return null; + } } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted after " + tries + " tries while maxAttempts=" + maxAttempts); @@ -152,6 +154,101 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { } } + @Override + public Future callAsyncWithRetries(final AsyncRetryingCallable callable, + final int callTimeout) { + tracker.start(); + context.clear(); + + RetryingPromise future = new RetryingPromise<>( + callable.getEventExecutor(), + new RetryingPromise.TryHandler() { + List exceptions = new ArrayList<>(); + long expectedSleep; + + @Override + public Future call(int tries) { + try { + callable.prepare(tries != 0); // if called with false, check table status on ZK + interceptor.intercept(context.prepare(callable, tries)); + + return callable.call(getTimeout(callTimeout)); + } catch (IOException e) { + return new FailedFuture<>(callable.getEventExecutor(),e); + } + } + + @Override + public boolean handleFail(int tries, Throwable e) throws Throwable { + try { + if (e instanceof PreemptiveFastFailException) { + throw e; + } + + ExceptionUtil.rethrowIfInterrupt(e); + if (tries > startLogErrorsCnt) { + LOG.info("Call exception, tries=" + tries + ", retries=" + maxAttempts + + ", started=" + + (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms " + + "ago, cancelled=" + cancelled.get() + ", msg=" + + callable.getExceptionMessageAdditionalDetail()); + } + + // translateException throws exception when should not retry: i.e. 
when request is + // bad + interceptor.handleFailure(context, e); + e = translateException(e); + callable.throwable(e, maxAttempts != 1); + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(e, + EnvironmentEdgeManager.currentTime(), toString()); + exceptions.add(qt); + if (tries >= maxAttempts - 1) { + throw new RetriesExhaustedException(tries, exceptions); + } + // If the server is dead, we need to wait a little before retrying, to give + // a chance to the regions to be + // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + expectedSleep = callable.sleep(pause, tries + 1); + + // If, after the planned sleep, there won't be enough time left, we stop now. + long duration = singleCallDuration(expectedSleep); + if (duration > callTimeout) { + String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + + ": " + callable.getExceptionMessageAdditionalDetail(); + throw (new SocketTimeoutException(msg).initCause(e)); + } + } finally { + interceptor.updateFailureInfo(context); + } + + try { + if (expectedSleep > 0) { + synchronized (cancelled) { + if (cancelled.get()) { + return false; + } + cancelled.wait(expectedSleep); + } + } + if (cancelled.get()){ + return false; + } + } catch (InterruptedException e1) { + throw new InterruptedIOException( + "Interrupted after " + tries + " tries while maxAttempts=" + maxAttempts); + } + + return true; + } + } + ); + + future.call(); + + return future; + } + /** * @return Calculate how long a single call took */ @@ -177,7 +274,20 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { } } } - + + @Override + public Future callAsyncWithoutRetries(AsyncRetryingCallable callable, + int callTimeout) { + // The code of this method should be shared with withRetries. + this.tracker.start(); + try { + callable.prepare(false); + return callable.call(callTimeout); + } catch (IOException e) { + return new FailedFuture<>(callable.getEventExecutor(),e); + } + } + /** * Get the good or the remote exception if any, throws the DoNotRetryIOException. 
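(Aside on the expectedSleep computed by callable.sleep(pause, tries + 1): HBase scales the base pause by a backoff ladder; the multiplier table below is assumed from memory of HConstants.RETRY_BACKOFF, so verify it against the source before relying on it. A sketch of the calculation with a little jitter:)

final class Backoff {
  // Assumed to mirror HConstants.RETRY_BACKOFF; not taken from this patch.
  private static final int[] RETRY_BACKOFF =
      {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200};

  /** Pause before retry number tries, with about 1% jitter so retries spread out. */
  static long pauseMs(long basePauseMs, int tries) {
    int index = Math.min(tries, RETRY_BACKOFF.length - 1);
    long normal = basePauseMs * RETRY_BACKOFF[index];
    return normal + (long) (normal * 0.01f * Math.random());
  }
}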
* @param t the throwable to analyze diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 425d314..33c5bbf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -17,17 +17,13 @@ */ package org.apache.hadoop.hbase.client; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.Collections; import java.util.List; -import java.util.concurrent.CancellationException; +import java.util.TimerTask; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,55 +31,48 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.ipc.MessageConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.protobuf.ServiceException; - - /** * Caller that goes to replica if the primary region does no answer within a configurable * timeout. If the timeout is reached, it calls all the secondary replicas, and returns * the first answer. If the answer comes from one of the secondary replica, it will * be marked as stale. 
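(Aside: that replica strategy, ask the primary first and only fan out to the secondaries once a short primary timeout expires, then take whichever answer lands first, can be sketched independently of HBase's plumbing. Illustrative only; the real code also bookkeeps failures and cancels the losing calls via SinglePromiseKeeper below:)

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

final class HedgedRead {
  /** Ask replica 0 (the primary) first; after primaryTimeoutMs, also ask the
   *  secondaries and complete with whichever successful answer arrives first. */
  static <T> CompletableFuture<T> read(IntFunction<CompletableFuture<T>> callReplica,
      int replicaCount, long primaryTimeoutMs, ScheduledExecutorService scheduler) {
    CompletableFuture<T> result = new CompletableFuture<>();
    callReplica.apply(0).thenAccept(result::complete);
    scheduler.schedule(() -> {
      if (!result.isDone()) {
        for (int id = 1; id < replicaCount; id++) {
          callReplica.apply(id).thenAccept(result::complete); // first one wins
        }
      }
    }, primaryTimeoutMs, TimeUnit.MILLISECONDS);
    return result;
  }
}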
+ * @param Result type which is returned */ @InterfaceAudience.Private -public class RpcRetryingCallerWithReadReplicas { +public class RpcRetryingCallerWithReadReplicas { private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class); - protected final ExecutorService pool; protected final ClusterConnection cConnection; protected final Configuration conf; protected final Get get; + private final MessageConverter converter; protected final TableName tableName; protected final int timeBeforeReplicas; private final int callTimeout; - private final int retries; - private final RpcControllerFactory rpcControllerFactory; private final RpcRetryingCallerFactory rpcRetryingCallerFactory; + private final int priority; public RpcRetryingCallerWithReadReplicas( - RpcControllerFactory rpcControllerFactory, TableName tableName, + RpcRetryingCallerFactory rpcRetryingCallerFactory, TableName tableName, ClusterConnection cConnection, final Get get, - ExecutorService pool, int retries, int callTimeout, + MessageConverter converter, int callTimeout, int timeBeforeReplicas) { - this.rpcControllerFactory = rpcControllerFactory; this.tableName = tableName; this.cConnection = cConnection; this.conf = cConnection.getConfiguration(); this.get = get; - this.pool = pool; - this.retries = retries; + this.converter = converter; this.callTimeout = callTimeout; this.timeBeforeReplicas = timeBeforeReplicas; - this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf); + this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; + this.priority = AsyncTableImpl.getPriority(tableName); } /** @@ -92,22 +81,25 @@ public class RpcRetryingCallerWithReadReplicas { * - we need to stop retrying when the call is completed * - we can be interrupted */ - class ReplicaRegionServerCallable extends RegionServerCallable implements Cancellable { + class ReplicaRegionServerCallable extends AsyncRegionServerCallable implements + Cancellable { final int id; - private final PayloadCarryingRpcController controller; + private boolean isCanceled; + private Future future; + private final MessageConverter converter; - public ReplicaRegionServerCallable(int id, HRegionLocation location) { + public ReplicaRegionServerCallable(int id, HRegionLocation location, + MessageConverter converter) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); this.id = id; this.location = location; - this.controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); + this.converter = converter; } @Override public void cancel() { - controller.startCancel(); + isCanceled = true; } /** @@ -117,10 +109,8 @@ public class RpcRetryingCallerWithReadReplicas { */ @Override public void prepare(final boolean reload) throws IOException { - if (controller.isCanceled()) return; - - if (Thread.interrupted()) { - throw new InterruptedIOException(); + if (isCanceled) { + return; } if (reload || location == null) { @@ -134,39 +124,35 @@ public class RpcRetryingCallerWithReadReplicas { throw new HBaseIOException("There is no location for replica id #" + id); } - ServerName dest = location.getServerName(); - - setStub(cConnection.getClient(dest)); + setClientByServiceName(location.getServerName()); } @Override - public Result call(int callTimeout) throws Exception { - if (controller.isCanceled()) return null; - - if (Thread.interrupted()) { - throw new InterruptedIOException(); + public Future call(int callTimeout) { + if (isCanceled) { + return null; } 
byte[] reg = location.getRegionInfo().getRegionName(); - ClientProtos.GetRequest request = - RequestConverter.buildGetRequest(reg, get); - controller.setCallTimeout(callTimeout); - + ClientProtos.GetRequest request; try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) { - return null; - } - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + request = RequestConverter.buildGetRequest(reg, get); + } catch (IOException e) { + return new FailedFuture<>(getConnection().getEventExecutor(),e); } + + this.future = getChannel().callMethod( + AsyncTableImpl.GET_DESCRIPTOR, request, null, + ClientProtos.GetResponse.getDefaultInstance(), converter, null, callTimeout, priority + ); + return this.future; } @Override public boolean isCancelled() { - return controller.isCanceled(); + return (future != null && future.isCancelled()) + || isCanceled; } } @@ -187,56 +173,40 @@ public class RpcRetryingCallerWithReadReplicas { *

* Globally, the number of retries, timeout and so on still applies, but it's per replica, * not global. We continue until all retries are done, or all timeouts are exceeded. + * @return Future with the result */ - public synchronized Result call() - throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { + public Future call() { boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); - RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() - : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); - ResultBoundedCompletionService cs = - new ResultBoundedCompletionService(this.rpcRetryingCallerFactory, pool, rl.size()); + try { + final RegionLocations rl = getRegionLocations(true, + (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID), + cConnection, tableName, get.getRow()); - if(isTargetReplicaSpecified) { - addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); - } else { - addCallsForReplica(cs, rl, 0, 0); - try { - // wait for the timeout to see whether the primary responds back - Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f != null) { - return f.get(); //great we got a response - } - } catch (ExecutionException e) { - throwEnrichedException(e, retries); - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - // submit call for the all of the secondaries at once - addCallsForReplica(cs, rl, 1, rl.size() - 1); - } + final SinglePromiseKeeper pk = new SinglePromiseKeeper<>( + this.cConnection.getEventExecutor(), rl.size()); - try { - try { - Future f = cs.take(); - return f.get(); - } catch (ExecutionException e) { - throwEnrichedException(e, retries); + if(isTargetReplicaSpecified) { + addCallsForReplica(pk, rl, get.getReplicaId(), get.getReplicaId()); + } else { + addCallsForReplica(pk, rl, 0, 0); + + pk.addScheduledTask( + TimeUnit.MILLISECONDS.convert(timeBeforeReplicas, TimeUnit.MICROSECONDS), + new TimerTask() { + @Override + public void run() { + // submit call for the all of the secondaries at once + addCallsForReplica(pk, rl, 1, rl.size() - 1); + } + } + ); } - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } finally { - // We get there because we were interrupted or because one or more of the - // calls succeeded or failed. In all case, we stop all our tasks. - cs.cancelAll(); + return pk; + } catch (RetriesExhaustedException | DoNotRetryIOException | InterruptedIOException e) { + return new FailedFuture<>(cConnection.getEventExecutor(),e); } - - return null; // unreachable } /** @@ -269,17 +239,22 @@ public class RpcRetryingCallerWithReadReplicas { /** * Creates the calls and submit them * - * @param cs - the completion service to use for submitting + * @param promiseKeeper - the completion service to use for submitting * @param rl - the region locations * @param min - the id of the first replica, inclusive * @param max - the id of the last replica, inclusive. 
*/ - private void addCallsForReplica(ResultBoundedCompletionService cs, + private void addCallsForReplica(SinglePromiseKeeper promiseKeeper, RegionLocations rl, int min, int max) { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); - ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); - cs.submit(callOnReplica, callTimeout, id); + ReplicaRegionServerCallable callable = new ReplicaRegionServerCallable(id, hrl, + converter); + + Future promise = + rpcRetryingCallerFactory.newCaller().callAsyncWithRetries(callable, callTimeout); + promiseKeeper.addPromise(promise); + } } @@ -292,7 +267,7 @@ public class RpcRetryingCallerWithReadReplicas { if (!useCache) { rl = cConnection.relocateRegion(tableName, row, replicaId); } else { - rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId); + rl = cConnection.locateRegion(tableName, row, true, true, replicaId); } } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) { throw e; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SinglePromiseKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SinglePromiseKeeper.java new file mode 100644 index 0000000..6d73e878 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SinglePromiseKeeper.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Keeps a set of promises and completes with the first successful response. It fails only
+ * once every added promise has failed.
+ * @param <R> Type of result returned by each promise
+ */
+@InterfaceAudience.Private
+public class SinglePromiseKeeper<R> extends DefaultPromise<R>
+    implements Future<R> {
+
+  private final List<Future<R>> promises;
+  private final List<ExecutionException> exceptions;
+
+  private Timer timer;
+
+  private final ResponseFutureListener<R> listener = new ResponseFutureListener<R>() {
+    @Override
+    public void operationComplete(Future<R> future) {
+      if (!isDone()) {
+        promises.remove(future);
+
+        try {
+          setSuccess(future.get());
+        } catch (InterruptedException e) {
+          cancel(true);
+        } catch (ExecutionException e) {
+          // Record the failure first; fail the keeper with the first recorded
+          // exception once no promise is left that could still succeed.
+          exceptions.add(e);
+          if (promises.isEmpty()) {
+            setFailure(exceptions.get(0));
+          }
+        }
+      }
+
+      // Only clean up once the keeper has completed; cancelling the remaining
+      // promises on an early failure would defeat the replica fallback.
+      if (isDone()) {
+        if (timer != null) {
+          timer.cancel();
+          timer = null;
+        }
+
+        for (Future<R> promise : promises) {
+          if (!promise.isDone()) {
+            promise.cancel(true);
+          }
+        }
+        promises.clear();
+      }
+    }
+  };
+
+  /**
+   * Constructor
+   * @param executor to run promise keeper on
+   * @param maxPromises max number of promises to be able to listen to
+   */
+  public SinglePromiseKeeper(EventExecutor executor, int maxPromises) {
+    super(executor);
+    this.promises = new ArrayList<>(maxPromises);
+    this.exceptions = new ArrayList<>(maxPromises);
+  }
+
+  /**
+   * Add a promise for this keeper to listen to
+   * @param promise to listen to
+   */
+  public void addPromise(Future<R> promise) {
+    this.promises.add(promise);
+    promise.addListener(listener);
+  }
+
+  /**
+   * Add a task to run once a certain timeout has been reached
+   * @param timeToScheduleInMs delay in ms before the task runs
+   * @param timerTask task to run when the timeout fires
+   */
+  public void addScheduledTask(long timeToScheduleInMs, TimerTask timerTask) {
+    if (this.timer == null) {
+      this.timer = new Timer(true);
+      this.timer.schedule(timerTask, timeToScheduleInMs);
+    } else {
+      throw new IllegalStateException("There is already a scheduled task added");
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SuccessfulFuture.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SuccessfulFuture.java
new file mode 100644
index 0000000..42c3ede
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SuccessfulFuture.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
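(Aside: SinglePromiseKeeper's contract in isolation, complete on the first success and fail only after every input has failed, is easy to state with CompletableFuture. A sketch, not the patch's code; note the real class records and reports the first failure rather than the last:)

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class FirstSuccess {
  /** Complete on the first successful input; fail only after every input has failed. */
  static <T> CompletableFuture<T> of(List<CompletableFuture<T>> inputs) {
    CompletableFuture<T> result = new CompletableFuture<>();
    AtomicInteger pending = new AtomicInteger(inputs.size());
    for (CompletableFuture<T> f : inputs) {
      f.whenComplete((value, error) -> {
        if (error == null) {
          result.complete(value);               // first success wins
        } else if (pending.decrementAndGet() == 0) {
          result.completeExceptionally(error);  // all inputs have failed
        }
      });
    }
    return result;
  }
}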
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.CompleteFuture;
+import io.netty.util.concurrent.EventExecutor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A successfully completed response future
+ * @param <V> Value type for the Future
+ */
+@InterfaceAudience.Private
+public final class SuccessfulFuture<V> extends CompleteFuture<V> implements Future<V> {
+
+  private final V result;
+
+  /**
+   * Creates a new instance.
+   *
+   * @param executor the {@link EventExecutor} associated with this future
+   * @param result the result this future completes with
+   */
+  public SuccessfulFuture(EventExecutor executor, V result) {
+    super(executor);
+    this.result = result;
+  }
+
+  @Override
+  public Throwable cause() {
+    return null;
+  }
+
+  @Override
+  public boolean isSuccess() {
+    return true;
+  }
+
+  @Override
+  public Future<V> sync() {
+    return this;
+  }
+
+  @Override
+  public Future<V> syncUninterruptibly() {
+    return this;
+  }
+
+  @Override
+  public V getNow() {
+    return result;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableImpl.java
new file mode 100644
index 0000000..56d53ac
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableImpl.java
@@ -0,0 +1,612 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
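(Aside: SuccessfulFuture and FailedFuture are pre-completed futures; they let code that hits a synchronous error, or that already has a value in hand, still return through the async signature. A CompletableFuture equivalent, illustrative only:)

import java.util.concurrent.CompletableFuture;

final class Immediate {
  /** Pre-completed success, analogous to SuccessfulFuture. */
  static <V> CompletableFuture<V> succeeded(V value) {
    return CompletableFuture.completedFuture(value);
  }

  /** Pre-completed failure, analogous to FailedFuture.
   *  (Java 9+ also offers CompletableFuture.failedFuture.) */
  static <V> CompletableFuture<V> failed(Throwable cause) {
    CompletableFuture<V> f = new CompletableFuture<>();
    f.completeExceptionally(cause);
    return f;
  }
}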
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.AsyncTableImpl.getKeysAndRegionsInRange; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * An implementation of {@link Table}. Used to communicate with a single HBase table. + * Lightweight. Get as needed and just close when done. + * Instances of this class SHOULD NOT be constructed directly. + * Obtain an instance via {@link Connection}. See {@link ConnectionFactory} + * class comment for an example of how. + * + *

TableImpl is not a client API. Use {@link Table} instead. It is marked + * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in + * Hadoop + * Interface Classification + * There are no guarantees for backwards source / binary compatibility and methods or class can + * change or go away without deprecation. + * + * @see Table + * @see Admin + * @see Connection + * @see ConnectionFactory + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class TableImpl implements Table { + private static final Log LOG = LogFactory.getLog(TableImpl.class); + + private final ClusterConnection connection; + private final AsyncTable asyncTable; + private final ConnectionConfiguration connConfiguration; + private final RpcRetryingCallerFactory rpcCallerFactory; + private final int scannerCaching; + private final long scannerMaxResultSize; + private final RpcControllerFactory rpcControllerFactory; + private final ExecutorService pool; + + private boolean closed = false; + private long writeBuffer; + + private BufferedMutatorImpl mutator; + + private int operationTimeout; // global timeout for each blocking method with retrying rpc + private int rpcTimeout; // timeout for each rpc request + + /** + * Creates an object to access a HBase table. + * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to + * get a {@link Table} instance (use {@link Table} instead of {@link TableImpl}). + * @param tableName Name of the table. + * @param connection HConnection to be used. + * @param tableConfig configuration for the Table + * @param rpcCallerFactory The factory which creates the RPC callers + * @param pool to use for batches and scans + * @throws IOException if a remote or network exception occurs + */ + @InterfaceAudience.Private + protected TableImpl(TableName tableName, final ClusterConnection connection, + ConnectionConfiguration tableConfig, + RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory controllerFactory, ExecutorService pool) + throws IOException { + if (connection == null || connection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + + this.connection = connection; + + if (tableConfig == null) { + connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); + } else { + this.connConfiguration = tableConfig; + } + + this.operationTimeout = tableName.isSystemTable() ? + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.rpcTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + + this.rpcCallerFactory = rpcCallerFactory; + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); + this.rpcControllerFactory = + (controllerFactory != null)? 
+ controllerFactory : + RpcControllerFactory.instantiate(connection.getConfiguration()); + this.pool = pool; + + this.asyncTable = new AsyncTableImpl(tableName, connection, connConfiguration, + rpcCallerFactory); + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getConfiguration() { + return asyncTable.getConfiguration(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + try(Admin admin = connection.getAdmin()){ + HTableDescriptor htd = admin.getTableDescriptor(asyncTable.getName()); + if (htd != null) { + return new UnmodifyableHTableDescriptor(htd); + } + return null; + } + } + + @Override + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + } + + @Override + public int getOperationTimeout() { + return operationTimeout; + } + + @Override + public int getRpcTimeout() { + return rpcTimeout; + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + + @Override + public boolean exists(Get get) throws IOException { + try { + get.setCheckExistenceOnly(true); + + Result r = asyncTable.get(get).get(); + return r.getExists(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public boolean[] existsAll(List gets) throws IOException { + try { + for (Get get : gets) { + get.setCheckExistenceOnly(true); + } + + Result[] rs = asyncTable.get(gets).get(); + + boolean[] exists = new boolean[rs.length]; + for (int i = 0; i < rs.length; i++) { + exists[i] = rs[i].getExists(); + } + + return exists; + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public void batch(List actions, Object[] results) + throws IOException, InterruptedException { + connection.processBatch(actions, getName(), connection.getEventExecutor(), results); + } + + @Override + public void batchCallback(List actions, Object[] results, Callback callback) + throws IOException, InterruptedException { + connection.processBatchCallback(actions, getName(), connection.getEventExecutor(), results, + callback); + } + + @Override + public Result get(Get get) throws IOException { + try { + return asyncTable.get(get).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public Result[] get(List gets) throws IOException { + try { + return asyncTable.get(gets).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + if (scan.getBatch() > 0 && scan.isSmall()) { + throw new IllegalArgumentException("Small scan should not be used with batching"); + } + + if (scan.getCaching() <= 0) { + scan.setCaching(scannerCaching); + } + if (scan.getMaxResultSize() <= 0) { + scan.setMaxResultSize(scannerMaxResultSize); + } + + Boolean async = scan.isAsyncPrefetch(); + if (async == null) { + async = connConfiguration.isClientScannerAsyncPrefetch(); + } + + if (scan.isReversed()) { + if (scan.isSmall()) { + return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), + this.connection, this.rpcCallerFactory, 
this.rpcControllerFactory, + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } else { + return new ReversedClientScanner(getConfiguration(), scan, getName(), + this.connection, this.rpcCallerFactory, this.rpcControllerFactory, + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } + } + + if (scan.isSmall()) { + return new ClientSmallScanner(getConfiguration(), scan, getName(), + this.connection, this.rpcCallerFactory, this.rpcControllerFactory, + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } else { + if (async) { + return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory, + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } else { + return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory, + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } + } + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + Scan scan = new Scan(); + scan.addFamily(family); + return this.getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + Scan scan = new Scan(); + scan.addColumn(family, qualifier); + return this.getScanner(scan); + } + + @Override + public void put(Put put) throws IOException { + try { + this.asyncTable.mutate(put).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public void put(List puts) throws IOException { + getBufferedMutator().mutate(puts); + getBufferedMutator().flush(); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) + throws IOException { + return this.checkAndPut(row,family,qualifier,CompareOp.EQUAL,value,put); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, Put put) throws IOException { + try { + return asyncTable.checkAndMutate(family, qualifier, compareOp, value, put).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public void delete(Delete delete) throws IOException { + try { + asyncTable.mutate(delete).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public void delete(List deletes) throws IOException { + getBufferedMutator().mutate(deletes); + getBufferedMutator().flush(); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, + Delete delete) throws IOException { + return this.checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, Delete delete) throws IOException { + try { + return asyncTable.checkAndMutate(family, qualifier, compareOp, value, delete).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public void 
mutateRow(RowMutations rm) throws IOException { + try { + asyncTable.mutate(rm).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public Result append(Append append) throws IOException { + try { + return asyncTable.mutate(append).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public Result increment(Increment increment) throws IOException { + try { + return asyncTable.mutate(increment).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) + throws IOException { + return this.incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, + Durability durability) throws IOException { + Increment increment = new Increment(row); + increment.addColumn(family, qualifier, amount); + increment.setDurability(durability); + + try { + Cell cell = asyncTable.mutate(increment).get().getColumnLatestCell(family, qualifier); + return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(), cell.getValueLength()); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, RowMutations mutation) throws IOException { + try { + return asyncTable.checkAndMutate(family, qualifier, compareOp, value, mutation).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + throw (IOException) e.getCause(); + } + } + + @Override + public TableName getName() { + return asyncTable.getName(); + } + + @Override + public void close() throws IOException { + if (this.closed) { + return; + } + if(mutator != null){ + mutator.close(); + } + asyncTable.close(); + this.closed = true; + } + + /** + * {@inheritDoc} + */ + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return new RegionCoprocessorRpcChannel(connection, asyncTable.getName(), row); + } + + /** + * {@inheritDoc} + */ + @Override + public Map coprocessorService(final Class service, + byte[] startKey, byte[] endKey, final Batch.Call callable) + throws ServiceException, Throwable { + final Map results = Collections + .synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); + coprocessorService(service, startKey, endKey, callable, new Batch.Callback() { + @Override + public void update(byte[] region, byte[] row, R value) { + if (region != null) { + results.put(region, value); + } + } + }); + return results; + } + + /** + * {@inheritDoc} + */ + @Override + public void coprocessorService(final Class service, + byte[] startKey, byte[] endKey, final Batch.Call callable, + final Callback callback) throws ServiceException, Throwable { + // get regions covered by the row range + List keys = getStartKeysInRange(startKey, endKey); + + Map> futures = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (final 
AsyncTableImpl.KeyAndRegion r : keys) { + final RegionCoprocessorRpcChannel channel = + new RegionCoprocessorRpcChannel(connection, asyncTable.getName(), r.getKey()); + + Future future = connection.getEventExecutor().submit(new Callable() { + @Override + public R call() throws Exception { + T instance = ProtobufUtil.newServiceStub(service, channel); + R result = callable.call(instance); + byte[] region = channel.getLastRegion(); + if (callback != null) { + callback.update(region, r.getKey(), result); + } + return result; + } + } + ); + futures.put(r.getKey(), future); + } + for (Map.Entry> e : futures.entrySet()) { + try { + e.getValue().get(); + } catch (ExecutionException ee) { + LOG.warn("Error calling coprocessor service " + service.getName() + " for row " + + Bytes.toStringBinary(e.getKey()), ee); + throw ee.getCause(); + } catch (InterruptedException ie) { + throw new InterruptedIOException("Interrupted calling coprocessor service " + + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie); + } + } + } + + private List getStartKeysInRange(byte[] start, byte[] end) + throws IOException { + if (start == null) { + start = HConstants.EMPTY_START_ROW; + } + if (end == null) { + end = HConstants.EMPTY_END_ROW; + } + return getKeysAndRegionsInRange(connection, asyncTable.getName(), start, end, true, false); + } + + @Override + public long getWriteBufferSize() { + return writeBuffer; + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + this.writeBuffer = writeBufferSize; + } + + @Override + public Map batchCoprocessorService( + Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, + byte[] endKey, R responsePrototype) throws ServiceException, InterruptedException { + try { + return asyncTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, + responsePrototype).get(); + } catch (ExecutionException e) { + throw new ServiceException(e.getCause()); + } + } + + @Override + public void batchCoprocessorService( + Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, + byte[] endKey, R responsePrototype, Callback callback) throws ServiceException, Throwable { + try { + asyncTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, + responsePrototype, callback).get(); + } catch (ExecutionException e) { + throw new ServiceException(e.getCause()); + } + } + + @Override + public String toString() { + return asyncTable.getName() + ";" + connection; + } + + BufferedMutator getBufferedMutator() throws IOException { + if (mutator == null) { + this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( + new BufferedMutatorParams(asyncTable.getName()).pool(connection.getEventExecutor()) + .writeBufferSize(connConfiguration.getWriteBufferSize()) + .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())); + } + return mutator; + } + + public ExecutorService getPool() { + return pool; + } + + public ClusterConnection getConnection() { + return connection; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/VoidPromiseKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/VoidPromiseKeeper.java new file mode 100644 index 0000000..410bd43 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/VoidPromiseKeeper.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Keeps all promises added to it and completes with a null response once all of them
+ * are done.
+ */
+@InterfaceAudience.Private
+public class VoidPromiseKeeper extends DefaultPromise<Void> implements Future<Void> {
+
+  private int indexCounter = 0;
+  private int totalCount = 0;
+  private int doneCount = 0;
+  private final Future[] promises;
+
+  /**
+   * Constructor
+   * @param executor to run promise keeper on
+   * @param maxPromises max number of promises to be able to listen to
+   */
+  public VoidPromiseKeeper(EventExecutor executor, int maxPromises) {
+    super(executor);
+    this.promises = new Future[maxPromises];
+  }
+
+  /**
+   * Add a promise for this keeper to track
+   * @param promise to listen to
+   */
+  public void addFuture(Future promise) {
+    totalCount++;
+    int i = indexCounter++;
+    this.promises[i] = promise;
+    promise.addListener(new IndexedResponseFuture(i));
+  }
+
+  /**
+   * Cancel all open promises
+   */
+  private void cancelAllPromises() {
+    for (Future p : promises) {
+      if (p != null) {
+        p.cancel(true);
+      }
+    }
+  }
+
+  /**
+   * Indexed response promise listener
+   */
+  private class IndexedResponseFuture implements ResponseFutureListener {
+    private final int index;
+
+    /**
+     * Constructor
+     * @param i index for the response
+     */
+    public IndexedResponseFuture(int i) {
+      this.index = i;
+    }
+
+    @Override
+    public void operationComplete(Future future) {
+      if (!isDone()) {
+        // Set the promise ref to null to indicate it is fulfilled
+        promises[index] = null;
+        try {
+          future.get();
+          doneCount++;
+          if (doneCount == totalCount) {
+            setSuccess(null);
+          }
+        } catch (InterruptedException e) {
+          cancel(true);
+          cancelAllPromises();
+        } catch (ExecutionException e) {
+          setFailure(e.getCause());
+          cancelAllPromises();
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
index a9c23cc..e5efb38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Future;
 
 /**
  * A collection of interfaces and utilities used for interacting with custom RPC
@@ -56,6 +57,30 @@ public abstract class Batch {
   }
 
   /**
+   * Defines a unit of work to be executed.
+   *
+   * <p>

+   * When used with
+   * {@link org.apache.hadoop.hbase.client.AsyncTable#coprocessorService(Class, byte[], byte[],
+   * Batch.AsyncCall)}
+   * the implementation's {@link Batch.AsyncCall#call(Object)} method will be invoked
+   * with a proxy to each region's coprocessor {@link com.google.protobuf.Service} implementation.
+   * </p>

+   * @see org.apache.hadoop.hbase.client.coprocessor
+   * @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])
+   * @see org.apache.hadoop.hbase.client.AsyncTable#coprocessorService(Class, byte[], byte[],
+   * Batch.AsyncCall)
+   * @param <T> the instance type to be passed to
+   * {@link Batch.AsyncCall#call(Object)}
+   * @param <R> the return type from {@link Batch.AsyncCall#call(Object)}
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Stable
+  public interface AsyncCall<T, R> {
+    Future<R> call(T instance);
+  }
+
+  /**
   * Defines a generic callback to be triggered for each {@link Batch.Call#call(Object)}
   * result.
   *
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index f083001..ae475f3 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -533,7 +533,7 @@ public class TestClientScanner {
     }
   }
 
-  public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
+  public class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
 
     public MockRpcRetryingCallerFactory(Configuration conf) {
       super(conf);
@@ -562,6 +562,19 @@ public class TestClientScanner {
           throw new RuntimeException(e);
         }
       }
+
+      @Override
+      public Future<T> callAsyncWithRetries(AsyncRetryingCallable<T> callable,
+          int operationTimeout) {
+        return new FailedFuture<>(clusterConn.getEventExecutor().next(),
+            new IOException("Scanner exception"));
+      }
+
+      @Override
+      public Future<T> callAsyncWithoutRetries(AsyncRetryingCallable<T> callable,
+          int callTimeout) {
+        return callable.call(callTimeout);
+      }
     };
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 7b35815..2621845 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
@@ -1316,7 +1315,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Create a table.
    * @param tableName
    * @param family
-   * @return An HTable instance for the created table.
+   * @return A Table instance for the created table.
    * @throws IOException
    */
   public Table createTable(TableName tableName, String family)
@@ -1328,7 +1327,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Create a table.
    * @param tableName
    * @param families
-   * @return An HTable instance for the created table.
+   * @return A Table instance for the created table.
    * @throws IOException
    */
   public Table createTable(TableName tableName, String[] families)
@@ -1344,7 +1343,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Create a table.
    * @param tableName
    * @param family
-   * @return An HTable instance for the created table.
+   * @return A Table instance for the created table.
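(Aside: for a feel of what an AsyncCall implementation looks like, here is a self-contained analogue. CompletableFuture stands in for the patch's client Future type, and the RowCounter stub is hypothetical, standing in for a generated coprocessor service proxy:)

import java.util.concurrent.CompletableFuture;

final class AsyncCallExample {
  // Analogue of Batch.AsyncCall; the patch's version returns the HBase client Future.
  interface AsyncCall<T, R> {
    CompletableFuture<R> call(T instance);
  }

  // Hypothetical async service proxy, standing in for a coprocessor stub.
  interface RowCounter {
    CompletableFuture<Long> countRows();
  }

  // One unit of work per region: ask that region's stub for its row count.
  static final AsyncCall<RowCounter, Long> COUNT = new AsyncCall<RowCounter, Long>() {
    @Override
    public CompletableFuture<Long> call(RowCounter instance) {
      return instance.countRows();
    }
  };
}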
* @throws IOException */ public Table createTable(TableName tableName, byte[] family) @@ -1357,7 +1356,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param tableName * @param family * @param numRegions - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions) @@ -1374,7 +1373,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Create a table. * @param tableName * @param families - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[][] families) @@ -1386,7 +1385,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Create a table with multiple regions. * @param tableName * @param families - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException { @@ -1398,7 +1397,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param tableName * @param families * @param splitKeys - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys) @@ -1427,7 +1426,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param htd * @param families * @param c Configuration to use - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(HTableDescriptor htd, byte[][] families, Configuration c) @@ -1441,7 +1440,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param families * @param splitKeys * @param c Configuration to use - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(HTableDescriptor htd, byte[][] families, byte[][] splitKeys, @@ -1465,7 +1464,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Create a table. * @param htd * @param splitRows - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(HTableDescriptor htd, byte[][] splitRows) @@ -1483,7 +1482,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param families * @param splitKeys * @param c Configuration to use - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, @@ -1496,7 +1495,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param tableName * @param family * @param numVersions - * @return An HTable instance for the created table. + * @return A Table instance for the created table. 
* @throws IOException */ public Table createTable(TableName tableName, byte[] family, int numVersions) @@ -1509,7 +1508,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param tableName * @param families * @param numVersions - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[][] families, int numVersions) @@ -1523,7 +1522,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param families * @param numVersions * @param splitKeys - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[][] families, int numVersions, @@ -1545,7 +1544,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param tableName * @param families * @param numVersions - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) @@ -1559,7 +1558,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param families * @param numVersions * @param blockSize - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[][] families, @@ -1602,7 +1601,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param tableName * @param families * @param numVersions - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[][] families, @@ -1628,7 +1627,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param tableName * @param family * @param splitRows - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) @@ -1647,7 +1646,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Create a table with multiple regions. * @param tableName * @param family - * @return An HTable instance for the created table. + * @return A Table instance for the created table. * @throws IOException */ public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { @@ -1887,7 +1886,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Provide an existing table name to truncate. * Scans the table and issues a delete for each row read. * @param tableName existing table - * @return HTable to that new table + * @return Table to that new table * @throws IOException */ public Table deleteTableData(TableName tableName) throws IOException { @@ -1908,7 +1907,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Effectively disables, deletes, and recreates the table. * @param tableName table which must exist. 
* @param preserveRegions keep the existing split points - * @return HTable for the new table + * @return Table for the new table */ public Table truncateTable(final TableName tableName, final boolean preserveRegions) throws IOException { @@ -1927,7 +1926,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * deleteTableData. * Expressly does not preserve regions of existing table. * @param tableName table which must exist. - * @return HTable for the new table + * @return Table for the new table */ public Table truncateTable(final TableName tableName) throws IOException { return truncateTable(tableName, false); @@ -2003,7 +2002,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** A tracker for tracking and validating table rows - * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])} + * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])} */ public static class SeenRowTracker { int dim = 'z' - 'a' + 1; @@ -2239,7 +2238,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return digest.toString(); } - /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */ + /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB static { int i = 0; @@ -2640,7 +2639,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105 * * @param nodeZK - the ZK watcher to expire - * @param checkStatus - true to check if we can create an HTable with the + * @param checkStatus - true to check if we can create a Table with the * current configuration. */ public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index ca4b609..e5bc9bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -120,10 +120,10 @@ public class TestFromClientSide { // NOTE: Increment tests were moved to their own class, TestIncrementsFromClientSide.
private static final Log LOG = LogFactory.getLog(TestFromClientSide.class); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte [] ROW = Bytes.toBytes("testRow"); - private static byte [] FAMILY = Bytes.toBytes("testFamily"); - private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); - private static byte [] VALUE = Bytes.toBytes("testValue"); + private static final byte [] ROW = Bytes.toBytes("testRow"); + private static final byte [] FAMILY = Bytes.toBytes("testFamily"); + private static final byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte [] VALUE = Bytes.toBytes("testValue"); protected static int SLAVES = 3; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java new file mode 100644 index 0000000..82cfeec --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java @@ -0,0 +1,227 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.InclusiveStopFilter; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.filter.LongComparator; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; 
+import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +/** + * Run tests that use the HBase client {@link AsyncTable}. + * Sets up the HBase mini cluster once at start and runs through all client tests. + * Each creates a table named for the method and does its stuff against that. + */ +@Category({LargeTests.class, ClientTests.class}) +@SuppressWarnings("deprecation") +public class TestFromClientSideAsync { + private static final Log LOG = LogFactory.getLog(TestFromClientSideAsync.class); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte [] ROW = Bytes.toBytes("testRow"); + private static final byte [] FAMILY = Bytes.toBytes("testFamily"); + private static final byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte [] VALUE = Bytes.toBytes("testValue"); + protected static int SLAVES = 3; + + /** + * @throws Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Uncomment the following lines if more verbosity is needed for + // debugging (see HBASE-12285 for details). + //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); + //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); + //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do.
+ } + + private AsyncTable createAndGetAsyncTable(TableName tableName) throws IOException { + TEST_UTIL.createTable(tableName, FAMILY); + return TEST_UTIL.getConnection().getAsyncTable(tableName); + } + + @Test + public void testGet_EmptyTable() throws IOException, ExecutionException, InterruptedException { + AsyncTable table = createAndGetAsyncTable(TableName.valueOf("testGet_EmptyTable")); + + Get get = new Get(ROW); + get.addFamily(FAMILY); + Result r = table.get(get).get(); + assertTrue(r.isEmpty()); + } + + @Test + public void testGet_NullQualifier() throws IOException, ExecutionException, InterruptedException { + AsyncTable table = createAndGetAsyncTable(TableName.valueOf("testGet_NullQualifier")); + + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + table.mutate(put).get(); + + put = new Put(ROW); + put.addColumn(FAMILY, null, VALUE); + table.mutate(put).get(); + LOG.info("Row put"); + + Get get = new Get(ROW); + get.addColumn(FAMILY, null); + Result r = table.get(get).get(); + assertEquals(1, r.size()); + + get = new Get(ROW); + get.addFamily(FAMILY); + r = table.get(get).get(); + assertEquals(2, r.size()); + } + + @Test + public void testGet_NonExistentRow() + throws IOException, ExecutionException, InterruptedException { + AsyncTable table = createAndGetAsyncTable(TableName.valueOf("testGet_NonExistentRow")); + + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + table.mutate(put).get(); + LOG.info("Row put"); + + Get get = new Get(ROW); + get.addFamily(FAMILY); + Result r = table.get(get).get(); + assertFalse(r.isEmpty()); + LOG.info("Row retrieved successfully"); + + byte [] missingrow = Bytes.toBytes("missingrow"); + get = new Get(missingrow); + get.addFamily(FAMILY); + r = table.get(get).get(); + assertTrue(r.isEmpty()); + LOG.info("Row missing as it should be"); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 4723fa8..d4bf9ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -180,8 +180,8 @@ public class TestHCM { ExecutorService pool = null; - if(table instanceof HTable) { - HTable t = (HTable) table; + if(table instanceof TableImpl) { + TableImpl t = (TableImpl) table; // make sure passing a pool to the getTable does not trigger creation of an internal pool assertNull("Internal Thread pool should be null", ((ConnectionImplementation) con1).getCurrentBatchPool()); @@ -189,22 +189,22 @@ public class TestHCM { assertTrue(otherPool == t.getPool()); t.close(); - t = (HTable) con2.getTable(tableName); + t = (TableImpl) con2.getTable(tableName); // table should use the connection's internal pool assertTrue(otherPool == t.getPool()); t.close(); - t = (HTable) con2.getTable(tableName); + t = (TableImpl) con2.getTable(tableName); // try other API too assertTrue(otherPool == t.getPool()); t.close(); - t = (HTable) con2.getTable(tableName); + t = (TableImpl) con2.getTable(tableName); // try other API too assertTrue(otherPool == t.getPool()); t.close(); - t = (HTable) con1.getTable(tableName); + t = (TableImpl) con1.getTable(tableName); pool = ((ConnectionImplementation) con1).getCurrentBatchPool(); // make sure an internal pool was created assertNotNull("An internal Thread pool should have been created", pool); @@ -212,7 +212,7 @@ public class TestHCM { assertTrue(t.getPool() == pool); t.close(); - t = (HTable)
con1.getTable(tableName); + t = (TableImpl) con1.getTable(tableName); // still using the *same* internal pool assertTrue(t.getPool() == pool); t.close(); @@ -888,17 +888,17 @@ public class TestHCM { Table table = conn.getTable(TABLE_NAME1); table.close(); assertFalse(conn.isClosed()); - if(table instanceof HTable) { - assertFalse(((HTable) table).getPool().isShutdown()); + if(table instanceof TableImpl) { + assertFalse(((TableImpl) table).getPool().isShutdown()); } table = conn.getTable(TABLE_NAME1); table.close(); - if(table instanceof HTable) { - assertFalse(((HTable) table).getPool().isShutdown()); + if(table instanceof TableImpl) { + assertFalse(((TableImpl) table).getPool().isShutdown()); } conn.close(); - if(table instanceof HTable) { - assertTrue(((HTable) table).getPool().isShutdown()); + if(table instanceof TableImpl) { + assertTrue(((TableImpl) table).getPool().isShutdown()); } table0.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java index c93794d..7654cbc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java @@ -67,8 +67,8 @@ public class TestShortCircuitConnection { HRegionServer regionServer = UTIL.getRSForFirstRegionInTable(tn); ClusterConnection connection = regionServer.getClusterConnection(); Table tableIf = connection.getTable(tn); - assertTrue(tableIf instanceof HTable); - HTable table = (HTable) tableIf; + assertTrue(tableIf instanceof TableImpl); + TableImpl table = (TableImpl) tableIf; assertTrue(table.getConnection() == connection); AdminService.BlockingInterface admin = connection.getAdmin(regionServer.getServerName()); ClientService.BlockingInterface client = connection.getClient(regionServer.getServerName()); -- 2.5.0
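
Usage sketch (not part of the patch): the snippet below shows how the pieces reviewed above fit together end to end, in the style of TestFromClientSideAsync. The table, family, qualifier, and class names are illustrative only, and it assumes (as the io.netty imports and the new FailedFuture/SuccessfulFuture helpers suggest) that AsyncTable#get and AsyncTable#mutate return io.netty.util.concurrent.Future instances, so Netty's listener API is available for non-blocking completion handling.

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncTableSketch {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection()) {
      // New entry point added by this patch (see Connection/ConnectionImplementation).
      AsyncTable table = connection.getAsyncTable(TableName.valueOf("exampleTable"));

      // Writes return a future instead of blocking the caller.
      Put put = new Put(Bytes.toBytes("row1"));
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      table.mutate(put).get(); // block here only to keep the example sequential

      // Reads complete on the event loop; react via a listener instead of blocking.
      Future<Result> futureResult = table.get(new Get(Bytes.toBytes("row1")));
      futureResult.addListener((FutureListener<Result>) f -> {
        if (f.isSuccess()) {
          byte[] value = f.getNow().getValue(Bytes.toBytes("f"), Bytes.toBytes("q"));
          System.out.println("value = " + Bytes.toString(value));
        } else {
          f.cause().printStackTrace();
        }
      });
      futureResult.await(); // let the listener fire before the connection closes
    }
  }
}

The get()/await() calls exist only to keep this standalone example deterministic; in real asynchronous use the listener alone suffices and nothing blocks on the caller's thread.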