diff --git hbase-native-client/connection/rpc-test-server.cc hbase-native-client/connection/rpc-test-server.cc index 6132fbb..707bca7 100644 --- hbase-native-client/connection/rpc-test-server.cc +++ hbase-native-client/connection/rpc-test-server.cc @@ -88,7 +88,14 @@ Future> RpcTestService::operator()(std::unique_ptrset_exception(folly::make_exception_wrapper("server error!")); } else if (method_name == "pause") { - // TODO: + auto pb_resp_msg = std::make_shared(); + /* sleeping */ + auto pb_req_msg = std::static_pointer_cast(request->req_msg()); + std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms())); + response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " pause called, " << pb_req_msg->ms() << " ms"; + } else if (method_name == "addr") { // TODO: } else if (method_name == "socketNotOpen") { diff --git hbase-native-client/connection/rpc-test.cc hbase-native-client/connection/rpc-test.cc index 4688950..d541397 100644 --- hbase-native-client/connection/rpc-test.cc +++ hbase-native-client/connection/rpc-test.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include "connection/rpc-client.h" #include "exceptions/exception.h" @@ -41,11 +42,14 @@ using namespace wangle; using namespace folly; using namespace hbase; +using namespace std::chrono; DEFINE_int32(port, 0, "test server port"); DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result"); -DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC {}.", - "output format of enforcing fail"); +DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for RPC {}.", + "output format of enforcing fail with exception"); +DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not expected for RPC {}.", + "output format of enforcing fail without exception"); typedef ServerBootstrap ServerTestBootstrap; typedef std::shared_ptr ServerPtr; @@ -110,8 +114,8 @@ TEST_F(RpcTest, Ping) { VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); }) .onError([&](const folly::exception_wrapper& ew) { - FAIL() << folly::sformat(FLAGS_fail_format, method); - }); + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }).get(); server->stop(); server->join(); @@ -144,8 +148,8 @@ TEST_F(RpcTest, Echo) { EXPECT_EQ(greetings, pb_resp->message()); }) .onError([&](const folly::exception_wrapper& ew) { - FAIL() << folly::sformat(FLAGS_fail_format, method); - }); + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }).get(); server->stop(); server->join(); @@ -168,7 +172,7 @@ TEST_F(RpcTest, Error) { ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), hbase::security::User::defaultUser()) .then([&](std::unique_ptr response) { - FAIL() << folly::sformat(FLAGS_fail_format, method); + FAIL() << folly::sformat(FLAGS_fail_ex_format, method); }) .onError([&](const folly::exception_wrapper& ew) { VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); @@ -184,7 +188,7 @@ TEST_F(RpcTest, Error) { EXPECT_EQ(kRpcTestException, e.exception_class_name()); EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace()); })); - }); + }).get(); server->stop(); server->join(); @@ -208,7 +212,7 @@ TEST_F(RpcTest, SocketNotOpen) { ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), hbase::security::User::defaultUser()) .then([&](std::unique_ptr response) { - FAIL() << folly::sformat(FLAGS_fail_format, method); + FAIL() << folly::sformat(FLAGS_fail_ex_format, method); }) .onError([&](const folly::exception_wrapper& ew) { VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); @@ -231,5 +235,42 @@ TEST_F(RpcTest, SocketNotOpen) { EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno()); }); })); - }); + }).get(); +} + +/** + * test pause + */ +TEST_F(RpcTest, Pause) { + int ms = 500; + + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = + CreateRpcClient(conf, std::chrono::duration_cast(milliseconds(2 * ms))); + + auto method = "pause"; + auto request = std::make_unique(std::make_shared(), + std::make_shared(), method); + auto pb_msg = std::static_pointer_cast(request->req_msg()); + + pb_msg->set_ms(ms); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr response) { + auto pb_resp = std::static_pointer_cast(response->resp_msg()); + EXPECT_TRUE(pb_resp != nullptr); + VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }).get(); + + server->stop(); + server->join(); }