From 2f7f8b53b35731eb6e70b748659ba1987f3c36c3 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Mon, 7 Aug 2017 17:44:59 -0700 Subject: [PATCH] HBASE-18078. [C++] Harden RPC by handling various communication abnormalities --- .../connection/client-dispatcher.cc | 16 ++++- hbase-native-client/connection/client-handler.cc | 14 ++-- .../connection/connection-factory.cc | 29 +++++--- hbase-native-client/connection/rpc-client.cc | 49 ++++++++++++- hbase-native-client/connection/rpc-client.h | 8 +++ .../connection/rpc-test-server-handler.cc | 3 + hbase-native-client/connection/rpc-test-server.cc | 3 + hbase-native-client/connection/rpc-test.cc | 84 +++++++++++++++++----- hbase-native-client/exceptions/exception.h | 16 ++++- hbase-native-client/if/test_rpc_service.proto | 1 + 10 files changed, 186 insertions(+), 37 deletions(-) diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index b9b2c34e91..d5d7f5fe54 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -18,8 +18,10 @@ */ #include "connection/client-dispatcher.h" #include - +#include +#include #include +#include "exceptions/exception.h" using std::unique_ptr; @@ -31,6 +33,9 @@ void ClientDispatcher::read(Context *ctx, unique_ptr in) { auto call_id = in->call_id(); auto p = requests_.find_and_erase(call_id); + VLOG(3) << folly::sformat("Read hbase::Response, call_id: {}, hasException: {}, what: {}", + in->call_id(), bool(in->exception()), in->exception().what()); + if (in->exception()) { p.setException(in->exception()); } else { @@ -51,7 +56,14 @@ folly::Future> ClientDispatcher::operator()(unique_ptr buf) { : ""; std::string stack_trace = exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : ""; - what.append(exception_class_name).append(stack_trace); + what.append(stack_trace); auto remote_exception = std::make_unique(what); remote_exception->set_exception_class_name(exception_class_name) @@ -133,7 +133,13 @@ folly::Future ClientHandler::write(Context *ctx, std::unique_ptrinsert(std::make_pair(r->call_id(), r->resp_msg())); - // Send the data down the pipeline. - return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); + try { + // Send the data down the pipeline. + return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); + } catch (const folly::AsyncSocketException &e) { + /* clear protobuf::Message to avoid overflow. */ + resp_msgs_->erase(r->call_id()); + throw e; + } } } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index a0c7f96118..e763c038bd 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -22,11 +22,16 @@ #include +#include +#include +#include + #include "connection/client-dispatcher.h" #include "connection/connection-factory.h" #include "connection/pipeline.h" #include "connection/sasl-handler.h" #include "connection/service.h" +#include "exceptions/exception.h" using std::chrono::milliseconds; using std::chrono::nanoseconds; @@ -56,15 +61,19 @@ std::shared_ptr> ConnectionFactory::M std::shared_ptr ConnectionFactory::Connect( std::shared_ptr> client, const std::string &hostname, uint16_t port) { - // Yes this will block however it makes dealing with connection pool soooooo - // much nicer. - // TODO see about using shared promise for this. - auto pipeline = client - ->connect(folly::SocketAddress(hostname, port, true), - std::chrono::duration_cast(connect_timeout_)) - .get(); - auto dispatcher = std::make_shared(); - dispatcher->setPipeline(pipeline); - return dispatcher; + try { + // Yes this will block however it makes dealing with connection pool soooooo + // much nicer. + // TODO see about using shared promise for this. + auto pipeline = client + ->connect(folly::SocketAddress(hostname, port, true), + std::chrono::duration_cast(connect_timeout_)) + .get(); + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + return dispatcher; + } catch (const folly::AsyncSocketException &e) { + throw ConnectionException(folly::exception_wrapper{e}); + } } } // namespace hbase diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 10faa7a84e..a16dca694a 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -19,9 +19,12 @@ #include "connection/rpc-client.h" +#include #include +#include #include #include +#include "exceptions/exception.h" using hbase::security::User; using std::chrono::nanoseconds; @@ -55,7 +58,7 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::unique_ptr req, std::shared_ptr ticket) { auto remote_id = std::make_shared(host, port, ticket); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return SendRequest(remote_id, std::move(req)); } folly::Future> RpcClient::AsyncCall(const std::string& host, @@ -64,7 +67,49 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::shared_ptr ticket, const std::string& service_name) { auto remote_id = std::make_shared(host, port, ticket, service_name); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return SendRequest(remote_id, std::move(req)); +} + +/** + * There are two cases for ConnectionException: + * 1. The first time connection + * establishment, i.e. GetConnection(remote_id), AsyncSocketException being a cause. + * 2. Writing request down the pipeline, i.e. RpcConnection::SendRequest, AsyncSocketException being + * a cause as well. + */ +folly::Future> RpcClient::SendRequest( + std::shared_ptr remote_id, std::unique_ptr req) { + try { + return GetConnection(remote_id) + ->SendRequest(std::move(req)) + .onError([&, this](const folly::exception_wrapper& ew) { + VLOG(3) << folly::sformat("RpcClient Exception: {}", ew.what()); + ew.with_exception([&, this](const hbase::ConnectionException& re) { + /* bad connection, remove it from pool. */ + cp_->Close(remote_id); + }); + return GetFutureWithException(ew); + }); + } catch (const ConnectionException& e) { + CHECK(e.cause().get_exception() != nullptr); + VLOG(3) << folly::sformat("RpcClient Exception: {}", e.cause().what()); + /* bad connection, remove it from pool. */ + cp_->Close(remote_id); + return GetFutureWithException(e); + } +} + +template +folly::Future> RpcClient::GetFutureWithException(const EXCEPTION& e) { + return GetFutureWithException(folly::exception_wrapper{e}); +} + +folly::Future> RpcClient::GetFutureWithException( + const folly::exception_wrapper& ew) { + folly::Promise> promise; + auto future = promise.getFuture(); + promise.setException(ew); + return future; } std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index 0ecde5b775..8145be4388 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -20,6 +20,7 @@ #include +#include #include #include #include @@ -65,6 +66,13 @@ class RpcClient { private: std::shared_ptr GetConnection(std::shared_ptr remote_id); + folly::Future> SendRequest(std::shared_ptr remote_id, + std::unique_ptr req); + template + folly::Future> GetFutureWithException(const EXCEPTION &e); + + folly::Future> GetFutureWithException( + const folly::exception_wrapper &ew); private: std::shared_ptr cp_; diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc index 7f41b7ef91..8e405ef879 100644 --- a/hbase-native-client/connection/rpc-test-server-handler.cc +++ b/hbase-native-client/connection/rpc-test-server-handler.cc @@ -72,6 +72,9 @@ std::unique_ptr RpcTestServerSerializeHandler::CreateReceivedRequest( } else if (method_name == "addr") { result = std::make_unique(std::make_shared(), std::make_shared(), method_name); + } else if (method_name == "socketNotOpen") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); } return result; } diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc index b9e1f1375c..f350d6a5b8 100644 --- a/hbase-native-client/connection/rpc-test-server.cc +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -88,6 +88,9 @@ Future> RpcTestService::operator()(std::unique_ptr(); + response->set_resp_msg(pb_resp_msg); } return folly::makeFuture>(std::move(response)); diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc index 2949fe9167..e7f678dd14 100644 --- a/hbase-native-client/connection/rpc-test.cc +++ b/hbase-native-client/connection/rpc-test.cc @@ -20,10 +20,12 @@ #include #include +#include #include #include #include #include +#include #include #include #include @@ -41,6 +43,9 @@ using namespace folly; using namespace hbase; DEFINE_int32(port, 0, "test server port"); +DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result"); +DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC {}.", + "output format of enforcing fail"); typedef ServerBootstrap ServerTestBootstrap; typedef std::shared_ptr ServerPtr; @@ -91,9 +96,10 @@ TEST_F(RpcTest, Echo) { auto server_addr = GetRpcServerAddress(server); auto client = CreateRpcClient(conf); - std::string greetings = "hello, hbase server!"; + auto method = "echo"; + auto greetings = "hello, hbase server!"; auto request = std::make_unique(std::make_shared(), - std::make_shared(), "echo"); + std::make_shared(), method); auto pb_msg = std::static_pointer_cast(request->req_msg()); pb_msg->set_message(greetings); @@ -101,14 +107,14 @@ TEST_F(RpcTest, Echo) { client ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), hbase::security::User::defaultUser()) - .then([=](std::unique_ptr response) { + .then([&](std::unique_ptr response) { auto pb_resp = std::static_pointer_cast(response->resp_msg()); EXPECT_TRUE(pb_resp != nullptr); - VLOG(1) << "RPC echo returned: " + pb_resp->message(); + VLOG(1) << folly::sformat(FLAGS_result_format, method, pb_resp->message()); EXPECT_EQ(greetings, pb_resp->message()); }) - .onError([](const folly::exception_wrapper& ew) { - FAIL() << "Shouldn't get here, no exception is expected for RPC echo."; + .onError([&](const folly::exception_wrapper& ew) { + FAIL() << folly::sformat(FLAGS_fail_format, method); }); server->stop(); @@ -118,23 +124,24 @@ TEST_F(RpcTest, Echo) { /** * test error */ -TEST_F(RpcTest, error) { +TEST_F(RpcTest, Error) { auto conf = CreateConf(); auto server = CreateRpcServer(); auto server_addr = GetRpcServerAddress(server); auto client = CreateRpcClient(conf); + auto method = "error"; auto request = std::make_unique(std::make_shared(), - std::make_shared(), "error"); + std::make_shared(), method); /* sending out request */ client ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), hbase::security::User::defaultUser()) - .then([=](std::unique_ptr response) { - FAIL() << "Shouldn't get here, exception is expected for RPC error."; + .then([&](std::unique_ptr response) { + FAIL() << folly::sformat(FLAGS_fail_format, method); }) - .onError([](const folly::exception_wrapper& ew) { - VLOG(1) << "RPC error returned with exception."; + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString(); std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString(); @@ -142,14 +149,57 @@ TEST_F(RpcTest, error) { EXPECT_TRUE(bool(ew)); EXPECT_EQ(kRemoteException, ew.class_name()); - /* verify RemoteException */ - EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& re) { - /* verify DoNotRetryIOException*/ - EXPECT_EQ(kRpcTestException, re.exception_class_name()); - EXPECT_EQ(kRpcTestException + ": server error!", re.stack_trace()); + /* verify exception */ + EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& e) { + EXPECT_EQ(kRpcTestException, e.exception_class_name()); + EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace()); })); }); server->stop(); server->join(); } + +TEST_F(RpcTest, SocketNotOpen) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto method = "socketNotOpen"; + auto request = std::make_unique(std::make_shared(), + std::make_shared(), method); + + server->stop(); + server->join(); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr response) { + FAIL() << folly::sformat(FLAGS_fail_format, method); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + std::string kConnectionException = + demangle(typeid(hbase::ConnectionException)).toStdString(); + std::string kAsyncSocketException = + demangle(typeid(folly::AsyncSocketException)).toStdString(); + + /* verify exception_wrapper */ + EXPECT_TRUE(bool(ew)); + EXPECT_EQ(kConnectionException, ew.class_name()); + + /* verify exception */ + EXPECT_TRUE(ew.with_exception([&](const hbase::ConnectionException& e) { + EXPECT_TRUE(bool(e.cause())); + EXPECT_EQ(kAsyncSocketException, e.cause().class_name()); + VLOG(1) << folly::sformat(FLAGS_result_format, method, e.cause().what()); + e.cause().with_exception([&](const folly::AsyncSocketException& ase) { + EXPECT_EQ(AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN, ase.getType()); + EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno()); + }); + })); + }); +} diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index bdedff4068..bc3b2913d9 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -59,7 +59,7 @@ class IOException : public std::logic_error { IOException(const std::string& what, bool do_not_retry) : logic_error(what), do_not_retry_(do_not_retry) {} - IOException(const std::string& what, folly::exception_wrapper cause) + IOException(const std::string& what, const folly::exception_wrapper& cause) : logic_error(what), cause_(cause), do_not_retry_(false) {} IOException(const std::string& what, folly::exception_wrapper cause, bool do_not_retry) @@ -67,7 +67,7 @@ class IOException : public std::logic_error { virtual ~IOException() = default; - virtual folly::exception_wrapper cause() { return cause_; } + virtual folly::exception_wrapper cause() const { return cause_; } bool do_not_retry() const { return do_not_retry_; } @@ -115,6 +115,18 @@ class RetriesExhaustedException : public IOException { int32_t num_retries_; }; +class ConnectionException : public IOException { + public: + ConnectionException() {} + + ConnectionException(const std::string& what) : IOException(what) {} + + ConnectionException(const folly::exception_wrapper& cause) : IOException("", cause) {} + + ConnectionException(const std::string& what, const folly::exception_wrapper& cause) + : IOException(what, cause) {} +}; + class RemoteException : public IOException { public: RemoteException() : IOException(), port_(0) {} diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto index 5f91dc4df4..2730403b8e 100644 --- a/hbase-native-client/if/test_rpc_service.proto +++ b/hbase-native-client/if/test_rpc_service.proto @@ -32,4 +32,5 @@ service TestProtobufRpcProto { rpc error(EmptyRequestProto) returns (EmptyResponseProto); rpc pause(PauseRequestProto) returns (EmptyResponseProto); rpc addr(EmptyRequestProto) returns (AddrResponseProto); + rpc socketNotOpen(EmptyRequestProto) returns (EmptyResponseProto); } -- 2.11.0 (Apple Git-81)