From 2c7b95be055ca4e22fe73fcd2afd9e8edc7f6f76 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Fri, 26 May 2017 12:39:19 -0700 Subject: [PATCH] HBASE-18078. [C++] Harden RPC by handling various communication abnormalities --- .../connection/connection-factory.cc | 38 ++++++++++++++----- .../connection/connection-factory.h | 12 +++++- .../connection/connection-pool-test.cc | 12 +++--- hbase-native-client/connection/connection-pool.cc | 44 +++++++++++++++------- hbase-native-client/connection/connection-pool.h | 5 ++- hbase-native-client/connection/rpc-client.cc | 17 +++++++-- hbase-native-client/connection/rpc-client.h | 5 ++- hbase-native-client/core/location-cache.cc | 2 +- hbase-native-client/exceptions/exception.h | 21 ++++++++++- 9 files changed, 116 insertions(+), 40 deletions(-) diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index afa227d..ee509cd 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -24,6 +24,9 @@ #include "connection/client-dispatcher.h" #include "connection/pipeline.h" #include "connection/service.h" +#include +#include +#include "exceptions/exception.h" using namespace folly; using namespace hbase; @@ -48,16 +51,31 @@ std::shared_ptr> ConnectionFactory::M } std::shared_ptr ConnectionFactory::Connect( + std::shared_ptr> client, + const std::string &hostname, uint16_t port) { + return AsyncConnect(client, hostname, port).get(); +} + +folly::Future> ConnectionFactory::AsyncConnect( std::shared_ptr> client, const std::string &hostname, uint16_t port) { - // Yes this will block however it makes dealing with connection pool soooooo - // much nicer. - // TODO see about using shared promise for this. - auto pipeline = client - ->connect(SocketAddress(hostname, port, true), - std::chrono::duration_cast(connect_timeout_)) - .get(); - auto dispatcher = std::make_shared(); - dispatcher->setPipeline(pipeline); - return dispatcher; + + folly::Promise> promise; + auto future = promise.getFuture(); + + try { + /* any connection error (e.g. timeout) will be folly::AsyncSocketException */ + auto pipeline = client->connect( + SocketAddress(hostname, port, true), + std::chrono::duration_cast(connect_timeout_)).get(); + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + promise.setValue(dispatcher); + } catch(const folly::AsyncSocketException &e) { + promise.setException( + folly::make_exception_wrapper( + folly::make_exception_wrapper(e))); + } + + return future; } diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index 1e75571..b22951b 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -19,6 +19,7 @@ #pragma once #include +#include #include #include @@ -55,7 +56,7 @@ class ConnectionFactory { virtual std::shared_ptr> MakeBootstrap(); /** - * Connect a ClientBootstrap to a server and return the pipeline. + * Connect a ClientBootstrap to a server and return the wangle::Service. * * This is mostly visible so that mocks can override socket connections. */ @@ -63,6 +64,15 @@ class ConnectionFactory { std::shared_ptr> client, const std::string &hostname, uint16_t port); + /** + * Asynchronously Connect a ClientBootstrap to a server and return the wangle::Service. + * This async function makes it easy to propagate exceptions in a controlled way with + * help of folly::Future/Promise. + */ + virtual folly::Future> AsyncConnect( + std::shared_ptr> client, + const std::string &hostname, uint16_t port); + private: nanoseconds connect_timeout_; std::shared_ptr io_pool_; diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index 8ecdf29..8aeb93d 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -72,9 +72,9 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) { ConnectionPool cp{mock_cf}; auto remote_id = std::make_shared(hostname, port); - auto result = cp.GetConnection(remote_id); + auto result = cp.GetConnection(remote_id).get(); ASSERT_TRUE(result != nullptr); - result = cp.GetConnection(remote_id); + result = cp.GetConnection(remote_id).get(); } TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { @@ -92,12 +92,12 @@ TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { { auto remote_id = std::make_shared(hostname_one, port); - auto result_one = cp.GetConnection(remote_id); + auto result_one = cp.GetConnection(remote_id).get(); auto remote_id2 = std::make_shared(hostname_two, port); - auto result_two = cp.GetConnection(remote_id2); + auto result_two = cp.GetConnection(remote_id2).get(); } auto remote_id = std::make_shared(hostname_one, port); - auto result_one = cp.GetConnection(remote_id); + auto result_one = cp.GetConnection(remote_id).get(); auto remote_id2 = std::make_shared(hostname_two, port); - auto result_two = cp.GetConnection(remote_id2); + auto result_two = cp.GetConnection(remote_id2).get(); } diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 3121294..eb9a609 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -23,10 +23,13 @@ #include #include #include +#include #include #include +#include "exceptions/exception.h" + using std::mutex; using std::unique_ptr; using std::shared_ptr; @@ -34,6 +37,8 @@ using hbase::ConnectionPool; using hbase::HBaseService; using folly::SharedMutexWritePriority; using folly::SocketAddress; +using namespace folly; +using namespace hbase; ConnectionPool::ConnectionPool(std::shared_ptr io_executor, std::shared_ptr codec, nanoseconds connect_timeout) @@ -46,16 +51,17 @@ ConnectionPool::ConnectionPool(std::shared_ptr cf) ConnectionPool::~ConnectionPool() { Close(); } -std::shared_ptr ConnectionPool::GetConnection( +folly::Future> ConnectionPool::GetConnection( std::shared_ptr remote_id) { // Try and get th cached connection. - auto found_ptr = GetCachedConnection(remote_id); + std::shared_ptr found_ptr = GetCachedConnection(remote_id); // If there's no connection then create it. if (found_ptr == nullptr) { - found_ptr = GetNewConnection(remote_id); + return GetNewConnection(remote_id); + } else { + return folly::makeFuture>(std::move(found_ptr)); } - return found_ptr; } std::shared_ptr ConnectionPool::GetCachedConnection( @@ -68,18 +74,21 @@ std::shared_ptr ConnectionPool::GetCachedConnection( return found->second; } -std::shared_ptr ConnectionPool::GetNewConnection( +folly::Future> ConnectionPool::GetNewConnection( std::shared_ptr remote_id) { // Grab the upgrade lock. While we are double checking other readers can // continue on SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_}; + folly::Promise> promise; + auto future = promise.getFuture(); + // Now check if someone else created the connection before we got the lock // This is safe since we hold the upgrade lock. // upgrade lock is more power than the reader lock. auto found = connections_.find(remote_id); if (found != connections_.end() && found->second != nullptr) { - return found->second; + promise.setValue(found->second); } else { // Yeah it looks a lot like there's no connection SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; @@ -89,14 +98,23 @@ std::shared_ptr ConnectionPool::GetNewConnection( /* create new connection */ auto clientBootstrap = cf_->MakeBootstrap(); - auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); - auto connection = std::make_shared(remote_id, dispatcher); - - connections_.insert(std::make_pair(remote_id, connection)); - clients_.insert(std::make_pair(remote_id, clientBootstrap)); - - return connection; + try { + auto dispatcher = cf_->AsyncConnect( + clientBootstrap, + remote_id->host(), + remote_id->port()).get(); + auto connection = std::make_shared(remote_id, dispatcher); + promise.setValue(connection); + connections_.insert(std::make_pair(remote_id, connection)); + clients_.insert(std::make_pair(remote_id, clientBootstrap)); + } catch(const hbase::ConnectionException &e) { + /* propagating ConnectionException up */ + promise.setException( + folly::make_exception_wrapper(e)); + } } + + return future; } void ConnectionPool::Close(std::shared_ptr remote_id) { diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 2a8f195..52e4537 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -19,6 +19,7 @@ #pragma once #include +#include #include #include #include @@ -69,7 +70,7 @@ class ConnectionPool { * Get a connection to the server name. Start time is ignored. * This can be a blocking operation for a short time. */ - std::shared_ptr GetConnection(std::shared_ptr remote_id); + folly::Future> GetConnection(std::shared_ptr remote_id); /** * Close/remove a connection. @@ -83,7 +84,7 @@ class ConnectionPool { private: std::shared_ptr GetCachedConnection(std::shared_ptr remote_id); - std::shared_ptr GetNewConnection(std::shared_ptr remote_id); + folly::Future> GetNewConnection(std::shared_ptr remote_id); std::unordered_map, std::shared_ptr, ConnectionIdHash, ConnectionIdEquals> connections_; diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 5fa1138..7392ea8 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -24,6 +24,7 @@ #include #include #include +#include "exceptions/exception.h" using hbase::RpcClient; @@ -55,7 +56,7 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::unique_ptr req, std::shared_ptr ticket) { auto remote_id = std::make_shared(host, port, ticket); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return CallForResult(remote_id, std::move(req)); } folly::Future> RpcClient::AsyncCall(const std::string& host, @@ -64,10 +65,18 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::shared_ptr ticket, const std::string& service_name) { auto remote_id = std::make_shared(host, port, ticket, service_name); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return CallForResult(remote_id, std::move(req)); } -std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { - return cp_->GetConnection(remote_id); +folly::Future> RpcClient::CallForResult( + std::shared_ptr remote_id, + std::unique_ptr req) { + try { + auto connection = cp_->GetConnection(remote_id).get(); + return connection->SendRequest(std::move(req)); + } catch (const hbase::ConnectionException &e) { + return folly::makeFuture>( + folly::make_exception_wrapper(e)); + } } } // namespace hbase diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index d416ceb..142dcf3 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -72,8 +72,9 @@ class RpcClient { std::shared_ptr connection_pool() const { return cp_; } - private: - std::shared_ptr GetConnection(std::shared_ptr remote_id); +private: + folly::Future> CallForResult(std::shared_ptr remote_id, + std::unique_ptr req); private: std::shared_ptr cp_; diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index e0afcfb..c854e5c 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -124,7 +124,7 @@ Future> LocationCache::LocateFromMeta(const Tabl .via(cpu_executor_.get()) .then([this](ServerName sn) { auto remote_id = std::make_shared(sn.host_name(), sn.port()); - return this->cp_->GetConnection(remote_id); + return this->cp_->GetConnection(remote_id).get(); }) .then([tn, row, this](std::shared_ptr rpc_connection) { return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row))); diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index 2943d57..d23e729 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -62,7 +62,7 @@ public: logic_error(what) {} IOException( const std::string& what, - folly::exception_wrapper cause) : + const folly::exception_wrapper &cause) : logic_error(what), cause_(cause) {} virtual ~IOException() = default; @@ -102,6 +102,25 @@ private: class HBaseIOException : public IOException { }; +class ConnectionException : public IOException { +public: + ConnectionException() { + } + + ConnectionException(const std::string& what) : + IOException(what) { + } + + ConnectionException(const folly::exception_wrapper &cause) : + IOException("", cause) { + } + + ConnectionException( + const std::string& what, + const folly::exception_wrapper &cause) : IOException(what, cause) { + } +}; + class RemoteException : public IOException { public: -- 2.10.1 (Apple Git-78)