From bbd878e0330627ad49e9cadb02dbc3431b5ec317 Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Wed, 27 May 2015 13:18:28 +0200 Subject: [PATCH] HBASE-13784 --- .../hbase/client/AbstractRegionServerCallable.java | 157 +++++ .../hbase/client/AbstractRetryingCallable.java | 61 ++ .../hbase/client/AsyncRegionServerCallable.java | 72 ++ .../hadoop/hbase/client/AsyncRetryingCallable.java | 44 ++ .../hadoop/hbase/client/ClusterConnection.java | 13 + .../hadoop/hbase/client/ConnectionAdapter.java | 12 + .../hbase/client/ConnectionImplementation.java | 38 ++ .../hadoop/hbase/client/FailedResponsePromise.java | 108 +++ .../hbase/client/FastFailInterceptorContext.java | 2 +- .../org/apache/hadoop/hbase/client/HTable.java | 60 ++ .../client/NoOpRetryingInterceptorContext.java | 2 +- .../hadoop/hbase/client/RegionServerCallable.java | 116 +--- .../hadoop/hbase/client/ResponsePromise.java | 69 ++ .../hadoop/hbase/client/RetryingCallable.java | 32 +- .../client/RetryingCallerInterceptorContext.java | 2 +- .../hbase/client/RetryingResponsePromise.java | 99 +++ .../hadoop/hbase/client/RpcRetryingCaller.java | 22 +- .../hadoop/hbase/client/RpcRetryingCallerImpl.java | 105 ++- .../client/RpcRetryingCallerWithReadReplicas.java | 97 +++ .../client/StatsTrackingRpcRetryingCaller.java | 32 +- .../java/org/apache/hadoop/hbase/client/Table.java | 11 + .../hadoop/hbase/ipc/AbstractResponsePromise.java | 40 ++ .../org/apache/hadoop/hbase/ipc/AsyncCall.java | 54 +- .../apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 726 +------------------- .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java | 758 +++++++++++++++++++++ .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 112 ++- .../hbase/ipc/AsyncServerResponseHandler.java | 4 +- .../apache/hadoop/hbase/ipc/MessageConverter.java | 38 ++ .../org/apache/hadoop/hbase/ipc/RpcClient.java | 19 + .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 14 + .../hadoop/hbase/rest/client/RemoteHTable.java | 7 + .../apache/hadoop/hbase/client/HTableWrapper.java | 5 + .../hbase/client/TestFromClientSideAsync.java | 123 ++++ 33 files changed, 2165 insertions(+), 889 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedResponsePromise.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponsePromise.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingResponsePromise.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractResponsePromise.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java new file mode 100644 index 0000000..fb9b62a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java @@ -0,0 +1,157 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; + +/** + * Implementations call a RegionServer. + * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * TODO: this class is actually tied to one region, because most of the paths make use of + * the regioninfo part of location when building requests. The only reason it works for + * multi-region requests (e.g. batch) is that they happen to not use the region parts. + * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, + * RegionCallable and actual RegionServerCallable with ServerName. + * @param the class that the ServerCallable handles + */ +@InterfaceAudience.Private +abstract class AbstractRegionServerCallable implements AbstractRetryingCallable { + protected final Connection connection; + protected final TableName tableName; + protected final byte[] row; + protected HRegionLocation location; + + protected final static int MIN_WAIT_DEAD_SERVER = 10000; + + /** + * @param connection Connection to use. + * @param tableName Table name to which row belongs. + * @param row The row we want in tableName. + */ + public AbstractRegionServerCallable(Connection connection, TableName tableName, byte[] row) { + this.connection = connection; + this.tableName = tableName; + this.row = row; + } + + /** + * @return {@link ClusterConnection} instance used by this Callable. + */ + ClusterConnection getConnection() { + return (ClusterConnection) this.connection; + } + + protected HRegionLocation getLocation() { + return this.location; + } + + protected void setLocation(final HRegionLocation location) { + this.location = location; + } + + public TableName getTableName() { + return this.tableName; + } + + public byte [] getRow() { + return this.row; + } + + @Override + public void throwable(Throwable t, boolean retrying) { + if (t instanceof SocketTimeoutException || + t instanceof ConnectException || + t instanceof RetriesExhaustedException || + (location != null && getConnection().isDeadServer(location.getServerName()))) { + // if thrown these exceptions, we clear all the cache entries that + // map to that slow/dead server; otherwise, let cache miss and ask + // hbase:meta again to find the new location + if (this.location != null) getConnection().clearCaches(location.getServerName()); + } else if (t instanceof RegionMovedException) { + getConnection().updateCachedLocations(tableName, row, t, location); + } else if (t instanceof NotServingRegionException && !retrying) { + // Purge cache entries for this specific region from hbase:meta cache + // since we don't call connect(true) when number of retries is 1. + getConnection().deleteCachedRegionLocation(location); + } + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location; + } + + @Override + public long sleep(long pause, int tries) { + // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + long sleep = ConnectionUtils.getPauseTime(pause, tries + 1); + if (sleep < MIN_WAIT_DEAD_SERVER + && (location == null || getConnection().isDeadServer(location.getServerName()))) { + sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); + } + return sleep; + } + + /** + * @return the HRegionInfo for the current region + */ + public HRegionInfo getHRegionInfo() { + if (this.location == null) { + return null; + } + return this.location.getRegionInfo(); + } + + /** + * Prepare for connection to the server hosting region with row from tablename. Does lookup + * to find region location and hosting server. + * @param reload Set this to true if connection should re-find the region + * @throws IOException e + */ + @Override + public void prepare(final boolean reload) throws IOException { + try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { + this.location = regionLocator.getRegionLocation(row, reload); + } + if (this.location == null) { + throw new IOException("Failed to find location, tableName=" + tableName + + ", row=" + Bytes.toString(row) + ", reload=" + reload); + } + setClientByServiceName(this.location.getServerName()); + } + + /** + * Set the Rpc client for Client services + * @param serviceName to get client for + * @throws IOException When client could not be created + */ + abstract void setClientByServiceName(ServerName serviceName) throws IOException; +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallable.java new file mode 100644 index 0000000..736b259 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallable.java @@ -0,0 +1,61 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; + +/** + * A abstract Callable that can be retried. + * @param + */ +@InterfaceAudience.Private +public interface AbstractRetryingCallable { + /** + * Prepare by setting up any connections to servers, etc., ahead of call invocation. + * @param reload Set this to true if need to requery locations + * @throws IOException e + */ + void prepare(final boolean reload) throws IOException; + + /** + * Called when call throws an exception and we are going to retry; take action to + * make it so we succeed on next call (clear caches, do relookup of locations, etc.). + * @param t + * @param retrying True if we are in retrying mode (we are not in retrying mode when max + * retries == 1; we ARE in retrying mode if retries > 1 even when we are the last attempt) + */ + void throwable(final Throwable t, boolean retrying); + + /** + * @return Some details from the implementation that we would like to add to a terminating + * exception; i.e. a fatal exception is being thrown ending retries and we might like to add + * more implementation-specific detail on to the exception being thrown. + */ + String getExceptionMessageAdditionalDetail(); + + /** + * @param pause + * @param tries + * @return Suggestion on how much to sleep between retries + */ + long sleep(final long pause, final int tries); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java new file mode 100644 index 0000000..e921119 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerCallable.java @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import io.netty.channel.EventLoop; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.AsyncRpcChannel; + +import java.io.IOException; + +/** + * Implementations call a RegionServer and implement {@link #call(int)}. + * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * TODO: this class is actually tied to one region, because most of the paths make use of + * the regioninfo part of location when building requests. The only reason it works for + * multi-region requests (e.g. batch) is that they happen to not use the region parts. + * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, + * RegionCallable and actual RegionServerCallable with ServerName. + * @param the class that the ServerCallable handles + */ +@InterfaceAudience.Private +public abstract class AsyncRegionServerCallable extends AbstractRegionServerCallable + implements AsyncRetryingCallable { + private AsyncRpcChannel channel; + // Public because used outside of this package over in ipc. + + /** + * @param connection Connection to use. + * @param tableName Table name to which row belongs. + * @param row The row we want in tableName. + */ + public AsyncRegionServerCallable(Connection connection, TableName tableName, byte[] row) { + super(connection, tableName, row); + } + + @Override + void setClientByServiceName(ServerName service) throws IOException { + this.channel = getConnection().getAsyncClientChannel(service); + } + + /** + * Get the Async RPC channel for this Callable + * @return AsyncRpcChannel + */ + public AsyncRpcChannel getChannel() { + return channel; + } + + @Override + public EventLoop getEventLoop() { + return ((ClusterConnection)this.connection).getEventLoop(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java new file mode 100644 index 0000000..6bb3f9d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRetryingCallable.java @@ -0,0 +1,44 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import io.netty.channel.EventLoop; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A Callable that will be retried async. + * @param + */ +@InterfaceAudience.Private +public interface AsyncRetryingCallable extends AbstractRetryingCallable { + /** + * Computes a result, or throws an exception if unable to do so. + * + * @param callTimeout - the time available for this call. 0 for infinite. + * @return Future which handles the Result + */ + ResponsePromise call(int callTimeout); + + /** + * Get EventLoop to operate async operations on + * @return AsyncRpcChannel + */ + EventLoop getEventLoop(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 07b055a..846c9eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.List; +import io.netty.channel.EventLoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MasterNotRunningException; @@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.ipc.AsyncRpcChannel; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; @@ -228,6 +230,15 @@ public interface ClusterConnection extends HConnection { ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException; /** + * Get an Async RPC channel for Client to communicate over + * + * @param serverName to connect to + * @return RpcChannel to communicate with server + * @throws IOException if a remote or network exception occurs + */ + AsyncRpcChannel getAsyncClientChannel(final ServerName serverName) throws IOException; + + /** * Find region location hosting passed row * @param tableName table name * @param row Row to find. @@ -296,4 +307,6 @@ public interface ClusterConnection extends HConnection { * @return the configured client backoff policy */ ClientBackoffPolicy getBackoffPolicy(); + + EventLoop getEventLoop(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 1d8a793..c34e6c3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; +import io.netty.channel.EventLoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.ipc.AsyncRpcChannel; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; @@ -337,6 +339,11 @@ abstract class ConnectionAdapter implements ClusterConnection { } @Override + public AsyncRpcChannel getAsyncClientChannel(ServerName serverName) throws IOException { + return wrappedConnection.getAsyncClientChannel(serverName); + } + + @Override public AdminService.BlockingInterface getAdmin( ServerName serverName, boolean getMaster) throws IOException { return wrappedConnection.getAdmin(serverName, getMaster); @@ -464,4 +471,9 @@ abstract class ConnectionAdapter implements ClusterConnection { public ClientBackoffPolicy getBackoffPolicy() { return wrappedConnection.getBackoffPolicy(); } + + @Override + public EventLoop getEventLoop() { + return wrappedConnection.getEventLoop(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 35ff34f..328bd88 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -23,6 +23,7 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import io.netty.channel.EventLoop; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -46,6 +47,8 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; +import org.apache.hadoop.hbase.ipc.AsyncRpcChannel; +import org.apache.hadoop.hbase.ipc.AsyncRpcChannelImpl; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -1308,6 +1311,36 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return stub; } + @Override + public AsyncRpcChannel getAsyncClientChannel(final ServerName sn) throws IOException { + return this.getAsyncRpcChannel(ClientProtos.ClientService.getDescriptor().getName(),sn); + } + + /** + * Get an async rpc channel + * @param serviceName to get channel for + * @param serverName + * @return AsyncRpcChannel to communicate with + * @throws IOException + */ + private AsyncRpcChannel getAsyncRpcChannel(String serviceName, final ServerName serverName) + throws IOException { + if (isDeadServer(serverName)) { + throw new RegionServerStoppedException(serverName + " is dead."); + } + String key = getStubKey(AsyncRpcChannel.class.getName(), serverName.getHostname(), serverName.getPort()); + this.connectionLock.putIfAbsent(key, key); + AsyncRpcChannel channel; + synchronized (this.connectionLock.get(key)) { + channel = (AsyncRpcChannel)this.stubs.get(key); + if (channel == null) { + channel = this.rpcClient.createRpcChannel(serviceName, serverName, user); + this.stubs.put(key, channel); + } + } + return channel; + } + static String getStubKey(final String serviceName, final String rsHostname, int port) { // Sometimes, servers go down and they come back up with the same hostname but a different // IP address. Force a resolution of the rsHostname by trying to instantiate an @@ -1965,6 +1998,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return this.backoffPolicy; } + @Override + public EventLoop getEventLoop() { + return this.rpcClient.getEventLoop(); + } + /* * Return the number of cached region for a table. It will only be called * from a unit test. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedResponsePromise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedResponsePromise.java new file mode 100644 index 0000000..36cc5f3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailedResponsePromise.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.util.concurrent.CompleteFuture; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.internal.PlatformDependent; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A Failed Response future + * @param Value type for Future + */ +@InterfaceAudience.Private +public final class FailedResponsePromise extends CompleteFuture implements + ResponsePromise { + + private final Throwable cause; + + /** + * Creates a new instance. + * + * @param executor the {@link EventExecutor} associated with this future + * @param cause the cause of failure + */ + public FailedResponsePromise(EventExecutor executor, Throwable cause) { + super(executor); + if (cause == null) { + throw new NullPointerException("cause"); + } + this.cause = cause; + } + + @Override + public Throwable cause() { + return cause; + } + + @Override + public boolean isSuccess() { + return false; + } + + @Override + public Future sync() { + PlatformDependent.throwException(cause); + return this; + } + + @Override + public Future syncUninterruptibly() { + PlatformDependent.throwException(cause); + return this; + } + + @Override + public V getNow() { + return null; + } + + @Override + public ResponsePromise addListener(ResponseFutureListener listener) { + super.addListener(listener); + return this; + } + + @SafeVarargs + @Override + public final ResponsePromise addListeners(ResponseFutureListener... listeners) { + super.addListeners(listeners); + return this; + } + + @Override + public ResponsePromise removeListener(ResponseFutureListener listener) { + super.removeListener(listener); + return this; + } + + @SafeVarargs + @Override + public final ResponsePromise removeListeners(ResponseFutureListener... listeners) { + super.removeListeners(listeners); + return this; + } + + @Override + public CellScanner cellScanner() { + return null; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java index 9eb56bc..bd61de6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java @@ -111,7 +111,7 @@ class FastFailInterceptorContext extends return prepare(callable, 0); } - public FastFailInterceptorContext prepare(RetryingCallable callable, + public FastFailInterceptorContext prepare(AbstractRetryingCallable callable, int tries) { if (callable instanceof RegionServerCallable) { RegionServerCallable retryingCallable = (RegionServerCallable) callable; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 6ba0b87..c00e64e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.MessageConverter; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -718,6 +719,65 @@ public class HTable implements HTableInterface { return callable.call(); } + private static MessageConverter GET_RESPONSE_CONVERTER = + new MessageConverter() { + @Override + public Result convert(ClientProtos.GetResponse msg) { + if(msg == null){ + return null; + }else{ + return ProtobufUtil.toResult(msg.getResult()); + } + } + }; + + /** + * {@inheritDoc} + */ + public ResponsePromise asyncGet(final Get get) { + if (get.getConsistency() == null){ + get.setConsistency(defaultConsistency); + } + + if (get.getConsistency() == Consistency.STRONG) { + final AsyncRegionServerCallable callable = new AsyncRegionServerCallable( + this.connection, getName(), get.getRow()) { + @Override + public ResponsePromise call(int callTimeout) { + ClientProtos.GetRequest request; + try { + request = RequestConverter + .buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); + } catch (IOException e) { + return new FailedResponsePromise<>(getEventLoop(),e); + } + + return getChannel().callMethod( + ClientProtos.ClientService.getDescriptor().getMethods().get(0), + request, + null, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse.getDefaultInstance(), + GET_RESPONSE_CONVERTER, + callTimeout, + getPriority(tableName)); + } + }; + return rpcCallerFactory.newCaller().callAsyncWithRetries(callable, + this.operationTimeout); + } + + // Call that takes into account the replica + RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( + rpcControllerFactory, tableName, this.connection, get, pool, + tableConfiguration.getRetriesNumber(), + operationTimeout, + tableConfiguration.getPrimaryCallTimeoutMicroSecond()); + return callable.callAsync(); + } + + private int getPriority(TableName tn) { + return (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS; + } /** * {@inheritDoc} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java index 1ccf43c..983fa68 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java @@ -36,7 +36,7 @@ class NoOpRetryingInterceptorContext extends RetryingCallerInterceptorContext { @Override public RetryingCallerInterceptorContext prepare( - RetryingCallable callable, int tries) { + AbstractRetryingCallable callable, int tries) { // Do Nothing return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 9989d56..fc7282c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -20,19 +20,10 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.util.Bytes; /** * Implementations call a RegionServer and implement {@link #call(int)}. @@ -45,16 +36,11 @@ import org.apache.hadoop.hbase.util.Bytes; * @param the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class RegionServerCallable implements RetryingCallable { +public abstract class RegionServerCallable extends AbstractRegionServerCallable + implements RetryingCallable { // Public because used outside of this package over in ipc. - private static final Log LOG = LogFactory.getLog(RegionServerCallable.class); - protected final Connection connection; - protected final TableName tableName; - protected final byte[] row; - protected HRegionLocation location; - private ClientService.BlockingInterface stub; - protected final static int MIN_WAIT_DEAD_SERVER = 10000; + private ClientService.BlockingInterface stub; /** * @param connection Connection to use. @@ -62,102 +48,26 @@ public abstract class RegionServerCallable implements RetryingCallable { * @param row The row we want in tableName. */ public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { - this.connection = connection; - this.tableName = tableName; - this.row = row; + super(connection, tableName, row); } - /** - * Prepare for connection to the server hosting region with row from tablename. Does lookup - * to find region location and hosting server. - * @param reload Set this to true if connection should re-find the region - * @throws IOException e - */ @Override - public void prepare(final boolean reload) throws IOException { - try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { - this.location = regionLocator.getRegionLocation(row, reload); - } - if (this.location == null) { - throw new IOException("Failed to find location, tableName=" + tableName + - ", row=" + Bytes.toString(row) + ", reload=" + reload); - } - setStub(getConnection().getClient(this.location.getServerName())); + void setClientByServiceName(ServerName service) throws IOException { + this.setStub(getConnection().getClient(service)); } /** - * @return {@link HConnection} instance used by this Callable. + * @return Client Rpc protobuf communication stub */ - HConnection getConnection() { - return (HConnection) this.connection; - } - protected ClientService.BlockingInterface getStub() { return this.stub; } - void setStub(final ClientService.BlockingInterface stub) { - this.stub = stub; - } - - protected HRegionLocation getLocation() { - return this.location; - } - - protected void setLocation(final HRegionLocation location) { - this.location = location; - } - - public TableName getTableName() { - return this.tableName; - } - - public byte [] getRow() { - return this.row; - } - - @Override - public void throwable(Throwable t, boolean retrying) { - if (t instanceof SocketTimeoutException || - t instanceof ConnectException || - t instanceof RetriesExhaustedException || - (location != null && getConnection().isDeadServer(location.getServerName()))) { - // if thrown these exceptions, we clear all the cache entries that - // map to that slow/dead server; otherwise, let cache miss and ask - // hbase:meta again to find the new location - if (this.location != null) getConnection().clearCaches(location.getServerName()); - } else if (t instanceof RegionMovedException) { - getConnection().updateCachedLocations(tableName, row, t, location); - } else if (t instanceof NotServingRegionException && !retrying) { - // Purge cache entries for this specific region from hbase:meta cache - // since we don't call connect(true) when number of retries is 1. - getConnection().deleteCachedRegionLocation(location); - } - } - - @Override - public String getExceptionMessageAdditionalDetail() { - return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location; - } - - @Override - public long sleep(long pause, int tries) { - // Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time - long sleep = ConnectionUtils.getPauseTime(pause, tries + 1); - if (sleep < MIN_WAIT_DEAD_SERVER - && (location == null || getConnection().isDeadServer(location.getServerName()))) { - sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); - } - return sleep; - } - /** - * @return the HRegionInfo for the current region + * Set the client protobuf communication stub + * @param stub to set */ - public HRegionInfo getHRegionInfo() { - if (this.location == null) { - return null; - } - return this.location.getRegionInfo(); + void setStub(final ClientService.BlockingInterface stub) { + this.stub = stub; } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponsePromise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponsePromise.java new file mode 100644 index 0000000..2797826 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponsePromise.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Promise for responses + * @param Value type + */ +@InterfaceAudience.Public +public interface ResponsePromise extends Future { + /** + * Get the CellScanner which is returned by the call + * @return CellScanner + */ + CellScanner cellScanner(); + + @Override + ResponsePromise addListener(GenericFutureListener> listener); + + @Override + ResponsePromise addListeners(GenericFutureListener>... listeners); + + @Override + ResponsePromise removeListener(GenericFutureListener> listener); + + @Override + ResponsePromise removeListeners(GenericFutureListener>... listeners); + + @Override + ResponsePromise await() throws InterruptedException; + + @Override + ResponsePromise awaitUninterruptibly(); + + @Override + ResponsePromise sync() throws InterruptedException; + + @Override + ResponsePromise syncUninterruptibly(); + + /** + * Specific interface for the Response future listener + * @param Value type. + */ + interface ResponseFutureListener + extends GenericFutureListener> { + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java index e468d3c..c9b8447 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java @@ -29,23 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * @param */ @InterfaceAudience.Private -public interface RetryingCallable { - /** - * Prepare by setting up any connections to servers, etc., ahead of {@link #call(int)} invocation. - * @param reload Set this to true if need to requery locations - * @throws IOException e - */ - void prepare(final boolean reload) throws IOException; - - /** - * Called when {@link #call(int)} throws an exception and we are going to retry; take action to - * make it so we succeed on next call (clear caches, do relookup of locations, etc.). - * @param t - * @param retrying True if we are in retrying mode (we are not in retrying mode when max - * retries == 1; we ARE in retrying mode if retries > 1 even when we are the last attempt) - */ - void throwable(final Throwable t, boolean retrying); - +public interface RetryingCallable extends AbstractRetryingCallable { /** * Computes a result, or throws an exception if unable to do so. * @@ -54,18 +38,4 @@ public interface RetryingCallable { * @throws Exception if unable to compute a result */ T call(int callTimeout) throws Exception; - - /** - * @return Some details from the implementation that we would like to add to a terminating - * exception; i.e. a fatal exception is being thrown ending retries and we might like to add - * more implementation-specific detail on to the exception being thrown. - */ - String getExceptionMessageAdditionalDetail(); - - /** - * @param pause - * @param tries - * @return Suggestion on how much to sleep between retries - */ - long sleep(final long pause, final int tries); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java index a9f414f..ce2f93f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java @@ -65,5 +65,5 @@ abstract class RetryingCallerInterceptorContext { * retrying call */ public abstract RetryingCallerInterceptorContext prepare( - RetryingCallable callable, int tries); + AbstractRetryingCallable callable, int tries); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingResponsePromise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingResponsePromise.java new file mode 100644 index 0000000..4c48a4c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingResponsePromise.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.channel.EventLoop; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.AbstractResponsePromise; + +/** + * Retrying response future + * @param Type of object expected in the Future + */ +@InterfaceAudience.Private +public class RetryingResponsePromise extends AbstractResponsePromise { + private final TryHandler tryHandler; + private int tries = 0; + private CellScanner cellScanner; + + /** + * Constructor + * @param executor for the Future + * @param tryHandler Handling the call and failure + */ + public RetryingResponsePromise(EventLoop executor, TryHandler tryHandler) { + super(executor); + this.tryHandler = tryHandler; + } + + /** + * Do the actual call for the try. + */ + public void call(){ + this.tryHandler.call(tries).addListener(new ResponseFutureListener() { + @Override + public void operationComplete(ResponsePromise future) throws Exception { + if(future.isSuccess()){ + cellScanner = future.cellScanner(); + setSuccess(future.getNow()); + }else{ + try { + if(tryHandler.handleFail(tries, future.cause())){ + // Returned true so should try again + tries++; + call(); + }else{ + // Returned false but with no exception so should return empty result + setSuccess(null); + } + } catch (Throwable e) { + setFailure(e); + } + } + } + }); + } + + @Override + public CellScanner cellScanner() { + return this.cellScanner; + } + + /** + * Handles the try + * @param Type of response from the try + */ + public interface TryHandler { + /** + * Call method + * @param tries amount of tries + * @return Response future + */ + ResponsePromise call(int tries); + + /** + * Handles fails + * @param tries Current try number + * @param e exception thrown + * @return true if should continue to try, false if it should stop + * @throws Throwable if failure is a fail that cant be recovered + */ + boolean handleFail(int tries, Throwable e) throws Throwable; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 807c227..29822a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import java.io.IOException; -/** - * - */ @InterfaceAudience.Public @InterfaceStability.Evolving public interface RpcRetryingCaller { @@ -43,7 +40,7 @@ public interface RpcRetryingCaller { /** * Call the server once only. - * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you + * {@link RetryingCallable} has a strange shape so we can do retries. Use this invocation if you * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely * succeed). * @return an object of type T @@ -52,4 +49,21 @@ public interface RpcRetryingCaller { */ T callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException; + + /** + * Call the method Async with retries + * @param callable the async RegionServer callable + * @param operationTimeout timeout for the operation + * @return Future with the result + */ + ResponsePromise callAsyncWithRetries(AsyncRegionServerCallable callable, int operationTimeout); + + /** + * Call the server once only. + * {@link RetryingCallable} has a strange shape so we can do retries. Use this invocation if you + * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely + * succeed). + * @return an object of type T + */ + ResponsePromise callAsyncWithoutRetries(AsyncRetryingCallable callable, int callTimeout); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index dd56b17..3f96b06 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -167,6 +167,96 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { } } + @Override + public ResponsePromise callAsyncWithRetries(final AsyncRegionServerCallable callable, + final int callTimeout) { + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + context.clear(); + + RetryingResponsePromise future = + new RetryingResponsePromise<>(callable.getEventLoop(), new + RetryingResponsePromise.TryHandler() { + List exceptions = new ArrayList<>(); + long expectedSleep; + + @Override + public ResponsePromise call(int tries) { + try { + callable.prepare(tries != 0); // if called with false, check table status on ZK + interceptor.intercept(context.prepare(callable, tries)); + + return callable.call(getRemainingTime(callTimeout)); + } catch (IOException e) { + return new FailedResponsePromise<>(callable.getEventLoop(),e); + } + } + + @Override + public boolean handleFail(int tries, Throwable e) throws Throwable { + try { + if (e instanceof PreemptiveFastFailException) { + throw e; + } + + ExceptionUtil.rethrowIfInterrupt(e); + if (tries > startLogErrorsCnt) { + LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + + (EnvironmentEdgeManager.currentTime() - globalStartTime) + " ms ago, " + + "cancelled=" + cancelled.get() + ", msg=" + + callable.getExceptionMessageAdditionalDetail()); + } + + // translateException throws exception when should not retry: i.e. when request is bad + interceptor.handleFailure(context, e); + e = translateException(e); + callable.throwable(e, retries != 1); + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(e, + EnvironmentEdgeManager.currentTime(), toString()); + exceptions.add(qt); + if (tries >= retries - 1) { + throw new RetriesExhaustedException(tries, exceptions); + } + // If the server is dead, we need to wait a little before retrying, to give + // a chance to the regions to be + // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time + expectedSleep = callable.sleep(pause, tries + 1); + + // If, after the planned sleep, there won't be enough time left, we stop now. + long duration = singleCallDuration(expectedSleep); + if (duration > callTimeout) { + String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + + ": " + callable.getExceptionMessageAdditionalDetail(); + throw (new SocketTimeoutException(msg).initCause(e)); + } + } finally { + interceptor.updateFailureInfo(context); + } + + try { + if (expectedSleep > 0) { + synchronized (cancelled) { + if (cancelled.get()) return false; + cancelled.wait(expectedSleep); + } + } + if (cancelled.get()){ + return false; + } + } catch (InterruptedException e1) { + throw new InterruptedIOException( + "Interrupted after " + tries + " tries on " + retries); + } + + return true; + } + }); + + future.call(); + + return future; + } + /** * @return Calculate how long a single call took */ @@ -193,7 +283,20 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { } } } - + + @Override + public ResponsePromise callAsyncWithoutRetries(AsyncRetryingCallable callable, + int callTimeout) { + // The code of this method should be shared with withRetries. + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + try { + callable.prepare(false); + return callable.call(callTimeout); + } catch (IOException e) { + return new FailedResponsePromise<>(callable.getEventLoop(),e); + } + } + /** * Get the good or the remote exception if any, throws the DoNotRetryIOException. * @param t the throwable to analyze diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 8f28881..4353acc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -95,6 +95,91 @@ public class RpcRetryingCallerWithReadReplicas { * - we need to stop retrying when the call is completed * - we can be interrupted */ + abstract class AbstractReplicaRegionServerCallable extends RegionServerCallable + implements Cancellable { + final int id; + private final PayloadCarryingRpcController controller; + + public AbstractReplicaRegionServerCallable(int id, HRegionLocation location) { + super(RpcRetryingCallerWithReadReplicas.this.cConnection, + RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); + this.id = id; + this.location = location; + this.controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + } + + @Override + public void cancel() { + controller.startCancel(); + } + + /** + * Two responsibilities + * - if the call is already completed (by another replica) stops the retries. + * - set the location to the right region, depending on the replica. + */ + @Override + public void prepare(final boolean reload) throws IOException { + if (controller.isCanceled()) return; + + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + if (reload || location == null) { + RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow()); + location = id < rl.size() ? rl.getRegionLocation(id) : null; + } + + if (location == null || location.getServerName() == null) { + // With this exception, there will be a retry. The location can be null for a replica + // when the table is created or after a split. + throw new HBaseIOException("There is no location for replica id #" + id); + } + + ServerName dest = location.getServerName(); + + setStub(cConnection.getClient(dest)); + } + + @Override + public Result call(int callTimeout) throws Exception { + if (controller.isCanceled()) return null; + + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + byte[] reg = location.getRegionInfo().getRegionName(); + + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(reg, get); + controller.setCallTimeout(callTimeout); + + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) { + return null; + } + return ProtobufUtil.toResult(response.getResult()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + @Override + public boolean isCancelled() { + return controller.isCanceled(); + } + } + + /** + * A RegionServerCallable that takes into account the replicas, i.e. + * - the call can be on any replica + * - we need to stop retrying when the call is completed + * - we can be interrupted + */ class ReplicaRegionServerCallable extends RegionServerCallable implements Cancellable { final int id; private final PayloadCarryingRpcController controller; @@ -242,6 +327,18 @@ public class RpcRetryingCallerWithReadReplicas { } /** + * Calls Replicas in an async way + * + * @return Future with result + */ + public ResponsePromise callAsync() { + boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); + + // TODO: fill out code + return null; + } + + /** * Extract the real exception from the ExecutionException, and throws what makes more * sense. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java index e82f1e8..8dd81ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java @@ -56,7 +56,37 @@ public class StatsTrackingRpcRetryingCaller implements RpcRetryingCaller { return updateStatsAndUnwrap(result, callable); } - private T updateStatsAndUnwrap(T result, RetryingCallable callable) { + @Override + public ResponsePromise callAsyncWithRetries(final AsyncRegionServerCallable callable, + int operationTimeout) { + ResponsePromise future = delegate.callAsyncWithRetries(callable, operationTimeout); + future.addListener(new ResponsePromise.ResponseFutureListener() { + @Override + public void operationComplete(ResponsePromise future) throws Exception { + if(future.isSuccess()) { + updateStatsAndUnwrap(future.getNow(), callable); + } + } + }); + return future; + } + + @Override + public ResponsePromise callAsyncWithoutRetries(final AsyncRetryingCallable callable, int + callTimeout) { + ResponsePromise future = delegate.callAsyncWithoutRetries(callable, callTimeout); + future.addListener(new ResponsePromise.ResponseFutureListener() { + @Override + public void operationComplete(ResponsePromise future) throws Exception { + if(future.isSuccess()) { + updateStatsAndUnwrap(future.getNow(), callable); + } + } + }); + return future; + } + + private T updateStatsAndUnwrap(T result, AbstractRetryingCallable callable) { // don't track stats about requests that aren't to regionservers if (!(callable instanceof RegionServerCallable)) { return result; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 498c587..3640847 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -169,6 +169,17 @@ public interface Table extends Closeable { Result get(Get get) throws IOException; /** + * Extracts certain cells from a given row. + * @param get The object that specifies what data to fetch and from which row. + * @return The data coming from the specified row, if it exists. If the row + * specified doesn't exist, the {@link Result} instance returned won't + * contain any {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. + * @throws IOException if a remote or network exception occurs. + * @since 2.0.0 + */ + ResponsePromise asyncGet(Get get); + + /** * Extracts certain cells from the given rows, in batch. * * @param gets The objects that specify what data to fetch and from which rows. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractResponsePromise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractResponsePromise.java new file mode 100644 index 0000000..a775908 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractResponsePromise.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.DefaultPromise; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ResponsePromise; + +/** + * Abstract response promise + * @param Type of result contained in Promise + */ +@InterfaceAudience.Private +public abstract class AbstractResponsePromise extends DefaultPromise implements + ResponsePromise { + + /** + * Constructor + * @param eventLoop to handle events on + */ + public AbstractResponsePromise(EventLoop eventLoop) { + super(eventLoop); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index ec1909a..95acf87 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import io.netty.channel.EventLoop; -import io.netty.util.concurrent.DefaultPromise; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; @@ -36,20 +35,27 @@ import java.io.IOException; * Represents an Async Hbase call and its response. * * Responses are passed on to its given doneHandler and failures to the rpcController + * @param Type of message returned + * @param Message returned in communication to be converted */ @InterfaceAudience.Private -public class AsyncCall extends DefaultPromise { +public class AsyncCall extends AbstractResponsePromise { private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName()); final int id; final Descriptors.MethodDescriptor method; final Message param; - final PayloadCarryingRpcController controller; final Message responseDefaultType; + private final MessageConverter messageConverter; + final long startTime; final long rpcTimeout; + // For both request and response. + private CellScanner cellScanner; + private final int priority; + /** * Constructor * @@ -57,22 +63,29 @@ public class AsyncCall extends DefaultPromise { * @param connectId connection id * @param md the method descriptor * @param param parameters to send to Server - * @param controller controller for response + * @param cellScanner cellScanner containing cells to send as request + * @param messageConverter converts the messages to what is the expected output + * @param rpcTimeout timeout for this call in ms + * @param priority for this request * @param responseDefaultType the default response type */ public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message - param, PayloadCarryingRpcController controller, Message responseDefaultType) { + param, CellScanner cellScanner, M responseDefaultType, MessageConverter + messageConverter, long rpcTimeout, int priority) { super(eventLoop); this.id = connectId; this.method = md; this.param = param; - this.controller = controller; this.responseDefaultType = responseDefaultType; + this.messageConverter = messageConverter; this.startTime = EnvironmentEdgeManager.currentTime(); - this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0; + this.rpcTimeout = rpcTimeout; + + this.priority = priority; + this.cellScanner = cellScanner; } /** @@ -96,9 +109,9 @@ public class AsyncCall extends DefaultPromise { * @param value to set * @param cellBlockScanner to set */ - public void setSuccess(Message value, CellScanner cellBlockScanner) { + public void setSuccess(M value, CellScanner cellBlockScanner) { if (cellBlockScanner != null) { - controller.setCellScanner(cellBlockScanner); + cellScanner = cellBlockScanner; } if (LOG.isTraceEnabled()) { @@ -106,7 +119,13 @@ public class AsyncCall extends DefaultPromise { LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms"); } - this.setSuccess(value); + try { + this.setSuccess( + this.messageConverter.convert(value) + ); + } catch (IOException e) { + this.setFailed(e); + } } /** @@ -133,4 +152,19 @@ public class AsyncCall extends DefaultPromise { public long getRpcTimeout() { return rpcTimeout; } + + /** + * @return Priority for this call + */ + public int getPriority() { + return priority; + } + + /** + * Get the cellScanner for this request. + * @return CellScanner + */ + public CellScanner cellScanner() { + return cellScanner; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index cfc8b1b..d365e66 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -17,731 +17,59 @@ */ package org.apache.hadoop.hbase.ipc; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import javax.security.sasl.SaslException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; -import org.apache.hadoop.hbase.security.AuthMethod; -import org.apache.hadoop.hbase.security.SaslClientHandler; -import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityInfo; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; - import com.google.protobuf.Descriptors; import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; +import io.netty.channel.EventLoop; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ResponsePromise; + +import java.net.InetSocketAddress; /** - * Netty RPC channel + * Interface for Async Rpc Channels */ @InterfaceAudience.Private -public class AsyncRpcChannel { - private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName()); - - private static final int MAX_SASL_RETRIES = 5; - - protected final static Map> tokenHandlers = new HashMap<>(); - - static { - tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, - new AuthenticationTokenSelector()); - } - - final AsyncRpcClient client; - - // Contains the channel to work with. - // Only exists when connected - private Channel channel; - - String name; - final User ticket; - final String serviceName; - final InetSocketAddress address; - - private int ioFailureCounter = 0; - private int connectFailureCounter = 0; - - boolean useSasl; - AuthMethod authMethod; - private int reloginMaxBackoff; - private Token token; - private String serverPrincipal; - - - // NOTE: closed and connected flags below are only changed when a lock on pendingCalls - private final Map pendingCalls = new HashMap(); - private boolean connected = false; - private boolean closed = false; - - private Timeout cleanupTimer; - - private final TimerTask timeoutTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - cleanupCalls(); - } - }; - - /** - * Constructor for netty RPC channel - * - * @param bootstrap to construct channel on - * @param client to connect with - * @param ticket of user which uses connection - * @param serviceName name of service to connect to - * @param address to connect to - */ - public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String - serviceName, InetSocketAddress address) { - this.client = client; - - this.ticket = ticket; - this.serviceName = serviceName; - this.address = address; - - this.channel = connect(bootstrap).channel(); - - name = ("IPC Client (" + channel.hashCode() + ") connection to " + - address.toString() + - ((ticket == null) ? - " from an unknown user" : - (" from " + ticket.getName()))); - } - - /** - * Connect to channel - * - * @param bootstrap to connect to - * @return future of connection - */ - private ChannelFuture connect(final Bootstrap bootstrap) { - return bootstrap.remoteAddress(address).connect() - .addListener(new GenericFutureListener() { - @Override - public void operationComplete(final ChannelFuture f) throws Exception { - if (!f.isSuccess()) { - if (f.cause() instanceof SocketException) { - retryOrClose(bootstrap, connectFailureCounter++, f.cause()); - } else { - retryOrClose(bootstrap, ioFailureCounter++, f.cause()); - } - return; - } - channel = f.channel(); - - setupAuthorization(); - - ByteBuf b = channel.alloc().directBuffer(6); - createPreamble(b, authMethod); - channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - if (useSasl) { - UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI(); - if (authMethod == AuthMethod.KERBEROS) { - if (ticket != null && ticket.getRealUser() != null) { - ticket = ticket.getRealUser(); - } - } - SaslClientHandler saslHandler; - if (ticket == null) { - throw new FatalConnectionException("ticket/user is null"); - } - final UserGroupInformation realTicket = ticket; - saslHandler = ticket.doAs(new PrivilegedExceptionAction() { - @Override - public SaslClientHandler run() throws IOException { - return getSaslHandler(realTicket, bootstrap); - } - }); - if (saslHandler != null) { - // Sasl connect is successful. Let's set up Sasl channel handler - channel.pipeline().addFirst(saslHandler); - } else { - // fall back to simple auth because server told us so. - authMethod = AuthMethod.SIMPLE; - useSasl = false; - } - } else { - startHBaseConnection(f.channel()); - } - } - }); - } - - /** - * Start HBase connection - * - * @param ch channel to start connection on - */ - private void startHBaseConnection(Channel ch) { - ch.pipeline() - .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - ch.pipeline().addLast(new AsyncServerResponseHandler(this)); - try { - writeChannelHeader(ch).addListener(new GenericFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - close(future.cause()); - return; - } - List callsToWrite; - synchronized (pendingCalls) { - connected = true; - callsToWrite = new ArrayList(pendingCalls.values()); - } - for (AsyncCall call : callsToWrite) { - writeRequest(call); - } - } - }); - } catch (IOException e) { - close(e); - } - } - - /** - * Get SASL handler - * @param bootstrap to reconnect to - * @return new SASL handler - * @throws java.io.IOException if handler failed to create - */ - private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, - final Bootstrap bootstrap) throws IOException { - return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, - client.fallbackAllowed, client.conf.get("hbase.rpc.protection", - SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), - new SaslClientHandler.SaslExceptionHandler() { - @Override - public void handle(int retryCount, Random random, Throwable cause) { - try { - // Handle Sasl failure. Try to potentially get new credentials - handleSaslConnectionFailure(retryCount, cause, realTicket); - - // Try to reconnect - client.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - connect(bootstrap); - } - }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS); - } catch (IOException | InterruptedException e) { - close(e); - } - } - }, new SaslClientHandler.SaslSuccessfulConnectHandler() { - @Override - public void onSuccess(Channel channel) { - startHBaseConnection(channel); - } - }); - } - - /** - * Retry to connect or close - * - * @param bootstrap to connect with - * @param connectCounter amount of tries - * @param e exception of fail - */ - private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) { - if (connectCounter < client.maxRetries) { - client.newTimeout(new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { - connect(bootstrap); - } - }, client.failureSleep, TimeUnit.MILLISECONDS); - } else { - client.failedServers.addToFailedServers(address); - close(e); - } - } - +public interface AsyncRpcChannel { /** * Calls method on channel * @param method to call - * @param controller to run call with * @param request to send + * @param cellScanner with cells to send * @param responsePrototype to construct response with + * @param converter for the messages to expected result + * @param rpcTimeout timeout for request + * @param priority for request + * @return Promise for the response Message */ - public Promise callMethod(final Descriptors.MethodDescriptor method, - final PayloadCarryingRpcController controller, final Message request, - final Message responsePrototype) { - final AsyncCall call = - new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, - controller, responsePrototype); - controller.notifyOnCancel(new RpcCallback() { - @Override - public void run(Object parameter) { - // TODO: do not need to call AsyncCall.setFailed? - synchronized (pendingCalls) { - pendingCalls.remove(call.id); - } - } - }); - // TODO: this should be handled by PayloadCarryingRpcController. - if (controller.isCanceled()) { - // To finish if the call was cancelled before we set the notification (race condition) - call.cancel(true); - return call; - } - - synchronized (pendingCalls) { - if (closed) { - Promise promise = channel.eventLoop().newPromise(); - promise.setFailure(new ConnectException()); - return promise; - } - pendingCalls.put(call.id, call); - // Add timeout for cleanup if none is present - if (cleanupTimer == null && call.getRpcTimeout() > 0) { - cleanupTimer = - client.newTimeout(timeoutTask, call.getRpcTimeout(), - TimeUnit.MILLISECONDS); - } - if (!connected) { - return call; - } - } - writeRequest(call); - return call; - } - - AsyncCall removePendingCall(int id) { - synchronized (pendingCalls) { - return pendingCalls.remove(id); - } - } - - /** - * Write the channel header - * - * @param channel to write to - * @return future of write - * @throws java.io.IOException on failure to write - */ - private ChannelFuture writeChannelHeader(Channel channel) throws IOException { - RPCProtos.ConnectionHeader.Builder headerBuilder = - RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName); - - RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); - if (userInfoPB != null) { - headerBuilder.setUserInfo(userInfoPB); - } - - if (client.codec != null) { - headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName()); - } - if (client.compressor != null) { - headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName()); - } - - headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); - RPCProtos.ConnectionHeader header = headerBuilder.build(); - - - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); - - ByteBuf b = channel.alloc().directBuffer(totalSize); - - b.writeInt(header.getSerializedSize()); - b.writeBytes(header.toByteArray()); - - return channel.writeAndFlush(b); - } - - /** - * Write request to channel - * - * @param call to write - */ - private void writeRequest(final AsyncCall call) { - try { - final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader - .newBuilder(); - requestHeaderBuilder.setCallId(call.id) - .setMethodName(call.method.getName()).setRequestParam(call.param != null); - - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder(). - setParentId(s.getSpanId()).setTraceId(s.getTraceId())); - } - - ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); - if (cellBlock != null) { - final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta - .newBuilder(); - cellBlockBuilder.setLength(cellBlock.limit()); - requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); - } - // Only pass priority if there one. Let zero be same as no priority. - if (call.controller.getPriority() != 0) { - requestHeaderBuilder.setPriority(call.controller.getPriority()); - } - - RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); - - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); - if (cellBlock != null) { - totalSize += cellBlock.remaining(); - } - - ByteBuf b = channel.alloc().directBuffer(4 + totalSize); - try(ByteBufOutputStream out = new ByteBufOutputStream(b)) { - IPCUtil.write(out, rh, call.param, cellBlock); - } - - channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); - } catch (IOException e) { - close(e); - } - } + ResponsePromise callMethod( + final Descriptors.MethodDescriptor method, + final Message request,final CellScanner cellScanner, + R responsePrototype, MessageConverter converter, + long rpcTimeout, int priority); /** - * Set up server authorization - * - * @throws java.io.IOException if auth setup failed + * Get the EventLoop on which this channel operated + * @return EventLoop */ - private void setupAuthorization() throws IOException { - SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName); - this.useSasl = client.userProvider.isHBaseSecurityEnabled(); - - this.token = null; - if (useSasl && securityInfo != null) { - AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); - if (tokenKind != null) { - TokenSelector tokenSelector = tokenHandlers.get(tokenKind); - if (tokenSelector != null) { - token = tokenSelector - .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens()); - } else if (LOG.isDebugEnabled()) { - LOG.debug("No token selector found for type " + tokenKind); - } - } - String serverKey = securityInfo.getServerPrincipal(); - if (serverKey == null) { - throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); - } - this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), - address.getAddress().getCanonicalHostName().toLowerCase()); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " - + serverPrincipal); - } - } - - if (!useSasl) { - authMethod = AuthMethod.SIMPLE; - } else if (token != null) { - authMethod = AuthMethod.DIGEST; - } else { - authMethod = AuthMethod.KERBEROS; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Use " + authMethod + " authentication for service " + serviceName + - ", sasl=" + useSasl); - } - reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); - } - - /** - * Build the user information - * - * @param ugi User Group Information - * @param authMethod Authorization method - * @return UserInformation protobuf - */ - private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) { - if (ugi == null || authMethod == AuthMethod.DIGEST) { - // Don't send user for token auth - return null; - } - RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder(); - if (authMethod == AuthMethod.KERBEROS) { - // Send effective user for Kerberos auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - } else if (authMethod == AuthMethod.SIMPLE) { - //Send both effective user and real user for simple auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - if (ugi.getRealUser() != null) { - userInfoPB.setRealUser(ugi.getRealUser().getUserName()); - } - } - return userInfoPB.build(); - } - - /** - * Create connection preamble - * - * @param byteBuf to write to - * @param authMethod to write - */ - private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { - byteBuf.writeBytes(HConstants.RPC_HEADER); - byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION); - byteBuf.writeByte(authMethod.code); - } + EventLoop getEventLoop(); /** * Close connection - * - * @param e exception on close + * @param cause of closure. */ - public void close(final Throwable e) { - client.removeConnection(this); - - // Move closing from the requesting thread to the channel thread - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - List toCleanup; - synchronized (pendingCalls) { - if (closed) { - return; - } - closed = true; - toCleanup = new ArrayList(pendingCalls.values()); - pendingCalls.clear(); - } - IOException closeException = null; - if (e != null) { - if (e instanceof IOException) { - closeException = (IOException) e; - } else { - closeException = new IOException(e); - } - } - // log the info - if (LOG.isDebugEnabled() && closeException != null) { - LOG.debug(name + ": closing ipc connection to " + address, closeException); - } - if (cleanupTimer != null) { - cleanupTimer.cancel(); - cleanupTimer = null; - } - for (AsyncCall call : toCleanup) { - call.setFailed(closeException != null ? closeException : new ConnectionClosingException( - "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); - } - channel.disconnect().addListener(ChannelFutureListener.CLOSE); - if (LOG.isDebugEnabled()) { - LOG.debug(name + ": closed"); - } - } - }); - } - - /** - * Clean up calls. - * - * @param cleanAll true if all calls should be cleaned, false for only the timed out calls - */ - private void cleanupCalls() { - List toCleanup = new ArrayList(); - long currentTime = EnvironmentEdgeManager.currentTime(); - long nextCleanupTaskDelay = -1L; - synchronized (pendingCalls) { - for (Iterator iter = pendingCalls.values().iterator(); iter.hasNext();) { - AsyncCall call = iter.next(); - long timeout = call.getRpcTimeout(); - if (timeout > 0) { - if (currentTime - call.getStartTime() >= timeout) { - iter.remove(); - toCleanup.add(call); - } else { - if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { - nextCleanupTaskDelay = timeout; - } - } - } - } - if (nextCleanupTaskDelay > 0) { - cleanupTimer = - client.newTimeout(timeoutTask, nextCleanupTaskDelay, - TimeUnit.MILLISECONDS); - } else { - cleanupTimer = null; - } - } - for (AsyncCall call : toCleanup) { - call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" - + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout())); - } - } + void close(Throwable cause); /** * Check if the connection is alive * * @return true if alive */ - public boolean isAlive() { - return channel.isOpen(); - } + boolean isAlive(); /** - * Check if user should authenticate over Kerberos - * - * @return true if should be authenticated over Kerberos - * @throws java.io.IOException on failure of check + * Get the address on which this channel operates + * @return InetSocketAddress */ - private synchronized boolean shouldAuthenticateOverKrb() throws IOException { - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation realUser = currentUser.getRealUser(); - return authMethod == AuthMethod.KERBEROS && - loginUser != null && - //Make sure user logged in using Kerberos either keytab or TGT - loginUser.hasKerberosCredentials() && - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). - (loginUser.equals(currentUser) || loginUser.equals(realUser)); - } - - /** - * If multiple clients with the same principal try to connect - * to the same server at the same time, the server assumes a - * replay attack is in progress. This is a feature of kerberos. - * In order to work around this, what is done is that the client - * backs off randomly and tries to initiate the connection - * again. - * The other problem is to do with ticket expiry. To handle that, - * a relogin is attempted. - *

- * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} - * method. In case when the user doesn't have valid credentials, we don't - * need to retry (from cache or ticket). In such cases, it is prudent to - * throw a runtime exception when we receive a SaslException from the - * underlying authentication implementation, so there is no retry from - * other high level (for eg, HCM or HBaseAdmin). - *

- * - * @param currRetries retry count - * @param ex exception describing fail - * @param user which is trying to connect - * @throws java.io.IOException if IO fail - * @throws InterruptedException if thread is interrupted - */ - private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, - final UserGroupInformation user) throws IOException, InterruptedException { - user.doAs(new PrivilegedExceptionAction() { - public Void run() throws IOException, InterruptedException { - if (shouldAuthenticateOverKrb()) { - if (currRetries < MAX_SASL_RETRIES) { - LOG.debug("Exception encountered while connecting to the server : " + ex); - //try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); - } - - // Should reconnect - return null; - } else { - String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + - " to " + serverPrincipal; - LOG.warn(msg); - throw (IOException) new IOException(msg).initCause(ex); - } - } else { - LOG.warn("Exception encountered while connecting to " + - "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException) ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." + - " The most likely cause is missing or invalid credentials." + - " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); - } - throw new IOException(ex); - } - }); - } - - public int getConnectionHashCode() { - return ConnectionId.hashCode(ticket, serviceName, address); - } - - @Override - public String toString() { - return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; - } - - /** - * Listens to call writes and fails if write failed - */ - private static final class CallWriteListener implements ChannelFutureListener { - private final AsyncRpcChannel rpcChannel; - private final int id; - - public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) { - this.rpcChannel = asyncRpcChannel; - this.id = id; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - AsyncCall call = rpcChannel.removePendingCall(id); - if (call != null) { - if (future.cause() instanceof IOException) { - call.setFailed((IOException) future.cause()); - } else { - call.setFailed(new IOException(future.cause())); - } - } - } - } - } + InetSocketAddress getAddress(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java new file mode 100644 index 0000000..ac81f61 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java @@ -0,0 +1,758 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.GenericFutureListener; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import javax.security.sasl.SaslException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ResponsePromise; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.SaslClientHandler; +import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.security.SecurityInfo; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; + +/** + * Netty RPC channel + */ +@InterfaceAudience.Private +public class AsyncRpcChannelImpl implements AsyncRpcChannel { + private static final Log LOG = LogFactory.getLog(AsyncRpcChannelImpl.class.getName()); + + private static final int MAX_SASL_RETRIES = 5; + + protected final static Map> tokenHandlers = new HashMap<>(); + + static { + tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, + new AuthenticationTokenSelector()); + } + + final AsyncRpcClient client; + // Contains the channel to work with. + // Only exists when connected + private Channel channel; + + String name; + + final User ticket; + final String serviceName; + final InetSocketAddress address; + + private int ioFailureCounter = 0; + private int connectFailureCounter = 0; + + boolean useSasl; + AuthMethod authMethod; + private int reloginMaxBackoff; + private Token token; + private String serverPrincipal; + + + // NOTE: closed and connected flags below are only changed when a lock on pendingCalls + private final Map pendingCalls = new HashMap(); + private boolean connected = false; + private boolean closed = false; + + private Timeout cleanupTimer; + + private final TimerTask timeoutTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + cleanupCalls(); + } + }; + + /** + * Constructor for netty RPC channel + * + * @param bootstrap to construct channel on + * @param client to connect with + * @param ticket of user which uses connection + * @param serviceName name of service to connect to + * @param address to connect to + */ + public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, + String serviceName, InetSocketAddress address) { + this.client = client; + + this.ticket = ticket; + this.serviceName = serviceName; + this.address = address; + + this.channel = connect(bootstrap).channel(); + + name = ("IPC Client (" + channel.hashCode() + ") connection to " + + address.toString() + + ((ticket == null) ? + " from an unknown user" : + (" from " + ticket.getName()))); + } + + /** + * Connect to channel + * + * @param bootstrap to connect to + * @return future of connection + */ + private ChannelFuture connect(final Bootstrap bootstrap) { + return bootstrap.remoteAddress(address).connect() + .addListener(new GenericFutureListener() { + @Override + public void operationComplete(final ChannelFuture f) throws Exception { + if (!f.isSuccess()) { + if (f.cause() instanceof SocketException) { + retryOrClose(bootstrap, connectFailureCounter++, f.cause()); + } else { + retryOrClose(bootstrap, ioFailureCounter++, f.cause()); + } + return; + } + channel = f.channel(); + + setupAuthorization(); + + ByteBuf b = channel.alloc().directBuffer(6); + createPreamble(b, authMethod); + channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + if (useSasl) { + UserGroupInformation ticket = AsyncRpcChannelImpl.this.ticket.getUGI(); + if (authMethod == AuthMethod.KERBEROS) { + if (ticket != null && ticket.getRealUser() != null) { + ticket = ticket.getRealUser(); + } + } + SaslClientHandler saslHandler; + if (ticket == null) { + throw new FatalConnectionException("ticket/user is null"); + } + final UserGroupInformation realTicket = ticket; + saslHandler = ticket.doAs(new PrivilegedExceptionAction() { + @Override + public SaslClientHandler run() throws IOException { + return getSaslHandler(realTicket, bootstrap); + } + }); + if (saslHandler != null) { + // Sasl connect is successful. Let's set up Sasl channel handler + channel.pipeline().addFirst(saslHandler); + } else { + // fall back to simple auth because server told us so. + authMethod = AuthMethod.SIMPLE; + useSasl = false; + } + } else { + startHBaseConnection(f.channel()); + } + } + }); + } + + /** + * Start HBase connection + * + * @param ch channel to start connection on + */ + private void startHBaseConnection(Channel ch) { + ch.pipeline() + .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast(new AsyncServerResponseHandler(this)); + try { + writeChannelHeader(ch).addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + close(future.cause()); + return; + } + List callsToWrite; + synchronized (pendingCalls) { + connected = true; + callsToWrite = new ArrayList(pendingCalls.values()); + } + for (AsyncCall call : callsToWrite) { + writeRequest(call); + } + } + }); + } catch (IOException e) { + close(e); + } + } + + /** + * Get SASL handler + * @param bootstrap to reconnect to + * @return new SASL handler + * @throws java.io.IOException if handler failed to create + */ + private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, + final Bootstrap bootstrap) throws IOException { + return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, + client.fallbackAllowed, client.conf.get("hbase.rpc.protection", + SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), + new SaslClientHandler.SaslExceptionHandler() { + @Override + public void handle(int retryCount, Random random, Throwable cause) { + try { + // Handle Sasl failure. Try to potentially get new credentials + handleSaslConnectionFailure(retryCount, cause, realTicket); + + // Try to reconnect + client.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + connect(bootstrap); + } + }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS); + } catch (IOException | InterruptedException e) { + close(e); + } + } + }, new SaslClientHandler.SaslSuccessfulConnectHandler() { + @Override + public void onSuccess(Channel channel) { + startHBaseConnection(channel); + } + }); + } + + /** + * Retry to connect or close + * + * @param bootstrap to connect with + * @param connectCounter amount of tries + * @param e exception of fail + */ + private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) { + if (connectCounter < client.maxRetries) { + client.newTimeout(new TimerTask() { + @Override public void run(Timeout timeout) throws Exception { + connect(bootstrap); + } + }, client.failureSleep, TimeUnit.MILLISECONDS); + } else { + client.failedServers.addToFailedServers(address); + close(e); + } + } + + /** + * Calls method on channel + * @param method to call + * @param request to send + * @param cellScanner with cells to send + * @param responsePrototype to construct response with + * @param rpcTimeout timeout for request + * @param priority for request + * @return Promise for the response Message + */ + @Override + public ResponsePromise callMethod( + final Descriptors.MethodDescriptor method, + final Message request,final CellScanner cellScanner, + R responsePrototype, MessageConverter converter, + long rpcTimeout, int priority) { + final AsyncCall call = + new AsyncCall<>(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, + cellScanner, responsePrototype, converter, rpcTimeout, priority); + + synchronized (pendingCalls) { + if (closed) { + call.setFailure(new ConnectException()); + return call; + } + pendingCalls.put(call.id, call); + // Add timeout for cleanup if none is present + if (cleanupTimer == null && call.getRpcTimeout() > 0) { + cleanupTimer = + client.newTimeout(timeoutTask, call.getRpcTimeout(), + TimeUnit.MILLISECONDS); + } + if (!connected) { + return call; + } + } + writeRequest(call); + return call; + } + + @Override + public EventLoop getEventLoop() { + return this.channel.eventLoop(); + } + + AsyncCall removePendingCall(int id) { + synchronized (pendingCalls) { + return pendingCalls.remove(id); + } + } + + /** + * Write the channel header + * + * @param channel to write to + * @return future of write + * @throws java.io.IOException on failure to write + */ + private ChannelFuture writeChannelHeader(Channel channel) throws IOException { + RPCProtos.ConnectionHeader.Builder headerBuilder = + RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName); + + RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); + if (userInfoPB != null) { + headerBuilder.setUserInfo(userInfoPB); + } + + if (client.codec != null) { + headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName()); + } + if (client.compressor != null) { + headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName()); + } + + headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); + RPCProtos.ConnectionHeader header = headerBuilder.build(); + + + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); + + ByteBuf b = channel.alloc().directBuffer(totalSize); + + b.writeInt(header.getSerializedSize()); + b.writeBytes(header.toByteArray()); + + return channel.writeAndFlush(b); + } + + /** + * Write request to channel + * + * @param call to write + */ + private void writeRequest(final AsyncCall call) { + try { + final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader + .newBuilder(); + requestHeaderBuilder.setCallId(call.id) + .setMethodName(call.method.getName()).setRequestParam(call.param != null); + + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder(). + setParentId(s.getSpanId()).setTraceId(s.getTraceId())); + } + + ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner()); + if (cellBlock != null) { + final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta + .newBuilder(); + cellBlockBuilder.setLength(cellBlock.limit()); + requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); + } + // Only pass priority if there one. Let zero be same as no priority. + if (call.getPriority() != 0) { + requestHeaderBuilder.setPriority(call.getPriority()); + } + + RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); + + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); + if (cellBlock != null) { + totalSize += cellBlock.remaining(); + } + + ByteBuf b = channel.alloc().directBuffer(4 + totalSize); + try(ByteBufOutputStream out = new ByteBufOutputStream(b)) { + IPCUtil.write(out, rh, call.param, cellBlock); + } + + channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); + } catch (IOException e) { + close(e); + } + } + + /** + * Set up server authorization + * + * @throws java.io.IOException if auth setup failed + */ + private void setupAuthorization() throws IOException { + SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName); + this.useSasl = client.userProvider.isHBaseSecurityEnabled(); + + this.token = null; + if (useSasl && securityInfo != null) { + AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); + if (tokenKind != null) { + TokenSelector tokenSelector = tokenHandlers.get(tokenKind); + if (tokenSelector != null) { + token = tokenSelector + .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens()); + } else if (LOG.isDebugEnabled()) { + LOG.debug("No token selector found for type " + tokenKind); + } + } + String serverKey = securityInfo.getServerPrincipal(); + if (serverKey == null) { + throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); + } + this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), + address.getAddress().getCanonicalHostName().toLowerCase()); + if (LOG.isDebugEnabled()) { + LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " + + serverPrincipal); + } + } + + if (!useSasl) { + authMethod = AuthMethod.SIMPLE; + } else if (token != null) { + authMethod = AuthMethod.DIGEST; + } else { + authMethod = AuthMethod.KERBEROS; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Use " + authMethod + " authentication for service " + serviceName + + ", sasl=" + useSasl); + } + reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); + } + + /** + * Build the user information + * + * @param ugi User Group Information + * @param authMethod Authorization method + * @return UserInformation protobuf + */ + private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) { + if (ugi == null || authMethod == AuthMethod.DIGEST) { + // Don't send user for token auth + return null; + } + RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder(); + if (authMethod == AuthMethod.KERBEROS) { + // Send effective user for Kerberos auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + } else if (authMethod == AuthMethod.SIMPLE) { + //Send both effective user and real user for simple auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + if (ugi.getRealUser() != null) { + userInfoPB.setRealUser(ugi.getRealUser().getUserName()); + } + } + return userInfoPB.build(); + } + + /** + * Create connection preamble + * + * @param byteBuf to write to + * @param authMethod to write + */ + private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { + byteBuf.writeBytes(HConstants.RPC_HEADER); + byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION); + byteBuf.writeByte(authMethod.code); + } + + /** + * Close connection + * + * @param e exception on close + */ + public void close(final Throwable e) { + client.removeConnection(this); + + // Move closing from the requesting thread to the channel thread + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + List toCleanup; + synchronized (pendingCalls) { + if (closed) { + return; + } + closed = true; + toCleanup = new ArrayList(pendingCalls.values()); + pendingCalls.clear(); + } + IOException closeException = null; + if (e != null) { + if (e instanceof IOException) { + closeException = (IOException) e; + } else { + closeException = new IOException(e); + } + } + // log the info + if (LOG.isDebugEnabled() && closeException != null) { + LOG.debug(name + ": closing ipc connection to " + address, closeException); + } + if (cleanupTimer != null) { + cleanupTimer.cancel(); + cleanupTimer = null; + } + for (AsyncCall call : toCleanup) { + call.setFailed(closeException != null ? closeException : new ConnectionClosingException( + "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); + } + channel.disconnect().addListener(ChannelFutureListener.CLOSE); + if (LOG.isDebugEnabled()) { + LOG.debug(name + ": closed"); + } + } + }); + } + + /** + * Clean up calls. + */ + private void cleanupCalls() { + List toCleanup = new ArrayList(); + long currentTime = EnvironmentEdgeManager.currentTime(); + long nextCleanupTaskDelay = -1L; + synchronized (pendingCalls) { + for (Iterator iter = pendingCalls.values().iterator(); iter.hasNext();) { + AsyncCall call = iter.next(); + long timeout = call.getRpcTimeout(); + if (timeout > 0) { + if (currentTime - call.getStartTime() >= timeout) { + iter.remove(); + toCleanup.add(call); + } else { + if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { + nextCleanupTaskDelay = timeout; + } + } + } + } + if (nextCleanupTaskDelay > 0) { + cleanupTimer = + client.newTimeout(timeoutTask, nextCleanupTaskDelay, + TimeUnit.MILLISECONDS); + } else { + cleanupTimer = null; + } + } + for (AsyncCall call : toCleanup) { + call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" + + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout())); + } + } + + /** + * Check if the connection is alive + * + * @return true if alive + */ + public boolean isAlive() { + return channel.isOpen(); + } + + @Override + public InetSocketAddress getAddress() { + return this.address; + } + + /** + * Check if user should authenticate over Kerberos + * + * @return true if should be authenticated over Kerberos + * @throws java.io.IOException on failure of check + */ + private synchronized boolean shouldAuthenticateOverKrb() throws IOException { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation realUser = currentUser.getRealUser(); + return authMethod == AuthMethod.KERBEROS && + loginUser != null && + //Make sure user logged in using Kerberos either keytab or TGT + loginUser.hasKerberosCredentials() && + // relogin only in case it is the login user (e.g. JT) + // or superuser (like oozie). + (loginUser.equals(currentUser) || loginUser.equals(realUser)); + } + + /** + * If multiple clients with the same principal try to connect + * to the same server at the same time, the server assumes a + * replay attack is in progress. This is a feature of kerberos. + * In order to work around this, what is done is that the client + * backs off randomly and tries to initiate the connection + * again. + * The other problem is to do with ticket expiry. To handle that, + * a relogin is attempted. + *

+ * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} + * method. In case when the user doesn't have valid credentials, we don't + * need to retry (from cache or ticket). In such cases, it is prudent to + * throw a runtime exception when we receive a SaslException from the + * underlying authentication implementation, so there is no retry from + * other high level (for eg, HCM or HBaseAdmin). + *

+ * + * @param currRetries retry count + * @param ex exception describing fail + * @param user which is trying to connect + * @throws java.io.IOException if IO fail + * @throws InterruptedException if thread is interrupted + */ + private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, + final UserGroupInformation user) throws IOException, InterruptedException { + user.doAs(new PrivilegedExceptionAction() { + public Void run() throws IOException, InterruptedException { + if (shouldAuthenticateOverKrb()) { + if (currRetries < MAX_SASL_RETRIES) { + LOG.debug("Exception encountered while connecting to the server : " + ex); + //try re-login + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + } else { + UserGroupInformation.getLoginUser().reloginFromTicketCache(); + } + + // Should reconnect + return null; + } else { + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + + " to " + serverPrincipal; + LOG.warn(msg); + throw (IOException) new IOException(msg).initCause(ex); + } + } else { + LOG.warn("Exception encountered while connecting to " + + "the server : " + ex); + } + if (ex instanceof RemoteException) { + throw (RemoteException) ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); + } + }); + } + + @Override + public int hashCode() { + return ConnectionId.hashCode(ticket, serviceName, address); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + AsyncRpcChannelImpl that = (AsyncRpcChannelImpl) o; + + return hashCode() == that.hashCode(); + } + + @Override + public String toString() { + return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; + } + /** + * Listens to call writes and fails if write failed + */ + private static final class CallWriteListener implements ChannelFutureListener { + private final AsyncRpcChannelImpl rpcChannel; + private final int id; + + public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannel, int id) { + this.rpcChannel = asyncRpcChannel; + this.id = id; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + AsyncCall call = rpcChannel.removePendingCall(id); + if (call != null) { + if (future.cause() instanceof IOException) { + call.setFailed((IOException) future.cause()); + } else { + call.setFailed(new IOException(future.cause())); + } + } + } + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 2e4d0a6..f1aaf9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -21,6 +21,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; @@ -30,9 +31,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.InetSocketAddress; @@ -52,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ResponsePromise; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.Pair; @@ -102,6 +101,7 @@ public class AsyncRpcClient extends AbstractRpcClient { @VisibleForTesting static Pair> GLOBAL_EVENT_LOOP_GROUP; + private long defaultRpcTimeout; private synchronized static Pair> getGlobalEventLoopGroup(Configuration conf) { @@ -223,12 +223,24 @@ public class AsyncRpcClient extends AbstractRpcClient { if (pcrc == null) { pcrc = new PayloadCarryingRpcController(); } - final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); + final AsyncRpcChannel connection = + createRpcChannel(md.getService().getName(), addr, ticket); + + final ResponsePromise promise = connection.callMethod(md, param, pcrc.cellScanner(), + returnType, RETURN_SAME_MESSAGE, pcrc.getCallTimeout(), pcrc.getPriority()); + + pcrc.notifyOnCancel(new RpcCallback() { + @Override + public void run(Object parameter) { + // Will automatically fail the promise with CancellationException + promise.cancel(true); + } + }); - Promise promise = connection.callMethod(md, pcrc, param, returnType); long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; try { Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); + pcrc.setCellScanner(promise.cellScanner()); return new Pair<>(response, pcrc.cellScanner()); } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { @@ -241,6 +253,14 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + private static final MessageConverter RETURN_SAME_MESSAGE = new + MessageConverter() { + @Override + public Message convert(Message msg) { + return msg; + } + }; + /** * Call method async */ @@ -251,33 +271,35 @@ public class AsyncRpcClient extends AbstractRpcClient { try { connection = createRpcChannel(md.getService().getName(), addr, ticket); - connection.callMethod(md, pcrc, param, returnType).addListener( - new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if(!future.isSuccess()){ - Throwable cause = future.cause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - }else{ - pcrc.setFailed(new IOException(cause)); - } - }else{ - try { - done.run(future.get()); - }catch (ExecutionException e){ - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - }else{ - pcrc.setFailed(new IOException(cause)); + connection.callMethod(md, param, pcrc.cellScanner(), returnType, RETURN_SAME_MESSAGE, + pcrc.getCallTimeout(), pcrc.getPriority()).addListener( + new ResponsePromise.ResponseFutureListener() { + @Override + public void operationComplete(ResponsePromise future) throws Exception { + if (!future.isSuccess()) { + Throwable cause = future.cause(); + if (cause instanceof IOException) { + pcrc.setFailed((IOException) cause); + } else { + pcrc.setFailed(new IOException(cause)); + } + } else { + try { + pcrc.setCellScanner(future.cellScanner()); + done.run(future.get()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + pcrc.setFailed((IOException) cause); + } else { + pcrc.setFailed(new IOException(cause)); + } + } catch (InterruptedException e) { + pcrc.setFailed(new IOException(e)); + } } - }catch (InterruptedException e){ - pcrc.setFailed(new IOException(e)); } - } - } - }); + }); } catch (StoppedRpcClientException|FailedServerException e) { pcrc.setFailed(e); } @@ -308,6 +330,11 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + @Override + public EventLoop getEventLoop() { + return this.bootstrap.group().next(); + } + /** * Create a cell scanner * @@ -330,6 +357,13 @@ public class AsyncRpcClient extends AbstractRpcClient { return ipcUtil.buildCellBlock(this.codec, this.compressor, cells); } + @Override + public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user) + throws StoppedRpcClientException, FailedServerException { + return this.createRpcChannel(serviceName, new InetSocketAddress(sn.getHostname(), sn.getPort()), + user); + } + /** * Creates an RPC client * @@ -340,7 +374,7 @@ public class AsyncRpcClient extends AbstractRpcClient { * @throws StoppedRpcClientException when Rpc client is stopped * @throws FailedServerException if server failed */ - private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, + public AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, User ticket) throws StoppedRpcClientException, FailedServerException { // Check if server is failed if (this.failedServers.isFailedServer(location)) { @@ -361,7 +395,7 @@ public class AsyncRpcClient extends AbstractRpcClient { } rpcChannel = connections.get(hashCode); if (rpcChannel == null) { - rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); + rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, serviceName, location); connections.put(hashCode, rpcChannel); } } @@ -384,8 +418,8 @@ public class AsyncRpcClient extends AbstractRpcClient { synchronized (connections) { for (AsyncRpcChannel rpcChannel : connections.values()) { if (rpcChannel.isAlive() && - rpcChannel.address.getPort() == sn.getPort() && - rpcChannel.address.getHostName().contentEquals(sn.getHostname())) { + rpcChannel.getAddress().getPort() == sn.getPort() && + rpcChannel.getAddress().getHostName().contentEquals(sn.getHostname())) { LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " + rpcChannel.toString()); rpcChannel.close(null); @@ -398,7 +432,7 @@ public class AsyncRpcClient extends AbstractRpcClient { * Remove connection from pool */ public void removeConnection(AsyncRpcChannel connection) { - int connectionHashCode = connection.getConnectionHashCode(); + int connectionHashCode = connection.hashCode(); synchronized (connections) { // we use address as cache key, so we should check here to prevent removing the // wrong connection @@ -429,6 +463,14 @@ public class AsyncRpcClient extends AbstractRpcClient { } /** + * Get the default RPC timeout + * @return the RPC timeout value as a long + */ + public long getDefaultRpcTimeout() { + return conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + } + + /** * Blocking rpc channel that goes via hbase rpc. */ @VisibleForTesting diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index 1404e6f..698e620 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -40,14 +40,14 @@ import com.google.protobuf.Message; public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { private static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName()); - private final AsyncRpcChannel channel; + private final AsyncRpcChannelImpl channel; /** * Constructor * * @param channel on which this response handler operates */ - public AsyncServerResponseHandler(AsyncRpcChannel channel) { + public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) { this.channel = channel; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java new file mode 100644 index 0000000..53f867b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.IOException; + +/** + * Interface to convert Messages to specific types + * @param Message Type to convert + * @param Output Type + */ +@InterfaceAudience.Private +public interface MessageConverter { + /** + * Converts Message to Output + * @param msg to convert + * @return Output + * @throws IOException if message could not be converted to response + */ + O convert(M msg) throws IOException; +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index cf689f5..980f59d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.BlockingRpcChannel; +import io.netty.channel.EventLoop; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.User; @@ -68,6 +69,18 @@ import java.io.IOException; int rpcTimeout) throws IOException; /** + * Create or fetch AsyncRpcChannel + * @param serviceName to connect to + * @param sn ServerName of the channel to create + * @param user for the service + * @return An async RPC channel fitting given parameters + * @throws FailedServerException if server failed + * @throws StoppedRpcClientException if the RPC client has stopped + */ + public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user) + throws StoppedRpcClientException, FailedServerException; + + /** * Interrupt the connections to the given server. This should be called if the server * is known as actually dead. This will not prevent current operation to be retried, and, * depending on their own behavior, they may retry on the same server. This can be a feature, @@ -83,4 +96,10 @@ import java.io.IOException; * using this client. */ @Override public void close(); + + /** + * Get an event loop to operate on + * @return EventLoop + */ + EventLoop getEventLoop(); } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 9a5fc14..a5b6f87 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -24,6 +24,7 @@ import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; import com.google.protobuf.RpcCallback; +import io.netty.channel.EventLoop; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -1129,6 +1130,12 @@ public class RpcClientImpl extends AbstractRpcClient { } } + @Override + public EventLoop getEventLoop() { + // TODO: DELETE class or remove Eventloop + return null; + } + /** Make a call, passing param, to the IPC server running at * address which is servicing the protocol protocol, * with the ticket credentials, returning the value. @@ -1238,6 +1245,13 @@ public class RpcClientImpl extends AbstractRpcClient { } } + @Override + public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user) + throws StoppedRpcClientException, FailedServerException { + // Todo: implement or delete this implementation + return null; + } + /** * Interrupt the connections to the given ip:port server. This should be called if the server * is known as actually dead. This will not prevent current operation to be retried, and, diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 2d122df..222d22b 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.ResponsePromise; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Row; @@ -309,6 +310,12 @@ public class RemoteHTable implements Table { } @Override + public ResponsePromise asyncGet(Get get) { + // TODO: implement + return null; + } + + @Override public Result[] get(List gets) throws IOException { byte[][] rows = new byte[gets.size()][]; int maxVersions = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index e3641c7..3e3f498 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -133,6 +133,11 @@ public final class HTableWrapper implements Table { return table.get(get); } + @Override + public ResponsePromise asyncGet(Get get) { + return table.asyncGet(get); + } + public boolean exists(Get get) throws IOException { return table.exists(get); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java new file mode 100644 index 0000000..2423533 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsync.java @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Run tests that use the HBase clients; {@link HTable}. + * Sets up the HBase mini cluster once at start and runs through all client tests. + * Each creates a table named for the method and does its stuff against that. + */ +@Category({LargeTests.class, ClientTests.class}) +@SuppressWarnings ("deprecation") +public class TestFromClientSideAsync { + private static final Log LOG = LogFactory.getLog(TestFromClientSideAsync.class); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte [] ROW = Bytes.toBytes("testRow"); + private static final byte [] FAMILY = Bytes.toBytes("testFamily"); + private static final byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte [] VALUE = Bytes.toBytes("testValue"); + protected static int SLAVES = 3; + + /** + * @throws Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Uncomment the following lines if more verbosity is needed for + // debugging (see HBASE-12285 for details). + //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); + //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); + //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + @Test + public void testGet_NonExistentRow() throws IOException, ExecutionException, + InterruptedException { + Table table = TEST_UTIL.createTable(TableName.valueOf("testGet_NonExistentRow"), FAMILY); + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, VALUE); + table.put(put); + LOG.info("Row put"); + + Get get = new Get(ROW); + get.addFamily(FAMILY); + Result r = table.asyncGet(get).get(); + assertFalse(r.isEmpty()); + System.out.println("Row retrieved successfully"); + + byte [] missingrow = Bytes.toBytes("missingrow"); + get = new Get(missingrow); + get.addFamily(FAMILY); + r = table.asyncGet(get).get(); + assertTrue(r.isEmpty()); + LOG.info("Row missing as it should be"); + } +} -- 2.3.2 (Apple Git-55)