From 118c413cf18bdf56d813cf53cab41528c0553b5e Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Wed, 2 Aug 2017 15:00:38 -0700 Subject: [PATCH] HBASE-18078. [C++] Harden RPC by handling various communication abnormalities --- .../connection/client-dispatcher.cc | 12 +++++++-- hbase-native-client/connection/client-handler.cc | 12 ++++++--- .../connection/connection-factory.cc | 29 +++++++++++++------- hbase-native-client/connection/rpc-client.cc | 31 ++++++++++++++++++++-- hbase-native-client/connection/rpc-client.h | 2 ++ hbase-native-client/core/configuration.cc | 2 +- hbase-native-client/exceptions/exception.h | 14 +++++++++- 7 files changed, 83 insertions(+), 19 deletions(-) diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index b9b2c34e91..30fd212689 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -18,8 +18,9 @@ */ #include "connection/client-dispatcher.h" #include - +#include #include +#include "exceptions/exception.h" using std::unique_ptr; @@ -51,7 +52,14 @@ folly::Future> ClientDispatcher::operator()(unique_ptr ClientHandler::write(Context *ctx, std::unique_ptrinsert(std::make_pair(r->call_id(), r->resp_msg())); - // Send the data down the pipeline. - return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); + try { + // Send the data down the pipeline. + return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); + } catch (const folly::AsyncSocketException &e) { + /* clear protobuf::Message to avoid overflow. */ + resp_msgs_->erase(r->call_id()); + throw e; + } } } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index a0c7f96118..e763c038bd 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -22,11 +22,16 @@ #include +#include +#include +#include + #include "connection/client-dispatcher.h" #include "connection/connection-factory.h" #include "connection/pipeline.h" #include "connection/sasl-handler.h" #include "connection/service.h" +#include "exceptions/exception.h" using std::chrono::milliseconds; using std::chrono::nanoseconds; @@ -56,15 +61,19 @@ std::shared_ptr> ConnectionFactory::M std::shared_ptr ConnectionFactory::Connect( std::shared_ptr> client, const std::string &hostname, uint16_t port) { - // Yes this will block however it makes dealing with connection pool soooooo - // much nicer. - // TODO see about using shared promise for this. - auto pipeline = client - ->connect(folly::SocketAddress(hostname, port, true), - std::chrono::duration_cast(connect_timeout_)) - .get(); - auto dispatcher = std::make_shared(); - dispatcher->setPipeline(pipeline); - return dispatcher; + try { + // Yes this will block however it makes dealing with connection pool soooooo + // much nicer. + // TODO see about using shared promise for this. + auto pipeline = client + ->connect(folly::SocketAddress(hostname, port, true), + std::chrono::duration_cast(connect_timeout_)) + .get(); + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + return dispatcher; + } catch (const folly::AsyncSocketException &e) { + throw ConnectionException(folly::exception_wrapper{e}); + } } } // namespace hbase diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 10faa7a84e..e90db60df4 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -20,8 +20,10 @@ #include "connection/rpc-client.h" #include +#include #include #include +#include "exceptions/exception.h" using hbase::security::User; using std::chrono::nanoseconds; @@ -55,7 +57,7 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::unique_ptr req, std::shared_ptr ticket) { auto remote_id = std::make_shared(host, port, ticket); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return SendRequest(remote_id, std::move(req)); } folly::Future> RpcClient::AsyncCall(const std::string& host, @@ -64,7 +66,32 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::shared_ptr ticket, const std::string& service_name) { auto remote_id = std::make_shared(host, port, ticket, service_name); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return SendRequest(remote_id, std::move(req)); +} + +/** + * TODO: + * This function should plug in ConnectionRetryPolicy to handle: + * 1. AsyncSocketException as a result of the 1st time connection establishment. + * 2. ConnectionException (i.e. AsyncSocketException as a cause) for Request/Response async call after + * the corresponding connection is established. + */ +folly::Future> RpcClient::SendRequest( + std::shared_ptr remote_id, std::unique_ptr req) { + try { + auto connection = GetConnection(remote_id); + return connection->SendRequest(std::move(req)).onError([&](const ConnectionException& e) { + /* for simplicity, remove this connection from pool. */ + cp_->Close(remote_id); + auto resp = std::make_unique(); + resp->set_exception(folly::exception_wrapper{e}); + return folly::makeFuture>(std::move(resp)); + }); + } catch (const ConnectionException& e) { + auto resp = std::make_unique(); + resp->set_exception(folly::exception_wrapper{e}); + return folly::makeFuture>(std::move(resp)); + } } std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index 0ecde5b775..0e1c8cb445 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -65,6 +65,8 @@ class RpcClient { private: std::shared_ptr GetConnection(std::shared_ptr remote_id); + folly::Future> SendRequest(std::shared_ptr remote_id, + std::unique_ptr req); private: std::shared_ptr cp_; diff --git a/hbase-native-client/core/configuration.cc b/hbase-native-client/core/configuration.cc index f4fc46d3ac..1fd2851559 100644 --- a/hbase-native-client/core/configuration.cc +++ b/hbase-native-client/core/configuration.cc @@ -24,8 +24,8 @@ #include #include -#include #include +#include namespace hbase { diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index bdedff4068..2e79055625 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -59,7 +59,7 @@ class IOException : public std::logic_error { IOException(const std::string& what, bool do_not_retry) : logic_error(what), do_not_retry_(do_not_retry) {} - IOException(const std::string& what, folly::exception_wrapper cause) + IOException(const std::string& what, const folly::exception_wrapper& cause) : logic_error(what), cause_(cause), do_not_retry_(false) {} IOException(const std::string& what, folly::exception_wrapper cause, bool do_not_retry) @@ -115,6 +115,18 @@ class RetriesExhaustedException : public IOException { int32_t num_retries_; }; +class ConnectionException : public IOException { + public: + ConnectionException() {} + + ConnectionException(const std::string& what) : IOException(what) {} + + ConnectionException(const folly::exception_wrapper& cause) : IOException("", cause) {} + + ConnectionException(const std::string& what, const folly::exception_wrapper& cause) + : IOException(what, cause) {} +}; + class RemoteException : public IOException { public: RemoteException() : IOException(), port_(0) {} -- 2.11.0 (Apple Git-81)