From 0b649293b636e7c36e3932a2ff9532d64b6644b0 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Thu, 27 Jul 2017 17:33:08 -0700 Subject: [PATCH] HBASE-18466. [C++] Support handling exception in RpcTestServer --- .../connection/rpc-test-server-handler.cc | 7 +- .../connection/rpc-test-server-handler.h | 2 +- hbase-native-client/connection/rpc-test-server.cc | 31 ++++- hbase-native-client/connection/rpc-test-server.h | 24 +++- hbase-native-client/connection/rpc-test.cc | 129 ++++++++++++++++----- hbase-native-client/serde/rpc-serde.cc | 27 +++++ hbase-native-client/serde/rpc-serde.h | 14 +++ 7 files changed, 194 insertions(+), 40 deletions(-) diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc index 7d2f407d55..7f41b7ef91 100644 --- a/hbase-native-client/connection/rpc-test-server-handler.cc +++ b/hbase-native-client/connection/rpc-test-server-handler.cc @@ -46,16 +46,17 @@ void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr RpcTestServerSerializeHandler::write(Context* ctx, - std::unique_ptr r) { + std::unique_ptr resp) { VLOG(3) << "Writing RPC Request"; // Send the data down the pipeline. - return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get())); + return ctx->fireWrite( + serde_.Response(resp->call_id(), resp->resp_msg().get(), resp->exception())); } std::unique_ptr RpcTestServerSerializeHandler::CreateReceivedRequest( const std::string& method_name) { std::unique_ptr result = nullptr; - ; + if (method_name == "ping") { result = std::make_unique(std::make_shared(), std::make_shared(), method_name); diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h index 4c84615baa..ab0264fc4c 100644 --- a/hbase-native-client/connection/rpc-test-server-handler.h +++ b/hbase-native-client/connection/rpc-test-server-handler.h @@ -36,7 +36,7 @@ class RpcTestServerSerializeHandler void read(Context* ctx, std::unique_ptr buf) override; - folly::Future write(Context* ctx, std::unique_ptr r) override; + folly::Future write(Context* ctx, std::unique_ptr resp) override; private: std::unique_ptr CreateReceivedRequest(const std::string& method_name); diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc index d3a30b104c..b9e1f1375c 100644 --- a/hbase-native-client/connection/rpc-test-server.cc +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -30,19 +30,35 @@ namespace hbase { RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline( std::shared_ptr sock) { + if (service_ == nullptr) { + initService(sock); + } + CHECK(service_ != nullptr); + auto pipeline = RpcTestServerSerializePipeline::create(); pipeline->addBack(AsyncSocketHandler(sock)); // ensure we can write from any thread pipeline->addBack(EventBaseHandler()); pipeline->addBack(LengthFieldBasedFrameDecoder()); pipeline->addBack(RpcTestServerSerializeHandler()); - pipeline->addBack( - MultiplexServerDispatcher, std::unique_ptr>(&service_)); + pipeline->addBack(MultiplexServerDispatcher, std::unique_ptr>( + service_.get())); pipeline->finalize(); return pipeline; } +void RpcTestServerPipelineFactory::initService(std::shared_ptr sock) { + /* get server address */ + SocketAddress localAddress; + sock->getLocalAddress(&localAddress); + + /* init service with server address */ + service_ = std::make_shared, std::unique_ptr>>( + std::make_shared(1), + std::make_shared(std::make_shared(localAddress))); +} + Future> RpcTestService::operator()(std::unique_ptr request) { /* build Response */ auto response = std::make_unique(); @@ -54,11 +70,20 @@ Future> RpcTestService::operator()(std::unique_ptrset_resp_msg(pb_resp_msg); } else if (method_name == "echo") { auto pb_resp_msg = std::make_shared(); + /* get msg from client */ auto pb_req_msg = std::static_pointer_cast(request->req_msg()); pb_resp_msg->set_message(pb_req_msg->message()); response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " echo called, " << pb_req_msg->message(); + } else if (method_name == "error") { - // TODO: + auto pb_resp_msg = std::make_shared(); + response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " error called."; + response->set_exception(folly::make_exception_wrapper("server error!")); + } else if (method_name == "pause") { // TODO: } else if (method_name == "addr") { diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h index c3225ff573..955560ef5b 100644 --- a/hbase-native-client/connection/rpc-test-server.h +++ b/hbase-native-client/connection/rpc-test-server.h @@ -17,12 +17,14 @@ * */ #pragma once +#include #include #include #include #include "connection/request.h" #include "connection/response.h" +#include "exceptions/exception.h" using namespace hbase; using namespace folly; @@ -31,11 +33,24 @@ using namespace wangle; namespace hbase { using RpcTestServerSerializePipeline = wangle::Pipeline>; +class RpcTestException : public IOException { + public: + RpcTestException() {} + RpcTestException(const std::string& what) : IOException(what) {} + RpcTestException(const std::string& what, const folly::exception_wrapper& cause) + : IOException(what, cause) {} + RpcTestException(const folly::exception_wrapper& cause) : IOException("", cause) {} +}; + class RpcTestService : public Service, std::unique_ptr> { public: - RpcTestService() {} + RpcTestService(std::shared_ptr socket_address) + : socket_address_(socket_address) {} virtual ~RpcTestService() = default; Future> operator()(std::unique_ptr request) override; + + private: + std::shared_ptr socket_address_; }; class RpcTestServerPipelineFactory : public PipelineFactory { @@ -44,7 +59,10 @@ class RpcTestServerPipelineFactory : public PipelineFactory sock) override; private: - ExecutorFilter, std::unique_ptr> service_{ - std::make_shared(1), std::make_shared()}; + void initService(std::shared_ptr sock); + + private: + std::shared_ptr, std::unique_ptr>> service_{ + nullptr}; }; } // end of namespace hbase diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc index d4cd89f0a4..2949fe9167 100644 --- a/hbase-native-client/connection/rpc-test.cc +++ b/hbase-native-client/connection/rpc-test.cc @@ -30,6 +30,7 @@ #include #include "connection/rpc-client.h" +#include "exceptions/exception.h" #include "if/test.pb.h" #include "rpc-test-server.h" #include "security/user.h" @@ -40,46 +41,114 @@ using namespace folly; using namespace hbase; DEFINE_int32(port, 0, "test server port"); +typedef ServerBootstrap ServerTestBootstrap; +typedef std::shared_ptr ServerPtr; -TEST(RpcTestServer, echo) { - /* create conf */ +class RpcTest : public ::testing::Test { + public: + static void SetUpTestCase() { google::InstallFailureSignalHandler(); } +}; + +std::shared_ptr CreateConf() { auto conf = std::make_shared(); conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true"); + return conf; +} +ServerPtr CreateRpcServer() { /* create rpc test server */ - auto server = std::make_shared>(); + auto server = std::make_shared(); server->childPipeline(std::make_shared()); server->bind(FLAGS_port); - folly::SocketAddress server_addr; - server->getSockets()[0]->getAddress(&server_addr); + return server; +} - /* create RpcClient */ +std::shared_ptr GetRpcServerAddress(ServerPtr server) { + auto addr = std::make_shared(); + server->getSockets()[0]->getAddress(addr.get()); + return addr; +} + +std::shared_ptr CreateRpcClient(std::shared_ptr conf) { auto io_executor = std::make_shared(1); + auto client = std::make_shared(io_executor, nullptr, conf); + return client; +} + +std::shared_ptr CreateRpcClient(std::shared_ptr conf, + std::chrono::nanoseconds connect_timeout) { + auto io_executor = std::make_shared(1); + auto client = std::make_shared(io_executor, nullptr, conf, connect_timeout); + return client; +} + +/** + * test echo + */ +TEST_F(RpcTest, Echo) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + std::string greetings = "hello, hbase server!"; + auto request = std::make_unique(std::make_shared(), + std::make_shared(), "echo"); + auto pb_msg = std::static_pointer_cast(request->req_msg()); + pb_msg->set_message(greetings); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([=](std::unique_ptr response) { + auto pb_resp = std::static_pointer_cast(response->resp_msg()); + EXPECT_TRUE(pb_resp != nullptr); + VLOG(1) << "RPC echo returned: " + pb_resp->message(); + EXPECT_EQ(greetings, pb_resp->message()); + }) + .onError([](const folly::exception_wrapper& ew) { + FAIL() << "Shouldn't get here, no exception is expected for RPC echo."; + }); + + server->stop(); + server->join(); +} + +/** + * test error + */ +TEST_F(RpcTest, error) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto request = std::make_unique(std::make_shared(), + std::make_shared(), "error"); + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([=](std::unique_ptr response) { + FAIL() << "Shouldn't get here, exception is expected for RPC error."; + }) + .onError([](const folly::exception_wrapper& ew) { + VLOG(1) << "RPC error returned with exception."; + std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString(); + std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString(); + + /* verify exception_wrapper */ + EXPECT_TRUE(bool(ew)); + EXPECT_EQ(kRemoteException, ew.class_name()); - auto rpc_client = std::make_shared(io_executor, nullptr, conf); - - /** - * test echo - */ - try { - std::string greetings = "hello, hbase server!"; - auto request = std::make_unique(std::make_shared(), - std::make_shared(), "echo"); - auto pb_msg = std::static_pointer_cast(request->req_msg()); - pb_msg->set_message(greetings); - - /* sending out request */ - rpc_client - ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request), - hbase::security::User::defaultUser()) - .then([=](std::unique_ptr response) { - auto pb_resp = std::static_pointer_cast(response->resp_msg()); - VLOG(1) << "message returned: " + pb_resp->message(); - EXPECT_EQ(greetings, pb_resp->message()); - }); - } catch (const std::exception& e) { - throw e; - } + /* verify RemoteException */ + EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& re) { + /* verify DoNotRetryIOException*/ + EXPECT_EQ(kRpcTestException, re.exception_class_name()); + EXPECT_EQ(kRpcTestException + ": server error!", re.stack_trace()); + })); + }); server->stop(); server->join(); diff --git a/hbase-native-client/serde/rpc-serde.cc b/hbase-native-client/serde/rpc-serde.cc index 9e1f79a34c..70a57e8973 100644 --- a/hbase-native-client/serde/rpc-serde.cc +++ b/hbase-native-client/serde/rpc-serde.cc @@ -40,6 +40,8 @@ using google::protobuf::io::CodedInputStream; using google::protobuf::io::CodedOutputStream; using google::protobuf::io::ZeroCopyOutputStream; +using namespace hbase::pb; + namespace hbase { static const std::string PREAMBLE = "HBas"; @@ -174,6 +176,31 @@ std::unique_ptr RpcSerde::Response(const uint32_t call_id, return PrependLength(std::move(ser_header)); } +std::unique_ptr RpcSerde::Response(const uint32_t call_id, + const google::protobuf::Message *msg, + const folly::exception_wrapper &exception) { + /* create ResponseHeader */ + pb::ResponseHeader rh; + rh.set_call_id(call_id); + + /* create ExceptionResponse */ + if (bool(exception)) { + VLOG(1) << "packing ExceptionResponse"; + auto exception_response = new pb::ExceptionResponse(); + exception_response->set_exception_class_name(exception.class_name().c_str()); + exception_response->set_stack_trace(exception.what().c_str()); + rh.set_allocated_exception(exception_response); + } + + /* serialize Response header and body */ + auto ser_header = SerializeDelimited(rh); + auto ser_resp = SerializeDelimited(*msg); + ser_header->appendChain(std::move(ser_resp)); + + VLOG(3) << "Converted hbase::Response to folly::IOBuf"; + return PrependLength(std::move(ser_header)); +} + std::unique_ptr RpcSerde::CreateCellScanner(std::unique_ptr buf, uint32_t offset, uint32_t length) { if (codec_ == nullptr) { diff --git a/hbase-native-client/serde/rpc-serde.h b/hbase-native-client/serde/rpc-serde.h index 0e1d44e8e3..6941f620d2 100644 --- a/hbase-native-client/serde/rpc-serde.h +++ b/hbase-native-client/serde/rpc-serde.h @@ -21,10 +21,12 @@ #include #include +#include #include "if/HBase.pb.h" #include "serde/cell-scanner.h" #include "serde/codec.h" +using namespace folly; // Forward namespace folly { class IOBuf; @@ -110,6 +112,18 @@ class RpcSerde { const google::protobuf::Message *msg); /** + * Serialize a response message into a protobuf. + * Request consists of: + * + * - Big endian length + * - ResponseHeader object + * - The passed in hbase::Response object + */ + std::unique_ptr Response(const uint32_t call_id, + const google::protobuf::Message *msg, + const folly::exception_wrapper &exception); + + /** * Serialize a message in the delimited format. * Delimited format consists of the following: * -- 2.11.0 (Apple Git-81)