From 3270f0dfa3b8c7e79c73643e0383d04b1ceac78e Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 11 Oct 2016 20:43:48 +0800 Subject: [PATCH] HBASE-15921 Add first AsyncTable impl and create TableImpl based on it --- hbase-client/pom.xml | 12 ++ .../hadoop/hbase/client/AsyncConnection.java | 68 ++++++ .../hbase/client/AsyncConnectionConfiguration.java | 59 ++++++ .../hadoop/hbase/client/AsyncConnectionImpl.java | 142 +++++++++++++ .../hadoop/hbase/client/AsyncRegionLocator.java | 64 ++++++ .../hbase/client/AsyncRpcRetryCallerFactory.java | 126 +++++++++++ .../client/AsyncSingleActionRpcRetryCaller.java | 230 +++++++++++++++++++++ .../org/apache/hadoop/hbase/client/AsyncTable.java | 126 +++++++++++ .../apache/hadoop/hbase/client/AsyncTableImpl.java | 192 +++++++++++++++++ .../hbase/client/AsyncTableRegionLocator.java | 60 ++++++ .../hbase/client/AsyncTableRegionLocatorImpl.java | 50 +++++ .../hadoop/hbase/client/ClusterRegistry.java | 41 ++++ .../hbase/client/ClusterRegistryFactory.java | 43 ++++ .../hbase/client/ConnectionImplementation.java | 53 ++--- .../hadoop/hbase/client/ConnectionUtils.java | 35 ++++ .../hadoop/hbase/client/RpcRetryingCallerImpl.java | 11 +- .../hadoop/hbase/client/ZKClusterRegistry.java | 82 ++++++++ .../hbase/zookeeper/RecoverableZooKeeper.java | 2 +- .../apache/hadoop/hbase/util/CollectionUtils.java | 24 +++ .../apache/hadoop/hbase/util/ReflectionUtils.java | 4 +- .../TestAsyncSingleActionRpcRetryCaller.java | 196 ++++++++++++++++++ .../apache/hadoop/hbase/client/TestAsyncTable.java | 132 ++++++++++++ pom.xml | 58 ++++-- 23 files changed, 1747 insertions(+), 63 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java create mode 100644 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryCallerFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleActionRpcRetryCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleActionRpcRetryCaller.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index f3e27bc..f61c6d0 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -203,6 +203,18 @@ mockito-all test + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-client + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java new file mode 100644 index 0000000..bfef345 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous version of Connection. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncConnection extends Closeable { + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *

+ * The reference returned is not a copy, so any change made to it will affect this instance. + */ + Configuration getConfiguration(); + + /** + * Retrieve a AsyncRegionLocator implementation to inspect region information on a table. The + * returned AsyncRegionLocator is not thread-safe, so a new instance should be created for each + * using thread. This is a lightweight operation. Pooling or caching of the returned + * AsyncRegionLocator is neither required nor desired. + * @param tableName Name of the table who's region is to be examined + * @return An AsyncRegionLocator instance + */ + AsyncTableRegionLocator getRegionLocator(TableName tableName); + + /** + * Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread + * safe, a new instance should be created for each using thread. This is a lightweight operation, + * pooling or caching of the returned AsyncTable is neither required nor desired. + *

+ * This method no longer checks table existence. An exception will be thrown if the table does not + * exist only when the first operation is attempted. + * @param tableName the name of the table + * @return an AsyncTable to use for interactions with this table + */ + AsyncTable getTable(TableName tableName); + + /** + * {@inheritDoc} + */ + @Override + void close(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java new file mode 100644 index 0000000..deead0f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Timeout configs. + */ +@InterfaceAudience.Private +class AsyncConnectionConfiguration { + + final long metaOperationTimeoutNs; + + final long operationTimeoutNs; + + // timeout for each read rpc request + final long readRpcTimeoutNs; + + // timeout for each write rpc request + final long writeRpcTimeoutNs; + + AsyncConnectionConfiguration(Configuration conf) { + this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( + conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( + conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, + conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); + this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java new file mode 100644 index 
0000000..bd591e6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.*; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; + +import io.netty.util.HashedWheelTimer; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import 
org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.Threads; + +/** + * The implementation of AsyncConnection. + */ +@InterfaceAudience.Private +class AsyncConnectionImpl implements AsyncConnection { + + private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class); + + private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); + + private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; + + private final Configuration conf; + + final AsyncConnectionConfiguration connConf; + + private final User user; + + private final ClusterRegistry registry; + + private final String clusterId; + + private final int rpcTimeout; + + private final RpcClient rpcClient; + + final RpcControllerFactory rpcControllerFactory; + + private final boolean hostnameCanChange; + + private final AsyncRegionLocator locator; + + final AsyncRpcRetryCallerFactory callerFactory; + + public AsyncConnectionImpl(Configuration conf, User user) throws IOException { + this.conf = conf; + this.user = user; + + this.connConf = new AsyncConnectionConfiguration(conf); + + this.locator = new AsyncRegionLocator(conf); + + // action below will not throw exception so no need to catch and close. 
+ this.registry = ClusterRegistryFactory.getRegistry(conf); + this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { + if (LOG.isDebugEnabled()) { + LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT); + } + return CLUSTER_ID_DEFAULT; + }); + this.rpcClient = RpcClientFactory.createClient(conf, clusterId); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); + this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); + this.callerFactory = new AsyncRpcRetryCallerFactory(this, RETRY_TIMER); + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public void close() { + IOUtils.closeQuietly(locator); + IOUtils.closeQuietly(rpcClient); + IOUtils.closeQuietly(registry); + } + + @Override + public AsyncTableRegionLocator getRegionLocator(TableName tableName) { + return new AsyncTableRegionLocatorImpl(tableName, locator); + } + + // we will override this method for testing retry caller, so do not remove this method. 
+ AsyncRegionLocator getLocator() { + return locator; + } + + private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); + + private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { + return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); + } + + ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { + return CollectionUtils.computeIfAbsentEx(rsStubs, + getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange), + () -> createRegionServerStub(serverName)); + } + + @Override + public AsyncTable getTable(TableName tableName) { + return new AsyncTableImpl(this, tableName); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java new file mode 100644 index 0000000..dc75ba6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * TODO: reimplement using aync connection when the scan logic is ready. The current implementation + * is based on the blocking client. + */ +@InterfaceAudience.Private +class AsyncRegionLocator implements Closeable { + + private final ConnectionImplementation conn; + + AsyncRegionLocator(Configuration conf) throws IOException { + conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf); + } + + CompletableFuture getRegionLocation(TableName tableName, byte[] row, + boolean reload) { + CompletableFuture future = new CompletableFuture<>(); + try { + future.complete(conn.getRegionLocation(tableName, row, reload)); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception, + ServerName source) { + conn.updateCachedLocations(tableName, regionName, row, exception, source); + } + + @Override + public void close() { + IOUtils.closeQuietly(conn); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryCallerFactory.java new file mode 100644 index 0000000..09b3dbc --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryCallerFactory.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; +import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; + +import com.google.common.base.Preconditions; + +import io.netty.util.HashedWheelTimer; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Factory to create an AsyncRpcRetryCaller. 
+ */ +@InterfaceAudience.Private +class AsyncRpcRetryCallerFactory { + + private final AsyncConnectionImpl conn; + + private final HashedWheelTimer retryTimer; + + private final long pauseNs; + + private final int maxRetries; + + /** How many retries are allowed before we start to log */ + private final int startLogErrorsCnt; + + public AsyncRpcRetryCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { + this.conn = conn; + Configuration conf = conn.getConfiguration(); + this.pauseNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.startLogErrorsCnt = + conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + this.retryTimer = retryTimer; + } + + public class SingleActionCallerBuilder { + + private TableName tableName; + + private byte[] row; + + private AsyncSingleActionRpcRetryCaller.Callable callable; + + private long operationTimeoutNs = -1L; + + private long rpcTimeoutNs = -1L; + + public SingleActionCallerBuilder table(TableName tableName) { + this.tableName = tableName; + return this; + } + + public SingleActionCallerBuilder row(byte[] row) { + this.row = row; + return this; + } + + public SingleActionCallerBuilder + action(AsyncSingleActionRpcRetryCaller.Callable callable) { + this.callable = callable; + return this; + } + + public SingleActionCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(operationTimeout); + return this; + } + + public SingleActionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public AsyncSingleActionRpcRetryCaller build() { + return new AsyncSingleActionRpcRetryCaller<>(retryTimer, conn, + Preconditions.checkNotNull(tableName, "tableName is null"), + Preconditions.checkNotNull(row, "row is 
null"), + Preconditions.checkNotNull(callable, "action is null"), pauseNs, maxRetries, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + } + + /** + * Shortcut for {@code build().call()} + */ + public CompletableFuture call() { + return build().call(); + } + } + + /** + * Create retry caller for single action, such as get, put, delete, etc. + */ + public SingleActionCallerBuilder single() { + return new SingleActionCallerBuilder<>(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleActionRpcRetryCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleActionRpcRetryCaller.java new file mode 100644 index 0000000..834fbe0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleActionRpcRetryCaller.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Retry caller for a single action, such as get, put, delete, etc. 
+ */ +@InterfaceAudience.Private +class AsyncSingleActionRpcRetryCaller { + + private static final Log LOG = LogFactory.getLog(AsyncSingleActionRpcRetryCaller.class); + + @FunctionalInterface + public interface Callable { + CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, + ClientService.Interface stub); + } + + private final HashedWheelTimer retryTimer; + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private final byte[] row; + + private final Callable callable; + + private final long pauseNs; + + private final int maxAttempts; + + private final long operationTimeoutNs; + + private final long rpcTimeoutNs; + + private final int startLogErrorsCnt; + + private final CompletableFuture future; + + private final HBaseRpcController controller; + + private final List exceptions; + + private final long startNs; + + public AsyncSingleActionRpcRetryCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + TableName tableName, byte[] row, Callable callable, long pauseNs, int maxRetries, + long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + this.retryTimer = retryTimer; + this.conn = conn; + this.tableName = tableName; + this.row = row; + this.callable = callable; + this.pauseNs = pauseNs; + this.maxAttempts = retries2Attempts(maxRetries); + this.operationTimeoutNs = operationTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; + this.future = new CompletableFuture<>(); + this.controller = conn.rpcControllerFactory.newController(); + this.exceptions = new ArrayList<>(); + this.startNs = System.nanoTime(); + } + + private int tries = 1; + + private long elapsedMs() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + } + + private static Throwable translateException(Throwable t) { + if (t instanceof UndeclaredThrowableException && t.getCause() != null) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = 
((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof ServiceException && t.getCause() != null) { + t = translateException(t.getCause()); + } + return t; + } + + private void completeExceptionally() { + future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); + } + + private void onError(Throwable error, Supplier errMsg, + Consumer updateCachedLocation) { + error = translateException(error); + if (tries > startLogErrorsCnt) { + LOG.warn(errMsg.get(), error); + } + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(error, + EnvironmentEdgeManager.currentTime(), ""); + exceptions.add(qt); + if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + completeExceptionally(); + return; + } + long delayNs; + if (operationTimeoutNs > 0) { + long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs); + if (maxDelayNs <= 0) { + completeExceptionally(); + return; + } + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + } else { + delayNs = getPauseTime(pauseNs, tries - 1); + } + updateCachedLocation.accept(error); + tries++; + retryTimer.newTimeout(new TimerTask() { + + @Override + public void run(Timeout timeout) throws Exception { + // always restart from beginning. 
+ locate(); + } + }, delayNs, TimeUnit.NANOSECONDS); + } + + private void resetController() { + controller.reset(); + if (rpcTimeoutNs >= 0) { + controller.setCallTimeout( + (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs))); + } + } + + private void call(HRegionLocation loc) { + ClientService.Interface stub; + try { + stub = conn.getRegionServerStub(loc.getServerName()); + } catch (IOException e) { + onError(e, + () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + + elapsedMs() + " ms", + err -> { + }); + return; + } + resetController(); + callable.call(controller, loc, stub).whenComplete((result, error) -> { + if (error != null) { + onError(error, + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = " + + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + + elapsedMs() + " ms", + err -> conn.getLocator().updateCachedLocations(tableName, + loc.getRegionInfo().getRegionName(), row, err, loc.getServerName())); + return; + } + future.complete(result); + }); + } + + private void locate() { + conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> { + if (error != null) { + onError(error, + () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " + + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + + elapsedMs() + " ms", + err -> { + }); + return; + } + call(loc); + }); + } + + public CompletableFuture call() { + 
locate(); + return future; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java new file mode 100644 index 0000000..674e4d4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}. + *

+ * The implementation does NOT need to be thread safe. Do NOT access it from multiple threads + * concurrently. + *

+ * Usually the implementations will not throw any exception directly, you need to get the exception + * from the returned {@link CompletableFuture}. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncTable { + + /** + * Gets the fully qualified table name instance of this table. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *

+ * The reference returned is not a copy, so any change made to it will affect this instance. + */ + Configuration getConfiguration(); + + /** + * Set timeout of each rpc read request in operations of this Table instance, will override the + * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too + * long, it will stop waiting and send a new request to retry until retries exhausted or operation + * timeout reached. + */ + void setReadRpcTimeout(long timeout, TimeUnit unit); + + /** + * Get timeout of each rpc read request in this Table instance. + */ + long getReadRpcTimeout(TimeUnit unit); + + /** + * Set timeout of each rpc write request in operations of this Table instance, will override the + * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too + * long, it will stop waiting and send a new request to retry until retries exhausted or operation + * timeout reached. + */ + void setWriteRpcTimeout(long timeout, TimeUnit unit); + + /** + * Get timeout of each rpc write request in this Table instance. + */ + long getWriteRpcTimeout(TimeUnit unit); + + /** + * Set timeout of each operation in this Table instance, will override the value of + * {@code hbase.client.operation.timeout} in configuration. + *

+ * Operation timeout is a top-level restriction that makes sure an operation will not be blocked + * more than this. In each operation, if rpc request fails because of timeout or other reason, it + * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed + * reach the operation timeout before retries exhausted, it will break early and throw + * SocketTimeoutException. + */ + void setOperationTimeout(long timeout, TimeUnit unit); + + /** + * Get timeout of each operation in Table instance. + */ + long getOperationTimeout(TimeUnit unit); + + /** + * Test for the existence of columns in the table, as specified by the Get. + *

+ * This will return true if the Get matches one or more keys, false if not. + *

+ * This is a server-side call so it prevents any data from being transferred to the client. + */ + CompletableFuture exists(Get get); + + /** + * Extracts certain cells from a given row. + *

+ * Return the data coming from the specified row, if it exists. If the row specified doesn't + * exist, the {@link Result} instance returned won't contain any + * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. + * @param get The object that specifies what data to fetch and from which row. + */ + CompletableFuture get(Get get); + + /** + * Puts some data to the table. + * @param put The data to put. + */ + CompletableFuture put(Put put); + + /** + * Deletes the specified cells/row. + * @param delete The object that specifies what to delete. + */ + CompletableFuture delete(Delete delete); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java new file mode 100644 index 0000000..621f1a5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncRpcRetryCallerFactory.SingleActionCallerBuilder; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * The implementation of AsyncTable. + */ +@InterfaceAudience.Private +class AsyncTableImpl implements AsyncTable { + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private long readRpcTimeoutNs; + + private long writeRpcTimeoutNs; + + private long operationTimeoutNs; + + public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { + this.conn = conn; + this.tableName = tableName; + this.readRpcTimeoutNs = conn.connConf.readRpcTimeoutNs; + this.writeRpcTimeoutNs = conn.connConf.writeRpcTimeoutNs; + this.operationTimeoutNs = tableName.isSystemTable() ? 
conn.connConf.metaOperationTimeoutNs + : conn.connConf.operationTimeoutNs; + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public Configuration getConfiguration() { + return conn.getConfiguration(); + } + + @FunctionalInterface + private interface Converter { + D convert(I info, S src) throws IOException; + } + + @FunctionalInterface + private interface RpcCall { + void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, + RpcCallback done); + } + + private static CompletableFuture call( + HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert, RpcCall rpcCall, + Converter respConverter) { + CompletableFuture future = new CompletableFuture<>(); + try { + rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req), + new RpcCallback() { + + @Override + public void run(PRESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(respConverter.convert(controller, resp)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + private SingleActionCallerBuilder newCaller(Row row, long rpcTimeoutNs) { + return conn.callerFactory. single().table(tableName).row(row.getRow()) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public CompletableFuture exists(Get get) { + if (!get.isCheckExistenceOnly()) { + get = ReflectionUtils.newInstance(Get.class, get); + get.setCheckExistenceOnly(true); + } + return get(get).thenApply(r -> r.getExists()); + } + + @Override + public CompletableFuture get(Get get) { + return this. newCaller(get, readRpcTimeoutNs) + .action((controller, loc, stub) -> AsyncTableImpl + . 
call(controller, loc, stub, get, + RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), + (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) + .call(); + } + + @Override + public CompletableFuture put(Put put) { + return this. newCaller(put, writeRpcTimeoutNs) + .action( + (controller, loc, stub) -> AsyncTableImpl. call( + controller, loc, stub, put, RequestConverter::buildMutateRequest, + (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> { + return null; + })) + .call(); + } + + @Override + public CompletableFuture delete(Delete delete) { + return this. newCaller(delete, writeRpcTimeoutNs) + .action((controller, loc, stub) -> AsyncTableImpl + . call(controller, loc, stub, delete, + RequestConverter::buildMutateRequest, (s, c, req, done) -> s.mutate(c, req, done), + (c, resp) -> { + return null; + })) + .call(); + } + + @Override + public void setReadRpcTimeout(long timeout, TimeUnit unit) { + this.readRpcTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getReadRpcTimeout(TimeUnit unit) { + return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public void setWriteRpcTimeout(long timeout, TimeUnit unit) { + this.writeRpcTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getWriteRpcTimeout(TimeUnit unit) { + return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public void setOperationTimeout(long timeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getOperationTimeout(TimeUnit unit) { + return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java new file mode 100644 index 0000000..989e8d9 --- /dev/null +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous version of RegionLocator. + *

+ * Usually the implementations will not throw any exception directly, you need to get the exception + * from the returned {@link CompletableFuture}. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncTableRegionLocator { + + /** + * Gets the fully qualified table name instance of the table whose region we want to locate. + */ + TableName getName(); + + /** + * Finds the region on which the given row is being served. Does not reload the cache. + *

+ * Returns the location of the region to which the row belongs. + * @param row Row to find. + */ + default CompletableFuture getRegionLocation(byte[] row) { + return getRegionLocation(row, false); + } + + /** + * Finds the region on which the given row is being served. + *

+ * Returns the location of the region to which the row belongs. + * @param row Row to find. + * @param reload true to reload information or false to use cached information + */ + CompletableFuture getRegionLocation(byte[] row, boolean reload); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java new file mode 100644 index 0000000..d715e24 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * The implementation of AsyncTableRegionLocator.
+ */ +@InterfaceAudience.Private +class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { + + private final TableName tableName; + + private final AsyncRegionLocator locator; + + public AsyncTableRegionLocatorImpl(TableName tableName, AsyncRegionLocator locator) { + this.tableName = tableName; + this.locator = locator; + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public CompletableFuture getRegionLocation(byte[] row, boolean reload) { + return locator.getRegionLocation(tableName, row, reload); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java new file mode 100644 index 0000000..c1918a7 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Implementations hold cluster information such as this cluster's id. + *

+ * Internal use only. + */ +@InterfaceAudience.Private +interface ClusterRegistry extends Closeable { + + /** + * Should only be called once. + *

+ * The upper layer should store this value somewhere as it will not be changed any more. + */ + String getClusterId(); + + @Override + void close(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java new file mode 100644 index 0000000..a6b3e39 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * Get instance of configured Registry. + */ +@InterfaceAudience.Private +final class ClusterRegistryFactory { + + static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + + private ClusterRegistryFactory() { + } + + /** + * @return The cluster registry implementation to use.
+ */ + static ClusterRegistry getRegistry(Configuration conf) { + Class clazz = + conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKClusterRegistry.class, ClusterRegistry.class); + return ReflectionUtils.newInstance(clazz, conf); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8db9dbf..96c6af1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -18,14 +18,16 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; +import com.google.common.annotations.VisibleForTesting; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -38,8 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import edu.umd.cs.findbugs.annotations.Nullable; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -63,6 +63,11 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import 
org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; @@ -79,8 +84,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -92,11 +95,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import edu.umd.cs.findbugs.annotations.Nullable; /** * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. 
@@ -196,7 +195,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); // how many times to try, one more than max *retry* time - this.numTries = connectionConfig.getRetriesNumber() + 1; + this.numTries = retries2Attempts(connectionConfig.getRetriesNumber()); this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @@ -1094,8 +1093,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key - String key = getStubKey(getServiceName(), - sn.getHostname(), sn.getPort(), hostnamesCanChange); + String key = getStubKey(getServiceName(), sn, hostnamesCanChange); connectionLock.putIfAbsent(key, key); Object stub = null; synchronized (connectionLock.get(key)) { @@ -1176,8 +1174,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (isDeadServer(serverName)) { throw new RegionServerStoppedException(serverName + " is dead."); } - String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), - serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange); + String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName, + this.hostnamesCanChange); this.connectionLock.putIfAbsent(key, key); AdminProtos.AdminService.BlockingInterface stub; synchronized (this.connectionLock.get(key)) { @@ -1198,8 +1196,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (isDeadServer(sn)) { throw new RegionServerStoppedException(sn + " is dead."); } - String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn.getHostname(), - sn.getPort(), this.hostnamesCanChange); + String key = 
getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn, + this.hostnamesCanChange); this.connectionLock.putIfAbsent(key, key); ClientProtos.ClientService.BlockingInterface stub = null; synchronized (this.connectionLock.get(key)) { @@ -1215,25 +1213,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return stub; } - static String getStubKey(final String serviceName, - final String rsHostname, - int port, - boolean resolveHostnames) { - // Sometimes, servers go down and they come back up with the same hostname but a different - // IP address. Force a resolution of the rsHostname by trying to instantiate an - // InetSocketAddress, and this way we will rightfully get a new stubKey. - // Also, include the hostname in the key so as to take care of those cases where the - // DNS name is different but IP address remains the same. - String address = rsHostname; - if (resolveHostnames) { - InetAddress i = new InetSocketAddress(rsHostname, port).getAddress(); - if (i != null) { - address = i.getHostAddress() + "-" + rsHostname; - } - } - return serviceName + "@" + address + ":" + port; - } - private ZooKeeperKeepAliveConnection keepAliveZookeeper; private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 83655f0..2f5d2b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; 
+import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -40,6 +43,8 @@ import org.apache.hadoop.hbase.security.UserProvider; @InterfaceAudience.Private public final class ConnectionUtils { + private static final Log LOG = LogFactory.getLog(ConnectionUtils.class); + private ConnectionUtils() {} /** @@ -167,4 +172,34 @@ public final class ConnectionUtils { return false; } } + + /** + * Return retries + 1. The returned value will be in range [1, Integer.MAX_VALUE]. + */ + static int retries2Attempts(int retries) { + return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); + } + + /** + * Get a unique key for the rpc stub to the given server. + */ + static String getStubKey(String serviceName, ServerName serverName, + boolean hostnameCanChange) { + // Sometimes, servers go down and they come back up with the same hostname but a different + // IP address. Force a resolution of the rsHostname by trying to instantiate an + // InetSocketAddress, and this way we will rightfully get a new stubKey. + // Also, include the hostname in the key so as to take care of those cases where the + // DNS name is different but IP address remains the same.
+ String hostname = serverName.getHostname(); + int port = serverName.getPort(); + if (hostnameCanChange) { + try { + InetAddress ip = InetAddress.getByName(hostname); + return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port; + } catch (UnknownHostException e) { + LOG.warn("Can not resolve " + hostname + ", please check your network", e); + } + } + return serviceName + "@" + hostname + ":" + port; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index e940143..91a20ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; @@ -32,12 +34,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; - /** * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client * threadlocal outstanding timeouts as so we don't persist too much. 
@@ -66,18 +67,18 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); } - + public RpcRetryingCallerImpl(long pause, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; - this.maxAttempts = retries + 1; + this.maxAttempts = retries2Attempts(retries); this.interceptor = interceptor; context = interceptor.createEmptyContext(); this.startLogErrorsCnt = startLogErrorsCnt; this.tracker = new RetryingTimeTracker(); this.rpcTimeout = rpcTimeout; } - + @Override public void cancel(){ cancelled.set(true); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java new file mode 100644 index 0000000..5c205aa --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; +import static org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.removeMetaData; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; + +/** + * Cache the cluster registry data in memory and use zk watcher to update. The only exception is + * {@link #getClusterId()}, which will fetch the data from zk directly. 
+ */ +@InterfaceAudience.Private +class ZKClusterRegistry implements ClusterRegistry { + + private static final Log LOG = LogFactory.getLog(ZKClusterRegistry.class); + + private final CuratorFramework zk; + + private final ZNodePaths znodePaths; + + ZKClusterRegistry(Configuration conf) { + this.znodePaths = new ZNodePaths(conf); + int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); + int zkRetry = conf.getInt("zookeeper.recovery.retry", 3); + int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); + this.zk = CuratorFrameworkFactory.builder() + .connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout) + .retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs)) + .threadFactory( + Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode()))) + .build(); + this.zk.start(); + } + + @Override + public String getClusterId() { + try { + byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode); + if (data == null || data.length == 0) { + return null; + } + return ClusterId.parseFrom(removeMetaData(data)).toString(); + } catch (Exception e) { + LOG.warn("failed to get cluster id", e); + return null; + } + } + + @Override + public void close() { + IOUtils.closeQuietly(zk); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 371279e..11d2e75 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -707,7 +707,7 @@ public class RecoverableZooKeeper { return null; } - public byte[] removeMetaData(byte[] data) { + public static byte[] removeMetaData(byte[] data) { if(data == null || data.length == 0) { return data; } diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index b7b9beb..775f8bd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hbase.util; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -104,4 +106,26 @@ public class CollectionUtils { } return list.get(list.size() - 1); } + + /** + * A supplier that throws IOException when get. + */ + @FunctionalInterface + public interface IOExceptionSupplier { + V get() throws IOException; + } + + /** + * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the + * value already exists. So here we copy the implementation of + * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and + * putIfAbsent to implement computeIfAbsent. And notice that the implementation does not guarantee + * that the supplier will only be executed once. + */ + public static V computeIfAbsentEx(ConcurrentMap map, K key, + IOExceptionSupplier supplier) throws IOException { + V v, newValue; + return ((v = map.get(key)) == null && (newValue = supplier.get()) != null + && (v = map.putIfAbsent(key, newValue)) == null) ? 
newValue : v; + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java index 15b3930..740f9ee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java @@ -52,6 +52,7 @@ public class ReflectionUtils { private static T instantiate(final String className, Constructor ctor, Object[] ctorArgs) { try { + ctor.setAccessible(true); return ctor.newInstance(ctorArgs); } catch (IllegalAccessException e) { throw new UnsupportedOperationException( @@ -65,14 +66,13 @@ public class ReflectionUtils { } } - @SuppressWarnings("unchecked") public static T newInstance(Class type, Object... params) { return instantiate(type.getName(), findConstructor(type, params), params); } @SuppressWarnings("unchecked") public static Constructor findConstructor(Class type, Object... paramTypes) { - Constructor[] constructors = (Constructor[])type.getConstructors(); + Constructor[] constructors = (Constructor[]) type.getDeclaredConstructors(); for (Constructor ctor : constructors) { Class[] ctorParamTypes = ctor.getParameterTypes(); if (ctorParamTypes.length != paramTypes.length) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleActionRpcRetryCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleActionRpcRetryCaller.java new file mode 100644 index 0000000..7bb3449 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleActionRpcRetryCaller.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncSingleActionRpcRetryCaller { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private 
static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + private AsyncConnectionImpl asyncConn; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(2); + TEST_UTIL.getAdmin().setBalancerRunning(false, true); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() { + if (asyncConn != null) { + asyncConn.close(); + asyncConn = null; + } + } + + private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt); + conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires); + asyncConn = new AsyncConnectionImpl(conf, User.getCurrent()); + } + + @Test + public void testRegionMove() throws InterruptedException, ExecutionException, IOException { + initConn(0, 100, 30); + // This will leave a cached entry in location cache + HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); + int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName()); + TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes( + TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName())); + AsyncTable table = asyncConn.getTable(TABLE_NAME); + table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + + // move back + 
TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), + Bytes.toBytes(loc.getServerName().getServerName())); + Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); + } + + private CompletableFuture failedFuture() { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException("Inject error!")); + return future; + } + + @Test + public void testMaxRetries() throws IOException, InterruptedException { + initConn(0, 10, 2); + try { + asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS) + .action((controller, loc, stub) -> failedFuture()).call().get(); + fail(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); + } + } + + @Test + public void testOperationTimeout() throws IOException, InterruptedException { + initConn(0, 100, Integer.MAX_VALUE); + long startNs = System.nanoTime(); + try { + asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW) + .operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture()) + .call().get(); + fail(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); + } + long costNs = System.nanoTime() - startNs; + assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1)); + assertTrue(costNs < TimeUnit.SECONDS.toNanos(2)); + } + + @Test + public void testLocateError() throws IOException, InterruptedException, ExecutionException { + initConn(0, 100, 5); + AtomicBoolean errorTriggered = new AtomicBoolean(false); + AtomicInteger count = new AtomicInteger(0); + HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); + + try (AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn.getConfiguration()) { + @Override + CompletableFuture getRegionLocation(TableName tableName, byte[] row, + 
boolean reload) { + if (tableName.equals(TABLE_NAME)) { + CompletableFuture future = new CompletableFuture<>(); + if (count.getAndIncrement() == 0) { + errorTriggered.set(true); + future.completeExceptionally(new RuntimeException("Inject error!")); + } else { + future.complete(loc); + } + return future; + } else { + return super.getRegionLocation(tableName, row, reload); + } + } + + @Override + void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, + Object exception, ServerName source) { + } + }; + AsyncConnectionImpl mockedConn = + new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) { + + @Override + AsyncRegionLocator getLocator() { + return mockedLocator; + } + }) { + AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME); + table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + assertTrue(errorTriggered.get()); + errorTriggered.set(false); + count.set(0); + Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); + assertTrue(errorTriggered.get()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java new file mode 100644 index 0000000..0667de3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTable { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + private static AsyncConnection ASYNC_CONN; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + ASYNC_CONN = 
new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()); + } + + @AfterClass + public static void tearDown() throws Exception { + ASYNC_CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + assertTrue(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get()); + Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); + table.delete(new Delete(ROW)).get(); + result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); + assertTrue(result.isEmpty()); + assertFalse(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get()); + } + + private byte[] concat(byte[] base, int index) { + return Bytes.toBytes(Bytes.toString(base) + "-" + index); + } + + @Test + public void testMultiple() throws Exception { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int count = 100; + CountDownLatch putLatch = new CountDownLatch(count); + IntStream.range(0, count).forEach( + i -> table.put(new Put(concat(ROW, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) + .thenAccept(x -> putLatch.countDown())); + putLatch.await(); + BlockingQueue existsResp = new ArrayBlockingQueue<>(count); + IntStream.range(0, count) + .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> existsResp.add(x))); + for (int i = 0; i < count; i++) { + assertTrue(existsResp.take()); + } + BlockingQueue> getResp = new ArrayBlockingQueue<>(count); + IntStream.range(0, count) + .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); + for (int i = 0; i < count; i++) { + Pair pair = getResp.take(); + assertArrayEquals(concat(VALUE, pair.getFirst()), + pair.getSecond().getValue(FAMILY, QUALIFIER)); + } + 
CountDownLatch deleteLatch = new CountDownLatch(count); + IntStream.range(0, count).forEach( + i -> table.delete(new Delete(concat(ROW, i))).thenAccept(x -> deleteLatch.countDown())); + deleteLatch.await(); + IntStream.range(0, count) + .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> existsResp.add(x))); + for (int i = 0; i < count; i++) { + assertFalse(existsResp.take()); + } + IntStream.range(0, count) + .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); + for (int i = 0; i < count; i++) { + Pair pair = getResp.take(); + assertTrue(pair.getSecond().isEmpty()); + } + } +} diff --git a/pom.xml b/pom.xml index e70fa17..cfa2f64 100644 --- a/pom.xml +++ b/pom.xml @@ -1228,6 +1228,7 @@ 2.11.6 1.46 1.0.0-RC2 + 2.11.0 2.4 1.8 @@ -1784,28 +1785,49 @@ disruptor ${disruptor.version} - + net.spy spymemcached ${spy.version} true - - - org.bouncycastle - bcprov-jdk16 - ${bouncycastle.version} - test - - - org.apache.kerby - kerb-client - ${kerby.version} - - - org.apache.kerby - kerb-simplekdc - ${kerby.version} - + + + org.bouncycastle + bcprov-jdk16 + ${bouncycastle.version} + test + + + org.apache.kerby + kerb-client + ${kerby.version} + + + org.apache.kerby + kerb-simplekdc + ${kerby.version} + + + org.apache.curator + curator-recipes + ${curator.version} + + + org.apache.curator + curator-framework + ${curator.version} + + + org.apache.curator + curator-client + ${curator.version} + + + com.google.guava + guava + + + -- 1.9.1