From 06cab6a525bd2bc852f7387c6613e018505d89ec Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Wed, 2 Aug 2017 15:00:38 -0700 Subject: [PATCH] HBASE-18078. [C++] Harden RPC by handling various communication abnormalities --- .../connection/client-dispatcher.cc | 16 ++++++- hbase-native-client/connection/client-handler.cc | 14 +++++-- .../connection/connection-factory.cc | 29 ++++++++----- hbase-native-client/connection/rpc-client.cc | 49 +++++++++++++++++++++- hbase-native-client/connection/rpc-client.h | 8 ++++ hbase-native-client/core/configuration.cc | 2 +- hbase-native-client/exceptions/exception.h | 14 ++++++- 7 files changed, 112 insertions(+), 20 deletions(-) diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index b9b2c34e91..37b2ce1e70 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -18,8 +18,10 @@ */ #include "connection/client-dispatcher.h" #include - +#include +#include #include +#include "exceptions/exception.h" using std::unique_ptr; @@ -31,6 +33,9 @@ void ClientDispatcher::read(Context *ctx, unique_ptr in) { auto call_id = in->call_id(); auto p = requests_.find_and_erase(call_id); + VLOG(3) << folly::sformat("Read hbase::Response, call_id: {}, hasException: {}, what: {}", + in->call_id(), bool(in->exception()), in->exception().what()); + if (in->exception()) { p.setException(in->exception()); } else { @@ -51,7 +56,14 @@ folly::Future> ClientDispatcher::operator()(unique_ptr buf) { : ""; std::string stack_trace = exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : ""; - what.append(exception_class_name).append(stack_trace); + what.append(stack_trace); auto remote_exception = std::make_unique(what); remote_exception->set_exception_class_name(exception_class_name) @@ -133,7 +133,13 @@ folly::Future ClientHandler::write(Context *ctx, std::unique_ptrinsert(std::make_pair(r->call_id(), r->resp_msg())); - // Send the data down the pipeline. - return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); + try { + // Send the data down the pipeline. + return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); + } catch (const folly::AsyncSocketException &e) { + /* clear protobuf::Message to avoid overflow. */ + resp_msgs_->erase(r->call_id()); + throw e; + } } } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index a0c7f96118..e763c038bd 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -22,11 +22,16 @@ #include +#include +#include +#include + #include "connection/client-dispatcher.h" #include "connection/connection-factory.h" #include "connection/pipeline.h" #include "connection/sasl-handler.h" #include "connection/service.h" +#include "exceptions/exception.h" using std::chrono::milliseconds; using std::chrono::nanoseconds; @@ -56,15 +61,19 @@ std::shared_ptr> ConnectionFactory::M std::shared_ptr ConnectionFactory::Connect( std::shared_ptr> client, const std::string &hostname, uint16_t port) { - // Yes this will block however it makes dealing with connection pool soooooo - // much nicer. - // TODO see about using shared promise for this. - auto pipeline = client - ->connect(folly::SocketAddress(hostname, port, true), - std::chrono::duration_cast(connect_timeout_)) - .get(); - auto dispatcher = std::make_shared(); - dispatcher->setPipeline(pipeline); - return dispatcher; + try { + // Yes this will block however it makes dealing with connection pool soooooo + // much nicer. + // TODO see about using shared promise for this. + auto pipeline = client + ->connect(folly::SocketAddress(hostname, port, true), + std::chrono::duration_cast(connect_timeout_)) + .get(); + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + return dispatcher; + } catch (const folly::AsyncSocketException &e) { + throw ConnectionException(folly::exception_wrapper{e}); + } } } // namespace hbase diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 10faa7a84e..c943ac4310 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -19,9 +19,12 @@ #include "connection/rpc-client.h" +#include #include +#include #include #include +#include "exceptions/exception.h" using hbase::security::User; using std::chrono::nanoseconds; @@ -55,7 +58,7 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::unique_ptr req, std::shared_ptr ticket) { auto remote_id = std::make_shared(host, port, ticket); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return SendRequest(remote_id, std::move(req)); } folly::Future> RpcClient::AsyncCall(const std::string& host, @@ -64,7 +67,49 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::shared_ptr ticket, const std::string& service_name) { auto remote_id = std::make_shared(host, port, ticket, service_name); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return SendRequest(remote_id, std::move(req)); +} + +/** + * TODO: + * This function should plug in ConnectionRetryPolicy to handle: + * 1. AsyncSocketException as a result of the 1st time connection establishment. + * 2. ConnectionException (i.e. AsyncSocketException as a cause) for Request/Response async call + * after + * the corresponding connection is established. + */ +folly::Future> RpcClient::SendRequest( + std::shared_ptr remote_id, std::unique_ptr req) { + try { + return GetConnection(remote_id) + ->SendRequest(std::move(req)) + .onError([&, this](const folly::exception_wrapper& ew) { + VLOG(3) << folly::sformat("RpcClient Exception: {}", ew.what()); + ew.with_exception([&, this](const hbase::ConnectionException& re) { + /* bad connection, remove it from pool. */ + cp_->Close(remote_id); + }); + return GetFutureWithException(ew); + }); + } catch (const ConnectionException& e) { + VLOG(3) << folly::sformat("RpcClient Exception: {}", e.what()); + /* bad connection, remove it from pool. */ + cp_->Close(remote_id); + return GetFutureWithException(e); + } +} + +template +folly::Future> RpcClient::GetFutureWithException(const EXCEPTION& e) { + return GetFutureWithException(folly::exception_wrapper{e}); +} + +folly::Future> RpcClient::GetFutureWithException( + const folly::exception_wrapper& ew) { + folly::Promise> promise; + auto future = promise.getFuture(); + promise.setException(ew); + return future; } std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index 0ecde5b775..8145be4388 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -20,6 +20,7 @@ #include +#include #include #include #include @@ -65,6 +66,13 @@ class RpcClient { private: std::shared_ptr GetConnection(std::shared_ptr remote_id); + folly::Future> SendRequest(std::shared_ptr remote_id, + std::unique_ptr req); + template + folly::Future> GetFutureWithException(const EXCEPTION &e); + + folly::Future> GetFutureWithException( + const folly::exception_wrapper &ew); private: std::shared_ptr cp_; diff --git a/hbase-native-client/core/configuration.cc b/hbase-native-client/core/configuration.cc index f4fc46d3ac..1fd2851559 100644 --- a/hbase-native-client/core/configuration.cc +++ b/hbase-native-client/core/configuration.cc @@ -24,8 +24,8 @@ #include #include -#include #include +#include namespace hbase { diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index bdedff4068..2e79055625 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -59,7 +59,7 @@ class IOException : public std::logic_error { IOException(const std::string& what, bool do_not_retry) : logic_error(what), do_not_retry_(do_not_retry) {} - IOException(const std::string& what, folly::exception_wrapper cause) + IOException(const std::string& what, const folly::exception_wrapper& cause) : logic_error(what), cause_(cause), do_not_retry_(false) {} IOException(const std::string& what, folly::exception_wrapper cause, bool do_not_retry) @@ -115,6 +115,18 @@ class RetriesExhaustedException : public IOException { int32_t num_retries_; }; +class ConnectionException : public IOException { + public: + ConnectionException() {} + + ConnectionException(const std::string& what) : IOException(what) {} + + ConnectionException(const folly::exception_wrapper& cause) : IOException("", cause) {} + + ConnectionException(const std::string& what, const folly::exception_wrapper& cause) + : IOException(what, cause) {} +}; + class RemoteException : public IOException { public: RemoteException() : IOException(), port_(0) {} -- 2.11.0 (Apple Git-81)