From c5c49013415a6bbfdc7e34061147fdff0e234bc8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 8 Oct 2016 17:27:21 +0800 Subject: [PATCH] HBASE-15921 Add first AsyncTable impl and create TableImpl based on it --- hbase-client/pom.xml | 12 + .../apache/hadoop/hbase/client/AsyncProcess.java | 17 +- .../hbase/client/ConnectionImplementation.java | 2 +- .../hbase/client/RpcRetryingCallerFactory.java | 14 +- .../hadoop/hbase/client/async/AsyncConnection.java | 42 ++++ .../hbase/client/async/AsyncConnectionImpl.java | 174 +++++++++++++ .../hbase/client/async/AsyncRegionLocator.java | 43 ++++ .../hbase/client/async/AsyncRegionLocatorImpl.java | 47 ++++ .../client/async/AsyncRpcRetryCallerFactory.java | 119 +++++++++ .../client/async/AsyncSingleActionRetryCaller.java | 226 +++++++++++++++++ .../hadoop/hbase/client/async/AsyncTable.java | 49 ++++ .../hadoop/hbase/client/async/AsyncTableImpl.java | 136 +++++++++++ .../hadoop/hbase/client/async/ClusterRegistry.java | 50 ++++ .../hbase/client/async/ClusterRegistryFactory.java | 43 ++++ .../hbase/client/async/RegionLocatorImpl.java | 66 +++++ .../hbase/client/async/ZKClusterRegistry.java | 268 +++++++++++++++++++++ .../hbase/zookeeper/RecoverableZooKeeper.java | 2 +- .../apache/hadoop/hbase/util/CollectionUtils.java | 15 ++ .../apache/hadoop/hbase/util/ReflectionUtils.java | 4 +- .../hadoop/hbase/client/async/TestAsyncTable.java | 78 ++++++ pom.xml | 58 +++-- 21 files changed, 1429 insertions(+), 36 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index f3e27bc..f61c6d0 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -203,6 +203,18 @@ mockito-all test + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-client + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f2d9546..57e9ec4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -19,6 
+19,11 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.START_LOG_ERRORS_AFTER_COUNT_KEY; + +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -55,8 +60,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.annotations.VisibleForTesting; - /** * This class allows a continuous flow of requests. It's written to be compatible with a * synchronous caller such as HTable. @@ -100,16 +103,6 @@ class AsyncProcess { public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; /** - * Configure the number of failures after which the client will start logging. A few failures - * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable - * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at - * this stage. - */ - public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = - "hbase.client.start.log.errors.counter"; - public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; - - /** * Configuration to decide whether to log details for batch error */ public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8db9dbf..5d3dc88 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -106,7 +106,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", justification="Access to the conncurrent hash map is under a lock so should be fine.") @InterfaceAudience.Private -class ConnectionImplementation implements ClusterConnection, Closeable { +public class ConnectionImplementation implements ClusterConnection, Closeable { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; private static final Log LOG = LogFactory.getLog(ConnectionImplementation.class); private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled"; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index f92aeae..5c1d488 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -30,6 +30,16 @@ public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; + + /** + * Configure the number of failures after which the client will start logging. A few failures is + * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable + * heuristic for the number of errors we don't log. 
9 was chosen because we wait for 1s at this + * stage. + */ + public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = "hbase.client.start.log.errors.counter"; + public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; + protected final Configuration conf; private final long pause; private final int retries; @@ -50,8 +60,8 @@ public class RpcRetryingCallerFactory { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, - AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, + DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java new file mode 100644 index 0000000..626453a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnection.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.io.Closeable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous version of Connection. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AsyncConnection extends Closeable { + + Configuration getConfiguration(); + + AsyncRegionLocator getRegionLocator(TableName tableName); + + AsyncTable getTable(TableName tableName); + + @Override + void close(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java new file mode 100644 index 0000000..d139ba4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncConnectionImpl.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.Threads; + +/** + * The implementation of AsyncConnection. + */ +@InterfaceAudience.Private +class AsyncConnectionImpl implements AsyncConnection { + + private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class); + + private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); + + private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; + + private final Configuration conf; + + private final User user; + + private final ClusterRegistry registry; + + private final String clusterId; + + private final int rpcTimeout; + + private final RpcClient rpcClient; + + final RpcControllerFactory rpcControllerFactory; + + private final boolean hostnameCanChange; + + final RegionLocatorImpl locator; + + final AsyncRpcRetryCallerFactory callerFactory; + + public AsyncConnectionImpl(Configuration conf, User user) throws IOException { + this.conf = conf; + this.user = user; + + this.locator = new RegionLocatorImpl(conf); + + // action below will not throw exception so no need to catch and close. 
+ this.registry = ClusterRegistryFactory.getRegistry(conf); + this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { + if (LOG.isDebugEnabled()) { + LOG.debug("cluster id came back null, using default " + HConstants.CLUSTER_ID_DEFAULT); + } + return HConstants.CLUSTER_ID_DEFAULT; + }); + this.rpcClient = RpcClientFactory.createClient(conf, clusterId); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); + this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.callerFactory = new AsyncRpcRetryCallerFactory(this, RETRY_TIMER); + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public void close() { + IOUtils.closeQuietly(locator); + IOUtils.closeQuietly(rpcClient); + IOUtils.closeQuietly(registry); + } + + @Override + public AsyncRegionLocator getRegionLocator(TableName tableName) { + return new AsyncRegionLocatorImpl(tableName, locator); + } + + void retryAfter(long delay, TimeUnit unit, Runnable action) { + RETRY_TIMER.newTimeout(new TimerTask() { + + @Override + public void run(Timeout timeout) throws Exception { + action.run(); + } + }, delay, unit); + } + + private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); + + private String getStubKey(String serviceName, ServerName serverName) { + // Sometimes, servers go down and they come back up with the same hostname but a different + // IP address. Force a resolution of the rsHostname by trying to instantiate an + // InetSocketAddress, and this way we will rightfully get a new stubKey. + // Also, include the hostname in the key so as to take care of those cases where the + // DNS name is different but IP address remains the same. + String hostname = serverName.getHostname(); + int port = serverName.getPort(); + if (hostnameCanChange) { + try { + InetAddress ip = InetAddress.getByName(hostname); + return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port; + } catch (UnknownHostException e) { + LOG.warn("Can not resolve " + hostname + ", please check your network", e); + } + } + return serviceName + "@" + hostname + ":" + port; + } + + private ClientService.Interface createRegionServerStub(ServerName serverName) { + try { + return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { + try { + return CollectionUtils.computeIfAbsent(rsStubs, + getStubKey(ClientService.Interface.class.getSimpleName(), serverName), + () -> createRegionServerStub(serverName)); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } + + @Override + public AsyncTable getTable(TableName tableName) { + return new AsyncTableImpl(this, tableName); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java new file mode 100644 index 0000000..9e596ce --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous version of RegionLocator. + *
<p>
+ * Declared as InterfaceAudience.Private because it is still an experimental API. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AsyncRegionLocator { + + TableName getName(); + + default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row) { + return getRegionLocation(row, false); + } + + CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java new file mode 100644 index 0000000..4cd209b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRegionLocatorImpl.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class AsyncRegionLocatorImpl implements AsyncRegionLocator { + + private final TableName tableName; + + private final RegionLocatorImpl locator; + + public AsyncRegionLocatorImpl(TableName tableName, RegionLocatorImpl locator) { + this.tableName = tableName; + this.locator = locator; + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) { + return locator.getRegionLocation(tableName, row, reload); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java new file mode 100644 index 0000000..e9432c8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncRpcRetryCallerFactory.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; +import static org.apache.hadoop.hbase.client.RpcRetryingCallerFactory.START_LOG_ERRORS_AFTER_COUNT_KEY; + +import com.google.common.base.Preconditions; + +import io.netty.util.HashedWheelTimer; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Factory to create an AsyncRpcRetryCaller. + */ +@InterfaceAudience.Private +public class AsyncRpcRetryCallerFactory { + + private final AsyncConnectionImpl conn; + + private final HashedWheelTimer retryTimer; + + private final long pauseNs; + + private final int maxRetries; + + /** How many retries are allowed before we start to log */ + private final int startLogErrorsCnt; + + public AsyncRpcRetryCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { + this.conn = conn; + Configuration conf = conn.getConfiguration(); + this.pauseNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, + DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + this.retryTimer = retryTimer; + } + + public class SingleActionCallerBuilder { + + private TableName tableName; + + private byte[] row; + + private AsyncSingleActionRetryCaller.Callable callable; + + private long operationTimeoutNs = -1L; + + private long rpcTimeoutNs = -1L; + + public SingleActionCallerBuilder table(TableName tableName) { + this.tableName = tableName; + return this; + } + + public SingleActionCallerBuilder row(byte[] row) { + this.row = row; + return this; + } + + public SingleActionCallerBuilder action(AsyncSingleActionRetryCaller.Callable callable) { + this.callable = callable; + return this; + } + + public SingleActionCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(operationTimeout); + return this; + } + + public SingleActionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public AsyncSingleActionRetryCaller build() { + return new AsyncSingleActionRetryCaller<>(retryTimer, conn, + Preconditions.checkNotNull(tableName, "tableName is null"), + Preconditions.checkNotNull(row, "row is null"), + Preconditions.checkNotNull(callable, "action is null"), pauseNs, maxRetries, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + } + + public CompletableFuture call() { + return build().call(); + } + } + + public SingleActionCallerBuilder single() { + return new SingleActionCallerBuilder<>(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java new file mode 100644 index 0000000..07f4814 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncSingleActionRetryCaller.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Retry caller for a single action, such as get, put, delete, etc. 
+ */ +@InterfaceAudience.Private +public class AsyncSingleActionRetryCaller { + + private static final Log LOG = LogFactory.getLog(AsyncSingleActionRetryCaller.class); + + @FunctionalInterface + public interface Callable { + CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, + ClientService.Interface stub); + } + + private final HashedWheelTimer retryTimer; + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private final byte[] row; + + private final Callable callable; + + private final long pauseNs; + + private final int maxAttempts; + + private final long operationTimeoutNs; + + private final long rpcTimeoutNs; + + private final int startLogErrorsCnt; + + private final CompletableFuture future; + + private final HBaseRpcController controller; + + private final List exceptions; + + private final long startNs; + + public AsyncSingleActionRetryCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + TableName tableName, byte[] row, Callable callable, long pauseNs, int maxRetries, + long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + this.retryTimer = retryTimer; + this.conn = conn; + this.tableName = tableName; + this.row = row; + this.callable = callable; + this.pauseNs = pauseNs; + this.maxAttempts = maxRetries + 1; + this.operationTimeoutNs = operationTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; + this.future = new CompletableFuture<>(); + this.controller = conn.rpcControllerFactory.newController(); + this.exceptions = new ArrayList<>(); + this.startNs = System.nanoTime(); + } + + private int tries = 1; + + private long elapsedMs() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + } + + private static Throwable translateException(Throwable t) { + if (t instanceof UndeclaredThrowableException && t.getCause() != null) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof ServiceException && t.getCause() != null) { + t = translateException(t.getCause()); + } + return t; + } + + private void completeExceptionally() { + future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); + } + + private void onError(Throwable error, Supplier errMsg, + Consumer updateCachedLocation, Runnable retry) { + error = translateException(error); + if (tries > startLogErrorsCnt) { + LOG.warn(errMsg.get(), error); + } + RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext( + error, EnvironmentEdgeManager.currentTime(), ""); + exceptions.add(qt); + if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + completeExceptionally(); + return; + } + long delayNs; + if (operationTimeoutNs > 0) { + long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs); + if (maxDelayNs <= 0) { + completeExceptionally(); + return; + } + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + } else { + delayNs = getPauseTime(pauseNs, tries - 1); + } + updateCachedLocation.accept(error); + tries++; + retryTimer.newTimeout(new TimerTask() { + + @Override + public void run(Timeout timeout) throws Exception { + retry.run(); + } + }, delayNs, TimeUnit.NANOSECONDS); + } + + private void resetController() { + controller.reset(); + if (rpcTimeoutNs >= 0) { + controller.setCallTimeout( + (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs))); + } + } + + private 
void call(HRegionLocation loc) { + ClientService.Interface stub; + try { + stub = conn.getRegionServerStub(loc.getServerName()); + } catch (IOException e) { + onError(e, + () -> "Get async stub to " + loc.getServerName() + " for " + Bytes.toStringBinary(row) + + " in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeoutNs = " + + operationTimeoutNs + ", time elapsed = " + elapsedMs() + " ms", + err -> { + }, this::locate); + return; + } + resetController(); + callable.call(controller, loc, stub).whenComplete((result, error) -> { + if (error != null) { + onError(error, + () -> "Call to " + loc.getServerName() + " for " + Bytes.toStringBinary(row) + " in " + + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = " + + tries + ", maxAttempts = " + maxAttempts + ", timeoutNs = " + operationTimeoutNs + + ", time elapsed = " + elapsedMs() + " ms", + err -> conn.locator.updateCachedLocations(tableName, loc.getRegionInfo().getRegionName(), + row, err, loc.getServerName()), + this::locate); + return; + } + future.complete(result); + }); + } + + private void locate() { + conn.locator.getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> { + if (error != null) { + onError(error, + () -> "Locate " + Bytes.toStringBinary(row) + " in " + tableName + " failed, tries = " + + tries + ", maxAttempts = " + maxAttempts + ", timeoutNs = " + operationTimeoutNs + + ", time elapsed = " + elapsedMs() + " ms", + err -> { + }, this::locate); + return; + } + call(loc); + }); + } + + public CompletableFuture call() { + locate(); + return future; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java new file mode 100644 index 0000000..5d08c18 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTable.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; + +/** + * The asynchronous version of Table. + *
<p>
+ * Declared as InterfaceAudience.Private because it is still an experimental API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface AsyncTable { + + TableName getName(); + + Configuration getConfiguration(); + + CompletableFuture<Result> get(Get get); + + CompletableFuture<Void> put(Put put); + + CompletableFuture<Void> delete(Delete delete); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java new file mode 100644 index 0000000..5b0261d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/AsyncTableImpl.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; + +/** + * The implementation of AsyncTable.
+ */ +@InterfaceAudience.Private +public class AsyncTableImpl implements AsyncTable { + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { + this.conn = conn; + this.tableName = tableName; + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public Configuration getConfiguration() { + return conn.getConfiguration(); + } + + @FunctionalInterface + private interface Converter<D, I, S> { + D convert(I info, S src) throws IOException; + } + + @FunctionalInterface + private interface RpcCall<RESP, REQ> { + void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, + RpcCallback<RESP> done); + } + + private <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, + Converter<RESP, HBaseRpcController, PRESP> respConverter) { + CompletableFuture<RESP> future = new CompletableFuture<>(); + try { + rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req), + new RpcCallback<PRESP>() { + + @Override + public void run(PRESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(respConverter.convert(controller, resp)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public CompletableFuture<Result> get(Get get) { + return conn.callerFactory.<Result> single().table(tableName).row(get.getRow()) + .action((controller, loc, stub) -> this.<Get, GetRequest, GetResponse, Result> call( + controller, loc, stub, get, RequestConverter::buildGetRequest, + (s, c, req, done) -> s.get(c, req, done), + (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) + .call(); + } + + @Override + public CompletableFuture<Void> put(Put put) { + return conn.callerFactory.<Void> single().table(tableName).row(put.getRow()) + .action((controller, loc, stub) -> this.<Put, MutateRequest, MutateResponse, Void> call( + controller, loc, stub, put, RequestConverter::buildMutateRequest, + (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> { + return null; + })) + .call(); + } + + @Override + public CompletableFuture<Void> delete(Delete delete) { + return conn.callerFactory.<Void> single().table(tableName).row(delete.getRow()) + .action((controller, loc, stub) -> this.<Delete, MutateRequest, MutateResponse, Void> call( + controller, loc, stub, delete, RequestConverter::buildMutateRequest, + (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> { + return null; + })) + .call(); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java new file mode 100644 index 0000000..996ddd8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistry.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import java.io.Closeable; + +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc. + * Internal use only. + */ +@InterfaceAudience.Private +interface ClusterRegistry extends Closeable { + + RegionLocations getMetaRegionLocation(); + + /** + * Should only be called once. + *
<p>
+ * The upper layer should store this value somewhere as it will not change any more. + */ + String getClusterId(); + + int getCurrentNrHRS(); + + ServerName getMasterAddress(); + + int getMasterInfoPort(); + + @Override + void close(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java new file mode 100644 index 0000000..fa3be24 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ClusterRegistryFactory.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * Get instance of configured Registry. + */ +@InterfaceAudience.Private +final class ClusterRegistryFactory { + + static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + + private ClusterRegistryFactory() { + } + + /** + * @return The cluster registry implementation to use. + */ + static ClusterRegistry getRegistry(Configuration conf) { + Class<? extends ClusterRegistry> clazz = + conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKClusterRegistry.class, ClusterRegistry.class); + return ReflectionUtils.newInstance(clazz, conf); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java new file mode 100644 index 0000000..4911b4b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/RegionLocatorImpl.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client.async; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionImplementation; + +/** + * TODO: reimplement using async connection when the scan logic is ready. The current implementation + * is based on the blocking client. + */ +@InterfaceAudience.Private +class RegionLocatorImpl implements Closeable { + + private final ConnectionImplementation conn; + + RegionLocatorImpl(Configuration conf) throws IOException { + conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf); + } + + CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + boolean reload) { + CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); + try { + future.complete(conn.getRegionLocation(tableName, row, reload)); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception, + ServerName source) { + conn.updateCachedLocations(tableName, regionName, row, exception, source); + } + + @Override + public void close() { + IOUtils.closeQuietly(conn); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java new file mode 100644 index 0000000..20738dd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/async/ZKClusterRegistry.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client.async; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.removeMetaData; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.RetryNTimes; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; + +/** + * Cache the cluster registry data in memory and use zk watcher to update. The only exception is + * {@link #getClusterId()}, it will fetch the data from zk directly. 
+ */ +@InterfaceAudience.Private +class ZKClusterRegistry implements ClusterRegistry { + + private static final Log LOG = LogFactory.getLog(ZKClusterRegistry.class); + + private final CuratorFramework zk; + + private final ZNodePaths znodePaths; + + private final List metaRegionLocationsWatcher; + + private volatile RegionLocations metaRegionLocations; + + private final PathChildrenCache currentNrHRSWatcher; + + private volatile int currentNrHRS; + + private final NodeCache masterAddressWatcher; + + private volatile ServerName masterAddress; + + private volatile int masterInfoPort; + + private RegionState createRegionState(int replicaId, byte[] data) throws IOException { + if (data == null || data.length == 0) { + return new RegionState( + RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId), + RegionState.State.OFFLINE); + } + data = removeMetaData(data); + int prefixLen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.MetaRegionServer mrs = ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, + prefixLen, data.length - prefixLen); + HBaseProtos.ServerName sn = mrs.getServer(); + ServerName serverName = ServerName.valueOf(sn.getHostName(), sn.getPort(), sn.getStartCode()); + RegionState.State state; + if (mrs.hasState()) { + state = RegionState.State.convert(mrs.getState()); + } else { + state = RegionState.State.OPEN; + } + return new RegionState( + RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId), + state, serverName); + } + + // All listeners of NodeCaches will be called inside the same thread so no need to lock + private void updateMetaRegionLocation(int replicaId, ChildData childData) { + if (childData == null) { + return; + } + RegionState state; + try { + state = createRegionState(replicaId, childData.getData()); + } catch (IOException e) { + LOG.warn("Failed to parse meta region state", e); + return; + } + if (state.getState() != RegionState.State.OPEN || state.getServerName() == null) { + return; + } + RegionLocations oldLocs = metaRegionLocations; + Deque newLocs = new ArrayDeque<>(); + if (oldLocs != null) { + Arrays.stream(oldLocs.getRegionLocations()) + .filter(loc -> loc.getRegionInfo().getReplicaId() != replicaId) + .forEach(loc -> newLocs.add(loc)); + } + HRegionLocation newLoc = new HRegionLocation( + RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId), + state.getServerName()); + if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { + newLocs.addFirst(newLoc); + } else { + newLocs.addLast(newLoc); + } + metaRegionLocations = new RegionLocations(newLocs); + } + + private void updateCurrentNrHRS() { + currentNrHRS = currentNrHRSWatcher.getCurrentData().size(); + } + + private void updateMasterAddressAndInfoPort() { + ChildData childData = masterAddressWatcher.getCurrentData(); + if (childData == null) { + return; + } + byte[] data = childData.getData(); + if (data == null || data.length == 0) { + return; + } + data = removeMetaData(data); + int prefixLen = ProtobufUtil.lengthOfPBMagic(); + try { + ZooKeeperProtos.Master masterProto = + ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen); + HBaseProtos.ServerName snProto = masterProto.getMaster(); + masterAddress = + ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()); + masterInfoPort = masterProto.getInfoPort(); + } catch (IOException e) { + LOG.warn("Failed to parse master address", e); + } + } + + ZKClusterRegistry(Configuration conf) { + 
this.znodePaths = new ZNodePaths(conf); + int zkSessionTimeout = + conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + int zkRetry = conf.getInt("zookeeper.recovery.retry", 3); + int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); + this.zk = CuratorFrameworkFactory.builder() + .connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout) + .retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs)) + .threadFactory( + Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode()))) + .build(); + this.zk.start(); + + metaRegionLocationsWatcher = znodePaths.metaReplicaZNodes.entrySet().stream().map(entry -> { + NodeCache nc = new NodeCache(zk, entry.getValue()); + nc.getListenable().addListener(new NodeCacheListener() { + + @Override + public void nodeChanged() { + updateMetaRegionLocation(entry.getKey().intValue(), nc.getCurrentData()); + } + }); + return nc; + }).collect(collectingAndThen(toList(), Collections::unmodifiableList)); + + currentNrHRSWatcher = new PathChildrenCache(zk, znodePaths.rsZNode, false, + Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x-RS", hashCode()))); + currentNrHRSWatcher.getListenable().addListener(new PathChildrenCacheListener() { + + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { + updateCurrentNrHRS(); + } + }); + + masterAddressWatcher = new NodeCache(zk, znodePaths.masterAddressZNode); + masterAddressWatcher.getListenable().addListener(new NodeCacheListener() { + + @Override + public void nodeChanged() { + updateMasterAddressAndInfoPort(); + } + }); + + try { + for (NodeCache watcher : metaRegionLocationsWatcher) { + watcher.start(); + } + currentNrHRSWatcher.start(); + masterAddressWatcher.start(); + } catch (Exception e) { + // normal start will not throw any exception. 
+ throw new AssertionError(e); + } + } + + @Override + public RegionLocations getMetaRegionLocation() { + return metaRegionLocations; + } + + @Override + public String getClusterId() { + try { + byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode); + if (data == null || data.length == 0) { + return null; + } + return ClusterId.parseFrom(removeMetaData(data)).toString(); + } catch (Exception e) { + LOG.warn("failed to get cluster id", e); + return null; + } + } + + @Override + public int getCurrentNrHRS() { + return currentNrHRS; + } + + @Override + public ServerName getMasterAddress() { + return masterAddress; + } + + @Override + public int getMasterInfoPort() { + return masterInfoPort; + } + + @Override + public void close() { + metaRegionLocationsWatcher.forEach(IOUtils::closeQuietly); + IOUtils.closeQuietly(currentNrHRSWatcher); + IOUtils.closeQuietly(masterAddressWatcher); + IOUtils.closeQuietly(zk); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 371279e..11d2e75 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -707,7 +707,7 @@ public class RecoverableZooKeeper { return null; } - public byte[] removeMetaData(byte[] data) { + public static byte[] removeMetaData(byte[] data) { if(data == null || data.length == 0) { return data; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index b7b9beb..5b22bd4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -104,4 +106,17 @@ public class CollectionUtils { } return list.get(list.size() - 1); } + + /** + * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the + * value already exists. So here we copy the implementation of + * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get + * and putIfAbsent to implement computeIfAbsent. And notice that the implementation does not + * guarantee that the supplier will only be executed once. + */ + public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) { + V v, newValue; + return ((v = map.get(key)) == null && (newValue = supplier.get()) != null + && (v = map.putIfAbsent(key, newValue)) == null) ?
newValue : v; + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java index 15b3930..740f9ee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java @@ -52,6 +52,7 @@ public class ReflectionUtils { private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) { try { + ctor.setAccessible(true); return ctor.newInstance(ctorArgs); } catch (IllegalAccessException e) { throw new UnsupportedOperationException( @@ -65,14 +66,13 @@ } } - @SuppressWarnings("unchecked") public static <T> T newInstance(Class<T> type, Object... params) { return instantiate(type.getName(), findConstructor(type, params), params); } @SuppressWarnings("unchecked") public static <T> Constructor<T> findConstructor(Class<T> type, Object... paramTypes) { - Constructor<T>[] constructors = (Constructor<T>[])type.getConstructors(); + Constructor<T>[] constructors = (Constructor<T>[]) type.getDeclaredConstructors(); for (Constructor<T> ctor : constructors) { Class<?>[] ctorParamTypes = ctor.getParameterTypes(); if (ctorParamTypes.length != paramTypes.length) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java new file mode 100644 index 0000000..23e9a00 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/async/TestAsyncTable.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client.async; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTable { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + private static AsyncConnection ASYNC_CONN; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); + table.delete(new Delete(ROW)).get(); + result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); + assertTrue(result.isEmpty()); + } +} diff --git a/pom.xml b/pom.xml index 7715278..c6adcb5 100644 --- a/pom.xml +++ b/pom.xml @@ -1228,6 +1228,7 @@ 2.11.6 1.46 1.0.0-RC2 + 2.11.0 2.4 1.8 @@ -1784,28 +1785,49 @@ disruptor ${disruptor.version} - + net.spy spymemcached ${spy.version} true - - - org.bouncycastle - bcprov-jdk16 - ${bouncycastle.version} - test - - - org.apache.kerby - kerb-client - ${kerby.version} - - - org.apache.kerby - kerb-simplekdc - ${kerby.version} - + + + org.bouncycastle + bcprov-jdk16 + ${bouncycastle.version} + test + + + org.apache.kerby + kerb-client + ${kerby.version} + + + org.apache.kerby + kerb-simplekdc + ${kerby.version} + + + org.apache.curator + curator-recipes + ${curator.version} + + + org.apache.curator + curator-framework + ${curator.version} + + + org.apache.curator + curator-client + ${curator.version} + + + com.google.guava + guava + + + -- 2.7.4
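
For readers who want to try the API this patch introduces, here is a minimal usage sketch distilled from TestAsyncTable above. It is not part of the patch: the table, family and qualifier names are illustrative, the table is assumed to already exist, and because AsyncConnectionImpl is package-private the sketch has to live in org.apache.hadoop.hbase.client.async, just as the test does (the patch does not yet expose a ConnectionFactory-style entry point for async connections).

    package org.apache.hadoop.hbase.client.async;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AsyncTableUsageSketch {

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // No factory method for async connections exists yet, so construct the
        // internal AsyncConnectionImpl directly, as TestAsyncTable does.
        try (AsyncConnection conn = new AsyncConnectionImpl(conf, User.getCurrent())) {
          AsyncTable table = conn.getTable(TableName.valueOf("async"));
          byte[] row = Bytes.toBytes("row");
          byte[] cf = Bytes.toBytes("cf");
          byte[] cq = Bytes.toBytes("cq");
          // Every operation returns a CompletableFuture, so calls chain without
          // blocking; the final get() blocks only to keep the example simple.
          table.put(new Put(row).addColumn(cf, cq, Bytes.toBytes("value")))
              .thenCompose(v -> table.get(new Get(row).addColumn(cf, cq)))
              .thenAccept(r -> System.out.println(Bytes.toString(r.getValue(cf, cq))))
              .get();
        }
      }
    }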
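One behavioral note on the CollectionUtils.computeIfAbsent helper added above, which AsyncConnectionImpl uses for its region server stub cache: because it is built from a plain get followed by putIfAbsent rather than a locked compute, the supplier may run more than once when threads race, and the losing value is silently dropped. A small sketch of what this means for callers, with illustrative names only:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.hadoop.hbase.util.CollectionUtils;

    public class ComputeIfAbsentSketch {

      // Illustrative stand-in for something costly to build, e.g. an RPC stub.
      private static class ExpensiveStub {
      }

      public static void main(String[] args) {
        ConcurrentMap<String, ExpensiveStub> stubs = new ConcurrentHashMap<>();
        // Fast path is a plain get (per HBASE-16648, cheaper than
        // ConcurrentHashMap.computeIfAbsent when the entry already exists).
        ExpensiveStub stub = CollectionUtils.computeIfAbsent(stubs,
          "ClientService@host:16020", ExpensiveStub::new);
        // Caveat: two racing threads may both invoke the supplier; only the first
        // putIfAbsent wins, so the supplier must be safe to call repeatedly and
        // the loser's value must be safe to discard without cleanup.
        System.out.println(stub);
      }
    }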