From 6598e38151d0a8e5b609b30b97c5077638613d42 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 10 Oct 2016 11:43:08 +0800 Subject: [PATCH] HBASE-15921 Add first AsyncTable impl and create TableImpl based on it --- hbase-client/pom.xml | 12 + .../apache/hadoop/hbase/client/AsyncProcess.java | 17 +- .../hbase/client/ConnectionImplementation.java | 22 +- .../hadoop/hbase/client/ConnectionUtils.java | 7 + .../hbase/client/RpcRetryingCallerFactory.java | 14 +- .../hadoop/hbase/client/RpcRetryingCallerImpl.java | 7 +- .../hadoop/hbase/client/async/AsyncConnection.java | 68 ++++++ .../client/async/AsyncConnectionConfiguration.java | 59 +++++ .../hbase/client/async/AsyncConnectionImpl.java | 171 +++++++++++++ .../hbase/client/async/AsyncRegionLocator.java | 60 +++++ .../hbase/client/async/AsyncRegionLocatorImpl.java | 50 ++++ .../client/async/AsyncRpcRetryCallerFactory.java | 119 +++++++++ .../client/async/AsyncSingleActionRetryCaller.java | 230 ++++++++++++++++++ .../hadoop/hbase/client/async/AsyncTable.java | 127 ++++++++++ .../hadoop/hbase/client/async/AsyncTableImpl.java | 196 +++++++++++++++ .../hadoop/hbase/client/async/ClusterRegistry.java | 50 ++++ .../hbase/client/async/ClusterRegistryFactory.java | 43 ++++ .../hbase/client/async/RegionLocatorImpl.java | 66 +++++ .../hbase/client/async/ZKClusterRegistry.java | 268 +++++++++++++++++++++ .../hbase/zookeeper/RecoverableZooKeeper.java | 2 +- .../apache/hadoop/hbase/util/CollectionUtils.java | 15 ++ .../apache/hadoop/hbase/util/ReflectionUtils.java | 4 +- .../async/TestAsyncSingleActionRetryCaller.java | 201 ++++++++++++++++ .../hadoop/hbase/client/async/TestAsyncTable.java | 136 +++++++++++ pom.xml | 58 +++-- 25 files changed, 1953 insertions(+), 49 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionConfiguration.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncSingleActionRetryCaller.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index f3e27bc..f61c6d0 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -203,6 +203,18 @@ mockito-all test + 
<dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-client</artifactId> + </dependency> diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f2d9546..57e9ec4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.START_LOG_ERRORS_AFTER_COUNT_KEY; + +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -55,8 +60,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.annotations.VisibleForTesting; - /** * This class allows a continuous flow of requests. It's written to be compatible with a * synchronous caller such as HTable. @@ -100,16 +103,6 @@ class AsyncProcess { public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; /** - * Configure the number of failures after which the client will start logging. A few failures - * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable - * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at - * this stage. - */ - public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = - "hbase.client.start.log.errors.counter"; - public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; - - /** * Configuration to decide whether to log details for batch error */ public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8db9dbf..751adcc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -18,8 +18,11 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; + +import com.google.common.annotations.VisibleForTesting; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Date; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -38,8 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import edu.umd.cs.findbugs.annotations.Nullable; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -63,6 +64,11 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import
org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; @@ -79,8 +85,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -92,11 +96,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import edu.umd.cs.findbugs.annotations.Nullable; /** * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. @@ -106,7 +106,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", justification="Access to the conncurrent hash map is under a lock so should be fine.") @InterfaceAudience.Private -class ConnectionImplementation implements ClusterConnection, Closeable { +public class ConnectionImplementation implements ClusterConnection, Closeable { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; private static final Log LOG = LogFactory.getLog(ConnectionImplementation.class); private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled"; @@ -196,7 +196,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); // how many times to try, one more than max *retry* time - this.numTries = connectionConfig.getRetriesNumber() + 1; + this.numTries = retries2Attempts(connectionConfig.getRetriesNumber()); this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 83655f0..42f0d79 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -167,4 +167,11 @@ public final class ConnectionUtils { return false; } } + + /** + * Return retries + 1. The returned value will be in range [1, Integer.MAX_VALUE].
+ */ + public static int retries2Attempts(int retries) { + return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index f92aeae..5c1d488 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -30,6 +30,16 @@ public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; + + /** + * Configure the number of failures after which the client will start logging. A few failures is + * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable + * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at this + * stage. + */ + public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = "hbase.client.start.log.errors.counter"; + public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; + protected final Configuration conf; private final long pause; private final int retries; @@ -50,8 +60,8 @@ public class RpcRetryingCallerFactory { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, - AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, + DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index e940143..6bc2045 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; @@ -32,12 +34,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; - /** * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client * threadlocal outstanding timeouts as so we don't persist too much. 
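The new ConnectionUtils.retries2Attempts() helper above is now shared by ConnectionImplementation and, in the next hunk, RpcRetryingCallerImpl. A quick sketch of its contract (illustrative only, not part of the patch):

// retries2Attempts turns a retry count into an attempt count, clamping instead of overflowing
ConnectionUtils.retries2Attempts(0);                 // 1 -- always at least one attempt
ConnectionUtils.retries2Attempts(35);                // 36 -- retries + 1
ConnectionUtils.retries2Attempts(Integer.MAX_VALUE); // Integer.MAX_VALUE -- no int overflow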
@@ -70,7 +71,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { public RpcRetryingCallerImpl(long pause, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; - this.maxAttempts = retries + 1; + this.maxAttempts = retries2Attempts(retries); this.interceptor = interceptor; context = interceptor.createEmptyContext(); this.startLogErrorsCnt = startLogErrorsCnt; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java new file mode 100644 index 0000000..7e46a3e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.io.Closeable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous version of Connection. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncConnection extends Closeable { + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *
<p>
+ * The reference returned is not a copy, so any change made to it will affect this instance. + */ + Configuration getConfiguration(); + + /** + * Retrieve an AsyncRegionLocator implementation to inspect region information on a table. The + * returned AsyncRegionLocator is not thread-safe, so a new instance should be created for each + * thread that uses it. This is a lightweight operation. Pooling or caching of the returned + * AsyncRegionLocator is neither required nor desired. + * @param tableName Name of the table whose region is to be examined + * @return An AsyncRegionLocator instance + */ + AsyncRegionLocator getRegionLocator(TableName tableName); + + /** + * Retrieve an AsyncTable implementation for accessing a table. The returned AsyncTable is not + * thread-safe; a new instance should be created for each thread that uses it. This is a + * lightweight operation; pooling or caching of the returned AsyncTable is neither required nor + * desired. + *
<p>
+ * This method no longer checks table existence. If the table does not exist, an exception will + * be thrown only when the first operation is attempted. + * @param tableName the name of the table + * @return an AsyncTable to use for interactions with this table + */ + AsyncTable getTable(TableName tableName); + + /** + * {@inheritDoc} + */ + @Override + void close(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionConfiguration.java new file mode 100644 index 0000000..3b06070 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionConfiguration.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Timeout configs.
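+ * All values are read using the same configuration keys as the blocking client, but are stored + * in nanoseconds, since the async retry caller measures elapsed time with System.nanoTime().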
+ */ +@InterfaceAudience.Private +class AsyncConnectionConfiguration { + + final long metaOperationTimeoutNs; + + final long operationTimeoutNs; + + // timeout for each read rpc request + final long readRpcTimeoutNs; + + // timeout for each write rpc request + final long writeRpcTimeoutNs; + + AsyncConnectionConfiguration(Configuration conf) { + this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( + conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( + conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, + conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); + this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java new file mode 100644 index 0000000..7a5c870 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import io.netty.util.HashedWheelTimer; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.Threads; + +/** + * The implementation of AsyncConnection. 
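+ * <p> + * A minimal usage sketch (identifiers hypothetical; the class is package private, so outside + * callers should depend only on the AsyncConnection interface): + * <pre> + * try (AsyncConnection conn = new AsyncConnectionImpl(conf, User.getCurrent())) { + *   conn.getTable(tableName).get(new Get(row)).whenComplete((result, error) -> ...); + * } + * </pre>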
+ */ +@InterfaceAudience.Private +class AsyncConnectionImpl implements AsyncConnection { + + private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class); + + private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); + + private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; + + private final Configuration conf; + + final AsyncConnectionConfiguration connConf; + + private final User user; + + private final ClusterRegistry registry; + + private final String clusterId; + + private final int rpcTimeout; + + private final RpcClient rpcClient; + + final RpcControllerFactory rpcControllerFactory; + + private final boolean hostnameCanChange; + + private final RegionLocatorImpl locator; + + final AsyncRpcRetryCallerFactory callerFactory; + + public AsyncConnectionImpl(Configuration conf, User user) throws IOException { + this.conf = conf; + this.user = user; + + this.connConf = new AsyncConnectionConfiguration(conf); + + this.locator = new RegionLocatorImpl(conf); + + // action below will not throw exception so no need to catch and close. + this.registry = ClusterRegistryFactory.getRegistry(conf); + this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { + if (LOG.isDebugEnabled()) { + LOG.debug("cluster id came back null, using default " + HConstants.CLUSTER_ID_DEFAULT); + } + return HConstants.CLUSTER_ID_DEFAULT; + }); + this.rpcClient = RpcClientFactory.createClient(conf, clusterId); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); + this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.callerFactory = new AsyncRpcRetryCallerFactory(this, RETRY_TIMER); + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public void close() { + IOUtils.closeQuietly(locator); + IOUtils.closeQuietly(rpcClient); + IOUtils.closeQuietly(registry); + } + + @Override + public AsyncRegionLocator getRegionLocator(TableName tableName) { + return new AsyncRegionLocatorImpl(tableName, locator); + } + + // we will override this method for testing retry caller, so do not remove this method. + RegionLocatorImpl getLocator() { + return locator; + } + + private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); + + private String getStubKey(String serviceName, ServerName serverName) { + // Sometimes, servers go down and they come back up with the same hostname but a different + // IP address. Force a resolution of the rsHostname by trying to instantiate an + // InetSocketAddress, and this way we will rightfully get a new stubKey. + // Also, include the hostname in the key so as to take care of those cases where the + // DNS name is different but IP address remains the same. 
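+ // For a hypothetical server rs1.example.org:16020 resolving to 10.0.0.5, the key below becomes + // "Interface@rs1.example.org-10.0.0.5:16020", and falls back to "Interface@rs1.example.org:16020" + // if resolution fails (the service name passed in is ClientService.Interface's simple name).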
+ String hostname = serverName.getHostname(); + int port = serverName.getPort(); + if (hostnameCanChange) { + try { + InetAddress ip = InetAddress.getByName(hostname); + return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port; + } catch (UnknownHostException e) { + LOG.warn("Can not resolve " + hostname + ", please check your network", e); + } + } + return serviceName + "@" + hostname + ":" + port; + } + + private ClientService.Interface createRegionServerStub(ServerName serverName) { + try { + return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { + try { + return CollectionUtils.computeIfAbsent(rsStubs, + getStubKey(ClientService.Interface.class.getSimpleName(), serverName), + () -> createRegionServerStub(serverName)); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } + + @Override + public AsyncTable getTable(TableName tableName) { + return new AsyncTableImpl(this, tableName); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java new file mode 100644 index 0000000..374e646 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous version of RegionLocator. + *
<p>
+ * Usually the implementations will not throw any exception directly; you need to get the + * exception from the returned {@link CompletableFuture}. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncRegionLocator { + + /** + * Gets the fully qualified table name instance of the table whose region we want to locate. + */ + TableName getName(); + + /** + * Finds the region on which the given row is being served. Does not reload the cache. + *
<p>
+ * Returns the location of the region to which the row belongs. + * @param row Row to find. + */ + default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row) { + return getRegionLocation(row, false); + } + + /** + * Finds the region on which the given row is being served. + *
<p>
+ * Returns the location of the region to which the row belongs. + * @param row Row to find. + * @param reload true to reload information or false to use cached information + */ + CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java new file mode 100644 index 0000000..2215910 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * The implementation of AsyncRegionLocator. + */ +@InterfaceAudience.Private +class AsyncRegionLocatorImpl implements AsyncRegionLocator { + + private final TableName tableName; + + private final RegionLocatorImpl locator; + + public AsyncRegionLocatorImpl(TableName tableName, RegionLocatorImpl locator) { + this.tableName = tableName; + this.locator = locator; + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) { + return locator.getRegionLocation(tableName, row, reload); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java new file mode 100644 index 0000000..e9432c8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client.async; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.START_LOG_ERRORS_AFTER_COUNT_KEY; + +import com.google.common.base.Preconditions; + +import io.netty.util.HashedWheelTimer; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Factory to create an AsyncRpcRetryCaller. + */ +@InterfaceAudience.Private +public class AsyncRpcRetryCallerFactory { + + private final AsyncConnectionImpl conn; + + private final HashedWheelTimer retryTimer; + + private final long pauseNs; + + private final int maxRetries; + + /** How many retries are allowed before we start to log */ + private final int startLogErrorsCnt; + + public AsyncRpcRetryCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { + this.conn = conn; + Configuration conf = conn.getConfiguration(); + this.pauseNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, + DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + this.retryTimer = retryTimer; + } + + public class SingleActionCallerBuilder { + + private TableName tableName; + + private byte[] row; + + private AsyncSingleActionRetryCaller.Callable callable; + + private long operationTimeoutNs = -1L; + + private long rpcTimeoutNs = -1L; + + public SingleActionCallerBuilder table(TableName tableName) { + this.tableName = tableName; + return this; + } + + public SingleActionCallerBuilder row(byte[] row) { + this.row = row; + return this; + } + + public SingleActionCallerBuilder action(AsyncSingleActionRetryCaller.Callable callable) { + this.callable = callable; + return this; + } + + public SingleActionCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(operationTimeout); + return this; + } + + public SingleActionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public AsyncSingleActionRetryCaller build() { + return new AsyncSingleActionRetryCaller<>(retryTimer, conn, + Preconditions.checkNotNull(tableName, "tableName is null"), + Preconditions.checkNotNull(row, "row is null"), + Preconditions.checkNotNull(callable, "action is null"), pauseNs, maxRetries, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + } + + public CompletableFuture call() { + return build().call(); + } + } + + public SingleActionCallerBuilder single() { + return new SingleActionCallerBuilder<>(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java new file mode 100644 index 0000000..62a72e9 --- /dev/null +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Retry caller for a single action, such as get, put, delete, etc. 
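+ * <p> + * The flow is: locate the region, invoke the action, and on error record the exception, update + * the cached location and schedule a retry on the HashedWheelTimer, starting over from locate, + * until the future completes, a DoNotRetryIOException is seen, the retries are exhausted, or the + * operation timeout is exceeded.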
+ */ +@InterfaceAudience.Private +public class AsyncSingleActionRetryCaller { + + private static final Log LOG = LogFactory.getLog(AsyncSingleActionRetryCaller.class); + + @FunctionalInterface + public interface Callable { + CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, + ClientService.Interface stub); + } + + private final HashedWheelTimer retryTimer; + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private final byte[] row; + + private final Callable callable; + + private final long pauseNs; + + private final int maxAttempts; + + private final long operationTimeoutNs; + + private final long rpcTimeoutNs; + + private final int startLogErrorsCnt; + + private final CompletableFuture future; + + private final HBaseRpcController controller; + + private final List exceptions; + + private final long startNs; + + public AsyncSingleActionRetryCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + TableName tableName, byte[] row, Callable callable, long pauseNs, int maxRetries, + long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + this.retryTimer = retryTimer; + this.conn = conn; + this.tableName = tableName; + this.row = row; + this.callable = callable; + this.pauseNs = pauseNs; + this.maxAttempts = retries2Attempts(maxRetries); + this.operationTimeoutNs = operationTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; + this.future = new CompletableFuture<>(); + this.controller = conn.rpcControllerFactory.newController(); + this.exceptions = new ArrayList<>(); + this.startNs = System.nanoTime(); + } + + private int tries = 1; + + private long elapsedMs() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + } + + private static Throwable translateException(Throwable t) { + if (t instanceof UndeclaredThrowableException && t.getCause() != null) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof ServiceException && t.getCause() != null) { + t = translateException(t.getCause()); + } + return t; + } + + private void completeExceptionally() { + future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); + } + + private void onError(Throwable error, Supplier errMsg, + Consumer updateCachedLocation) { + error = translateException(error); + if (tries > startLogErrorsCnt) { + LOG.warn(errMsg.get(), error); + } + RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext( + error, EnvironmentEdgeManager.currentTime(), ""); + exceptions.add(qt); + if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + completeExceptionally(); + return; + } + long delayNs; + if (operationTimeoutNs > 0) { + long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs); + if (maxDelayNs <= 0) { + completeExceptionally(); + return; + } + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + } else { + delayNs = getPauseTime(pauseNs, tries - 1); + } + updateCachedLocation.accept(error); + tries++; + retryTimer.newTimeout(new TimerTask() { + + @Override + public void run(Timeout timeout) throws Exception { + // always restart from beginning. 
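+ // the failure may have been caused by a stale location, and onError has already updated the + // cached location above, so go back to locate() rather than redialing the same server; from + // here on locate() passes reload = true because tries is now greater than 1.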
+ locate(); + } + }, delayNs, TimeUnit.NANOSECONDS); + } + + private void resetController() { + controller.reset(); + if (rpcTimeoutNs >= 0) { + controller.setCallTimeout( + (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs))); + } + } + + private void call(HRegionLocation loc) { + ClientService.Interface stub; + try { + stub = conn.getRegionServerStub(loc.getServerName()); + } catch (IOException e) { + onError(e, + () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + + elapsedMs() + " ms", + err -> { + }); + return; + } + resetController(); + callable.call(controller, loc, stub).whenComplete((result, error) -> { + if (error != null) { + onError(error, + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = " + + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + + elapsedMs() + " ms", + err -> conn.getLocator().updateCachedLocations(tableName, + loc.getRegionInfo().getRegionName(), row, err, loc.getServerName())); + return; + } + future.complete(result); + }); + } + + private void locate() { + conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> { + if (error != null) { + onError(error, + () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " + + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + + elapsedMs() + " ms", + err -> { + }); + return; + } + call(loc); + }); + } + + public CompletableFuture call() { + locate(); + return future; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java new file mode 100644 index 0000000..40e7b94 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.async; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; + +/** + * The asynchronous version of Table. Obtain an instance from an {@link AsyncConnection}. + *
<p>
+ * Usually the implementations will not throw any exception directly; you need to get the + * exception from the returned {@link CompletableFuture}. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncTable { + + /** + * Gets the fully qualified table name instance of this table. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *
<p>
+ * The reference returned is not a copy, so any change made to it will affect this instance. + */ + Configuration getConfiguration(); + + /** + * Set the timeout of each rpc read request for operations on this Table instance. This will + * override the value of {@code hbase.rpc.read.timeout} in the configuration. If an rpc read + * request waits too long, the client will stop waiting and send a new request to retry, until + * the retries are exhausted or the operation timeout is reached. + */ + void setReadRpcTimeout(long timeout, TimeUnit unit); + + /** + * Get the timeout of each rpc read request in this Table instance. + */ + long getReadRpcTimeout(TimeUnit unit); + + /** + * Set the timeout of each rpc write request for operations on this Table instance. This will + * override the value of {@code hbase.rpc.write.timeout} in the configuration. If an rpc write + * request waits too long, the client will stop waiting and send a new request to retry, until + * the retries are exhausted or the operation timeout is reached. + */ + void setWriteRpcTimeout(long timeout, TimeUnit unit); + + /** + * Get the timeout of each rpc write request in this Table instance. + */ + long getWriteRpcTimeout(TimeUnit unit); + + /** + * Set the timeout of each operation on this Table instance. This will override the value of + * {@code hbase.client.operation.timeout} in the configuration. + *
<p>
+ * Operation timeout is a top-level restriction that makes sure an operation will not be blocked + * for longer than this. Within each operation, if an rpc request fails because of a timeout or + * some other reason, the client will retry until it succeeds or throws a + * RetriesExhaustedException. But if the total elapsed time reaches the operation timeout before + * the retries are exhausted, it will break out early and throw a SocketTimeoutException. + */ + void setOperationTimeout(long timeout, TimeUnit unit); + + /** + * Get the timeout of each operation in this Table instance. + */ + long getOperationTimeout(TimeUnit unit); + + /** + * Test for the existence of columns in the table, as specified by the Get. + *
<p>
+ * This will return true if the Get matches one or more keys, false if not. + *
<p>
+ * This is a server-side call so it prevents any data from being transferred to the client. + */ + CompletableFuture<Boolean> exists(Get get); + + /** + * Extracts certain cells from a given row. + *
<p>
+ * Return the data coming from the specified row, if it exists. If the row specified doesn't + * exist, the {@link Result} instance returned won't contain any + * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. + * @param get The object that specifies what data to fetch and from which row. + */ + CompletableFuture<Result> get(Get get); + + /** + * Puts some data to the table. + * @param put The data to put. + */ + CompletableFuture<Void> put(Put put); + + /** + * Deletes the specified cells/row. + * @param delete The object that specifies what to delete. + */ + CompletableFuture<Void> delete(Delete delete); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java new file mode 100644 index 0000000..1f47a70 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.async.AsyncRpcRetryCallerFactory.SingleActionCallerBuilder; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * The implementation of AsyncTable.
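+ * <p> + * Every operation below follows the same pattern: build the protobuf request with a + * RequestConverter method, send it through the matching ClientService stub call, convert the + * response back, and let an AsyncSingleActionRetryCaller obtained from newCaller() drive the + * retries.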
+ */ +@InterfaceAudience.Private +public class AsyncTableImpl implements AsyncTable { + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private long readRpcTimeoutNs; + + private long writeRpcTimeoutNs; + + private long operationTimeoutNs; + + public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { + this.conn = conn; + this.tableName = tableName; + this.readRpcTimeoutNs = conn.connConf.readRpcTimeoutNs; + this.writeRpcTimeoutNs = conn.connConf.writeRpcTimeoutNs; + this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.metaOperationTimeoutNs + : conn.connConf.operationTimeoutNs; + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public Configuration getConfiguration() { + return conn.getConfiguration(); + } + + @FunctionalInterface + private interface Converter { + D convert(I info, S src) throws IOException; + } + + @FunctionalInterface + private interface RpcCall { + void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, + RpcCallback done); + } + + private CompletableFuture call(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert, RpcCall rpcCall, + Converter respConverter) { + CompletableFuture future = new CompletableFuture<>(); + try { + rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req), + new RpcCallback() { + + @Override + public void run(PRESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(respConverter.convert(controller, resp)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + private SingleActionCallerBuilder newCaller(Row row, long rpcTimeoutNs) { + return conn.callerFactory. single().table(tableName).row(row.getRow()) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public CompletableFuture exists(Get get) { + if (!get.isCheckExistenceOnly()) { + get = ReflectionUtils.newInstance(Get.class, get); + get.setCheckExistenceOnly(true); + } + return get(get).thenApply(r -> r.getExists()); + } + + @Override + public CompletableFuture get(Get get) { + return this. newCaller(get, readRpcTimeoutNs) + .action((controller, loc, stub) -> this. call( + controller, loc, stub, get, RequestConverter::buildGetRequest, + (s, c, req, done) -> s.get(c, req, done), + (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) + .call(); + } + + @Override + public CompletableFuture put(Put put) { + return this. newCaller(put, writeRpcTimeoutNs) + .action((controller, loc, stub) -> this. call( + controller, loc, stub, put, RequestConverter::buildMutateRequest, + (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> { + return null; + })) + .call(); + } + + @Override + public CompletableFuture delete(Delete delete) { + return this. newCaller(delete, writeRpcTimeoutNs) + .action((controller, loc, stub) -> this. 
call( + controller, loc, stub, delete, RequestConverter::buildMutateRequest, + (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> { + return null; + })) + .call(); + } + + @Override + public void setReadRpcTimeout(long timeout, TimeUnit unit) { + this.readRpcTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getReadRpcTimeout(TimeUnit unit) { + return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public void setWriteRpcTimeout(long timeout, TimeUnit unit) { + this.writeRpcTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getWriteRpcTimeout(TimeUnit unit) { + return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public void setOperationTimeout(long timeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getOperationTimeout(TimeUnit unit) { + return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); + } + +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java new file mode 100644 index 0000000..996ddd8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.io.Closeable; + +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc. + * Internal use only. + */ +@InterfaceAudience.Private +interface ClusterRegistry extends Closeable { + + RegionLocations getMetaRegionLocation(); + + /** + * Should only be called once. + *
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java
new file mode 100644
index 0000000..996ddd8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.async;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.
+ * Internal use only.
+ */
+@InterfaceAudience.Private
+interface ClusterRegistry extends Closeable {
+
+  RegionLocations getMetaRegionLocation();
+
+  /**
+   * Should only be called once.
+   * <p>
+   * The upper layer should store this value somewhere as it will not change any more.
+   */
+  String getClusterId();
+
+  int getCurrentNrHRS();
+
+  ServerName getMasterAddress();
+
+  int getMasterInfoPort();
+
+  @Override
+  void close();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java
new file mode 100644
index 0000000..fa3be24
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Get an instance of the configured Registry.
+ */
+@InterfaceAudience.Private
+final class ClusterRegistryFactory {
+
+  static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
+
+  private ClusterRegistryFactory() {
+  }
+
+  /**
+   * @return The cluster registry implementation to use.
+   */
+  static ClusterRegistry getRegistry(Configuration conf) {
+    Class<? extends ClusterRegistry> clazz =
+        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKClusterRegistry.class, ClusterRegistry.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+}
\ No newline at end of file
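
Because getRegistry is driven purely by configuration, a different registry (for example one backed by a static server list for testing) can be swapped in without touching any other code. A sketch only: MyClusterRegistry is a hypothetical class that implements ClusterRegistry and exposes a constructor taking a Configuration, since the factory instantiates it reflectively; note that both the interface and the factory are package-private, so this only works from within org.apache.hadoop.hbase.client.async:

    Configuration conf = HBaseConfiguration.create();
    // point the factory at the custom implementation instead of ZKClusterRegistry
    conf.setClass(ClusterRegistryFactory.REGISTRY_IMPL_CONF_KEY,
      MyClusterRegistry.class, ClusterRegistry.class);
    try (ClusterRegistry registry = ClusterRegistryFactory.getRegistry(conf)) {
      ServerName master = registry.getMasterAddress();
    }
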
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java
new file mode 100644
index 0000000..4911b4b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.async;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+
+/**
+ * TODO: reimplement using the async connection when the scan logic is ready. The current
+ * implementation is based on the blocking client.
+ */
+@InterfaceAudience.Private
+class RegionLocatorImpl implements Closeable {
+
+  private final ConnectionImplementation conn;
+
+  RegionLocatorImpl(Configuration conf) throws IOException {
+    conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      boolean reload) {
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    try {
+      future.complete(conn.getRegionLocation(tableName, row, reload));
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
+      ServerName source) {
+    conn.updateCachedLocations(tableName, regionName, row, exception, source);
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeQuietly(conn);
+  }
+}
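
Note that getRegionLocation above completes the future on the calling thread, so the blocking meta lookup still runs inline; the future only standardizes the shape of the API for callers. Until the real async locator lands, one interim option (not in the patch; the executor parameter is hypothetical) would be to off-load the lookup:

    CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
        boolean reload, ExecutorService pool) {
      CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
      // run the blocking lookup on the pool so the caller returns immediately
      pool.execute(() -> {
        try {
          future.complete(conn.getRegionLocation(tableName, row, reload));
        } catch (IOException e) {
          future.completeExceptionally(e);
        }
      });
      return future;
    }
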
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java
new file mode 100644
index 0000000..1276500
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.async;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.removeMetaData;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+
+/**
+ * Cache the cluster registry data in memory and use a zk watcher to update it. The only exception
+ * is {@link #getClusterId()}, which will fetch the data from zk directly.
+ */
+@InterfaceAudience.Private
+class ZKClusterRegistry implements ClusterRegistry {
+
+  private static final Log LOG = LogFactory.getLog(ZKClusterRegistry.class);
+
+  private final CuratorFramework zk;
+
+  private final ZNodePaths znodePaths;
+
+  private final List<NodeCache> metaRegionLocationsWatcher;
+
+  private volatile RegionLocations metaRegionLocations;
+
+  private final PathChildrenCache currentNrHRSWatcher;
+
+  private volatile int currentNrHRS;
+
+  private final NodeCache masterAddressWatcher;
+
+  private volatile ServerName masterAddress;
+
+  private volatile int masterInfoPort;
+
+  private RegionState createRegionState(int replicaId, byte[] data) throws IOException {
+    if (data == null || data.length == 0) {
+      return new RegionState(
+          RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId),
+          RegionState.State.OFFLINE);
+    }
+    data = removeMetaData(data);
+    int prefixLen = ProtobufUtil.lengthOfPBMagic();
+    ZooKeeperProtos.MetaRegionServer mrs = ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data,
+      prefixLen, data.length - prefixLen);
+    HBaseProtos.ServerName sn = mrs.getServer();
+    ServerName serverName = ServerName.valueOf(sn.getHostName(), sn.getPort(), sn.getStartCode());
+    RegionState.State state;
+    if (mrs.hasState()) {
+      state = RegionState.State.convert(mrs.getState());
+    } else {
+      state = RegionState.State.OPEN;
+    }
+    return new RegionState(
+        RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId),
+        state, serverName);
+  }
+
+  // All listeners of NodeCaches will be called inside the same thread so there is no need to lock
+  private void updateMetaRegionLocation(int replicaId, ChildData childData) {
+    if (childData == null) {
+      return;
+    }
+    RegionState state;
+    try {
+      state = createRegionState(replicaId, childData.getData());
+    } catch (IOException e) {
+      LOG.warn("Failed to parse meta region state", e);
+      return;
+    }
+    if (state.getState() != RegionState.State.OPEN || state.getServerName() == null) {
+      return;
+    }
+    RegionLocations oldLocs = metaRegionLocations;
+    Deque<HRegionLocation> newLocs = new ArrayDeque<>();
+    if (oldLocs != null) {
+      Arrays.stream(oldLocs.getRegionLocations())
+          .filter(loc -> loc.getRegionInfo().getReplicaId() != replicaId)
+          .forEach(loc -> newLocs.add(loc));
+    }
+    HRegionLocation newLoc = new HRegionLocation(
+        RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId),
+        state.getServerName());
+    if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
+      newLocs.addFirst(newLoc);
+    } else {
+      newLocs.addLast(newLoc);
+    }
+    metaRegionLocations = new RegionLocations(newLocs);
+  }
+
+  private void updateCurrentNrHRS() {
+    currentNrHRS = currentNrHRSWatcher.getCurrentData().size();
+  }
+
+  private void updateMasterAddressAndInfoPort() {
+    ChildData childData = masterAddressWatcher.getCurrentData();
+    if (childData == null) {
+      return;
+    }
+    byte[] data = childData.getData();
+    if (data == null || data.length == 0) {
+      return;
+    }
+    data = removeMetaData(data);
+    int prefixLen = ProtobufUtil.lengthOfPBMagic();
+    try {
+      ZooKeeperProtos.Master masterProto = ZooKeeperProtos.Master.parser().parseFrom(data,
+        prefixLen, data.length - prefixLen);
+      HBaseProtos.ServerName snProto = masterProto.getMaster();
+      masterAddress = ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
+        snProto.getStartCode());
+      masterInfoPort = masterProto.getInfoPort();
+    } catch (IOException e) {
+      LOG.warn("Failed to parse master address", e);
+    }
+  }
+
+  ZKClusterRegistry(Configuration conf) {
+    this.znodePaths = new ZNodePaths(conf);
+    int zkSessionTimeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
+      HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+    int zkRetry = conf.getInt("zookeeper.recovery.retry", 3);
+    int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
+    this.zk = CuratorFrameworkFactory.builder()
+        .connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs))
+        .threadFactory(
+          Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode())))
+        .build();
+    this.zk.start();
+
+    metaRegionLocationsWatcher = znodePaths.metaReplicaZNodes.entrySet().stream().map(entry -> {
+      NodeCache nc = new NodeCache(zk, entry.getValue());
+      nc.getListenable().addListener(new NodeCacheListener() {
+
+        @Override
+        public void nodeChanged() {
+          updateMetaRegionLocation(entry.getKey().intValue(), nc.getCurrentData());
+        }
+      });
+      return nc;
+    }).collect(collectingAndThen(toList(), Collections::unmodifiableList));
+
+    currentNrHRSWatcher = new PathChildrenCache(zk, znodePaths.rsZNode, false,
+        Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x-RS", hashCode())));
+    currentNrHRSWatcher.getListenable().addListener(new PathChildrenCacheListener() {
+
+      @Override
+      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
+        updateCurrentNrHRS();
+      }
+    });
+
+    masterAddressWatcher = new NodeCache(zk, znodePaths.masterAddressZNode);
+    masterAddressWatcher.getListenable().addListener(new NodeCacheListener() {
+
+      @Override
+      public void nodeChanged() {
+        updateMasterAddressAndInfoPort();
+      }
+    });
+
+    try {
+      for (NodeCache watcher : metaRegionLocationsWatcher) {
+        watcher.start();
+      }
+      currentNrHRSWatcher.start();
+      masterAddressWatcher.start();
+    } catch (Exception e) {
+      // a normal start will not throw any exception, so treat a failure here as a bug
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public RegionLocations getMetaRegionLocation() {
+    return metaRegionLocations;
+  }
+
+  @Override
+  public String getClusterId() {
+    try {
+      byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode);
+      if (data == null || data.length == 0) {
+        return null;
+      }
+      return ClusterId.parseFrom(removeMetaData(data)).toString();
+    } catch (Exception e) {
+      LOG.warn("Failed to get cluster id", e);
+      return null;
+    }
+  }
+
+  @Override
+  public int getCurrentNrHRS() {
+    return currentNrHRS;
+  }
+
+  @Override
+  public ServerName getMasterAddress() {
+    return masterAddress;
+  }
+
+  @Override
+  public int getMasterInfoPort() {
+    return masterInfoPort;
+  }
+
+  @Override
+  public void close() {
+    metaRegionLocationsWatcher.forEach(IOUtils::closeQuietly);
+    IOUtils.closeQuietly(currentNrHRSWatcher);
+    IOUtils.closeQuietly(masterAddressWatcher);
+    IOUtils.closeQuietly(zk);
+  }
+}
\ No newline at end of file
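
For reviewers who have not used Curator's recipes before, the class above applies the same caching pattern to every znode it tracks. Distilled, with illustrative names (this is a sketch, not code from the patch), it is:

    // one NodeCache per znode keeps a local copy of the node's data; per the
    // comment in the class above, listeners fire on the same event thread, so a
    // plain volatile write is enough for readers to see updates
    private volatile byte[] cached;

    private NodeCache watch(CuratorFramework zk, String path) throws Exception {
      NodeCache cache = new NodeCache(zk, path);
      cache.getListenable().addListener(() -> {
        ChildData d = cache.getCurrentData();
        cached = d == null ? null : d.getData();
      });
      cache.start();
      return cache;
    }
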
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index 371279e..11d2e75 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -707,7 +707,7 @@ public class RecoverableZooKeeper {
     return null;
   }
 
-  public byte[] removeMetaData(byte[] data) {
+  public static byte[] removeMetaData(byte[] data) {
     if(data == null || data.length == 0) {
       return data;
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
index b7b9beb..5b22bd4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -104,4 +106,17 @@ public class CollectionUtils {
     }
     return list.get(list.size() - 1);
   }
+
+  /**
+   * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
+   * value already exists, so here we copy the implementation of
+   * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}, using get and
+   * putIfAbsent instead. Notice that this implementation does not guarantee that the supplier
+   * will only be executed once.
+   */
+  public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) {
+    V v, newValue;
+    return ((v = map.get(key)) == null && (newValue = supplier.get()) != null
+        && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
+  }
 }
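
The intended call pattern looks like the sketch below; the cache name and value type are illustrative, roughly as the helper might be used from AsyncConnectionImpl. On the hot path the key is almost always present, so the cost is a single ConcurrentHashMap.get; because the supplier can run more than once under a race, it must be cheap and side-effect free:

    private final ConcurrentMap<TableName, AsyncTable> tableCache = new ConcurrentHashMap<>();

    AsyncTable getCachedTable(TableName tableName) {
      // get() wins on the common hit path; on a miss, putIfAbsent resolves races
      return CollectionUtils.computeIfAbsent(tableCache, tableName,
        () -> new AsyncTableImpl(this, tableName));
    }
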
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index 15b3930..740f9ee 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -52,6 +52,7 @@ public class ReflectionUtils {
   private static <T> T instantiate(final String className, Constructor<T> ctor,
       Object[] ctorArgs) {
     try {
+      ctor.setAccessible(true);
       return ctor.newInstance(ctorArgs);
     } catch (IllegalAccessException e) {
       throw new UnsupportedOperationException(
@@ -65,14 +66,13 @@ public class ReflectionUtils {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public static <T> T newInstance(Class<T> type, Object... params) {
     return instantiate(type.getName(), findConstructor(type, params), params);
   }
 
   @SuppressWarnings("unchecked")
   public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) {
-    Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors();
+    Constructor<T>[] constructors = (Constructor<T>[]) type.getDeclaredConstructors();
     for (Constructor<T> ctor : constructors) {
       Class<?>[] ctorParamTypes = ctor.getParameterTypes();
       if (ctorParamTypes.length != paramTypes.length) {
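
The two changes above work together: getConstructors() only returns public constructors, so the package-private ZKClusterRegistry(Configuration) constructor was invisible to findConstructor, and even if found, invoking it from another package without setAccessible(true) would fail with IllegalAccessException (which instantiate wraps in UnsupportedOperationException). With the change, the reflective instantiation in ClusterRegistryFactory works:

    // previously this could not find (or invoke) the package-private constructor
    ClusterRegistry registry = ReflectionUtils.newInstance(ZKClusterRegistry.class, conf);
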
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncSingleActionRetryCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncSingleActionRetryCaller.java
new file mode 100644
index 0000000..a4abf98
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncSingleActionRetryCaller.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.async;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncSingleActionRetryCaller {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] ROW = Bytes.toBytes("row");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  private AsyncConnectionImpl asyncConn;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() {
+    if (asyncConn != null) {
+      asyncConn.close();
+      asyncConn = null;
+    }
+  }
+
+  private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetries) throws IOException {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt(RpcRetryingCallerFactory.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt);
+    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetries);
+    asyncConn = new AsyncConnectionImpl(conf, User.getCurrent());
+  }
+
+  @Test
+  public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
+    initConn(0, 100, 30);
+    // This will leave a cached entry in the location cache
+    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+    int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
+    TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
+      TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
+    AsyncTable table = asyncConn.getTable(TABLE_NAME);
+    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+
+    // move back
+    TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
+      Bytes.toBytes(loc.getServerName().getServerName()));
+    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+  }
+
+  private <T> CompletableFuture<T> failedFuture() {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(new RuntimeException("Inject error!"));
+    return future;
+  }
+
+  @Test
+  public void testMaxRetries() throws IOException, InterruptedException {
+    initConn(0, 10, 2);
+    try {
+      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+          .action((controller, loc, stub) -> failedFuture()).call().get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
+    }
+  }
+
+  @Test
+  public void testOperationTimeout() throws IOException, InterruptedException {
+    initConn(0, 100, Integer.MAX_VALUE);
+    long startNs = System.nanoTime();
+    try {
+      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW)
+          .operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture())
+          .call().get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
+    }
+    long costNs = System.nanoTime() - startNs;
+    assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
+    assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
+  }
+
+  @Test
+  public void testLocateError() throws IOException, InterruptedException, ExecutionException {
+    initConn(0, 100, 5);
+    AtomicBoolean errorTriggered = new AtomicBoolean(false);
+    AtomicInteger count = new AtomicInteger(0);
+    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+
+    try (RegionLocatorImpl mockedLocator = new RegionLocatorImpl(asyncConn.getConfiguration()) {
+      @Override
+      CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+          boolean reload) {
+        if (tableName.equals(TABLE_NAME)) {
+          CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+          if (count.getAndIncrement() == 0) {
+            errorTriggered.set(true);
+            future.completeExceptionally(new RuntimeException("Inject error!"));
+          } else {
+            future.complete(loc);
+          }
+          return future;
+        } else {
+          return super.getRegionLocation(tableName, row, reload);
+        }
+      }
+
+      @Override
+      void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row,
+          Object exception, ServerName source) {
+      }
+    };
+        AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(),
+            User.getCurrent()) {
+
+          @Override
+          RegionLocatorImpl getLocator() {
+            return mockedLocator;
+          }
+        }) {
+      AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME);
+      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+      assertTrue(errorTriggered.get());
+      errorTriggered.set(false);
+      count.set(0);
+      Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+      assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+      assertTrue(errorTriggered.get());
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java
new file mode 100644
index 0000000..2266175
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.async;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTable {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] ROW = Bytes.toBytes("row");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+    assertTrue(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
+    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+    table.delete(new Delete(ROW)).get();
+    result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    assertTrue(result.isEmpty());
+    assertFalse(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
+  }
+
+  private byte[] concat(byte[] base, int index) {
+    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
+  }
+
+  @Test
+  public void testMultiple() throws Exception {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int count = 100;
+    CountDownLatch putLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(
+      i -> table.put(new Put(concat(ROW, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
+          .thenAccept(x -> putLatch.countDown()));
+    putLatch.await();
+    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
+    IntStream.range(0, count)
+        .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> existsResp.add(x)));
+    for (int i = 0; i < count; i++) {
+      assertTrue(existsResp.take());
+    }
+    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
+    IntStream.range(0, count)
+        .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
+    for (int i = 0; i < count; i++) {
+      Pair<Integer, Result> pair = getResp.take();
+      assertArrayEquals(concat(VALUE, pair.getFirst()),
+        pair.getSecond().getValue(FAMILY, QUALIFIER));
+    }
+    CountDownLatch deleteLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(
+      i -> table.delete(new Delete(concat(ROW, i))).thenAccept(x -> deleteLatch.countDown()));
+    deleteLatch.await();
+    IntStream.range(0, count)
+        .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> existsResp.add(x)));
+    for (int i = 0; i < count; i++) {
+      assertFalse(existsResp.take());
+    }
+    IntStream.range(0, count)
+        .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
+    for (int i = 0; i < count; i++) {
+      Pair<Integer, Result> pair = getResp.take();
+      assertTrue(pair.getSecond().isEmpty());
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 7715278..c6adcb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1228,6 +1228,7 @@
     <spy.version>2.11.6</spy.version>
     <bouncycastle.version>1.46</bouncycastle.version>
     <kerby.version>1.0.0-RC2</kerby.version>
+    <curator.version>2.11.0</curator.version>
     <!-- Plugin Dependencies -->
     <maven.assembly.version>2.4</maven.assembly.version>
     <maven.antrun.version>1.8</maven.antrun.version>
@@ -1784,28 +1785,49 @@
       <artifactId>disruptor</artifactId>
       <version>${disruptor.version}</version>
     </dependency>
-    <dependency>
+      <dependency>
        <groupId>net.spy</groupId>
        <artifactId>spymemcached</artifactId>
        <version>${spy.version}</version>
        <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.bouncycastle</groupId>
-      <artifactId>bcprov-jdk16</artifactId>
-      <version>${bouncycastle.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kerby</groupId>
-      <artifactId>kerb-client</artifactId>
-      <version>${kerby.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kerby</groupId>
-      <artifactId>kerb-simplekdc</artifactId>
-      <version>${kerby.version}</version>
-    </dependency>
+      </dependency>
+      <dependency>
+        <groupId>org.bouncycastle</groupId>
+        <artifactId>bcprov-jdk16</artifactId>
+        <version>${bouncycastle.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kerby</groupId>
+        <artifactId>kerb-client</artifactId>
+        <version>${kerby.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kerby</groupId>
+        <artifactId>kerb-simplekdc</artifactId>
+        <version>${kerby.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-framework</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-client</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+      </dependency>
-- 
2.7.4