From 4204c3295191071d9406fde75797b11cd1c2e4ec Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Wed, 9 Aug 2017 10:16:24 -0700 Subject: [PATCH] HBASE-18534. [C++] Support timeout in Rpc --- .../connection/async-request-timeout.cc | 68 ++++++++++++++++++++++ .../connection/async-request-timeout.h | 60 +++++++++++++++++++ .../connection/client-dispatcher.cc | 27 ++++++--- hbase-native-client/connection/client-dispatcher.h | 6 +- .../connection/connection-factory.cc | 2 +- .../connection/rpc-test-server-handler.cc | 3 + hbase-native-client/connection/rpc-test-server.cc | 11 ++++ hbase-native-client/connection/rpc-test.cc | 28 +++++++++ hbase-native-client/exceptions/exception.h | 12 ++++ hbase-native-client/if/test_rpc_service.proto | 1 + 10 files changed, 207 insertions(+), 11 deletions(-) create mode 100644 hbase-native-client/connection/async-request-timeout.cc create mode 100644 hbase-native-client/connection/async-request-timeout.h diff --git a/hbase-native-client/connection/async-request-timeout.cc b/hbase-native-client/connection/async-request-timeout.cc new file mode 100644 index 0000000000..85eca9cce9 --- /dev/null +++ b/hbase-native-client/connection/async-request-timeout.cc @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "async-request-timeout.h"
+#include <folly/Format.h>
+#include "exceptions/exception.h"
+
+namespace hbase {
+
+/**
+ * NOTE(review): the AsyncTimeout base is initialized with &event_base_ before that member is
+ * constructed, and no code visible here runs event_base_ -- confirm the timer can actually fire.
+ */
+AsyncRequestTimeout::AsyncRequestTimeout(
+    uint32_t call_id, std::chrono::milliseconds timeout_ms,
+    std::shared_ptr<folly::Promise<std::unique_ptr<Response>>> req_promise)
+    : AsyncTimeout(&event_base_), call_id_(call_id), timeout_ms_(timeout_ms), req_promise_(req_promise) {}
+
+AsyncRequestTimeout::~AsyncRequestTimeout() {}
+
+void AsyncRequestTimeout::set_request_sent(bool request_sent) { request_sent_ = request_sent; }
+
+/**
+ * Fails the pending promise with TimeoutException once the request was sent; otherwise re-arms.
+ * TODO: make it thread safe; this races with ClientDispatcher::read on req_promise_.
+ */
+void AsyncRequestTimeout::timeoutExpired() noexcept {
+  /* nothing to fail: no promise, or it was already fulfilled (setException would throw) */
+  if (req_promise_ == nullptr || req_promise_->isFulfilled()) {
+    return;
+  }
+
+  if (request_sent_) {
+    /* request is sent to server, timeout before getting any result. */
+    req_promise_->setException(folly::exception_wrapper{
+        TimeoutException{folly::sformat("request {} timed out", call_id_)}});
+  } else {
+    /* request is not sent to server yet: cancel this timeout and schedule another one. */
+    this->cancelTimeout();
+    this->scheduleTimeout(timeout_ms_);
+  }
+}
+
+TimeoutRequest::TimeoutRequest(uint32_t call_id, std::chrono::milliseconds timeout_ms) {
+  Init(call_id, timeout_ms);
+}
+
+void TimeoutRequest::Init(uint32_t call_id, std::chrono::milliseconds timeout_ms) {
+  promise_ = std::make_shared<folly::Promise<std::unique_ptr<Response>>>();
+  timeout_ = std::make_shared<AsyncRequestTimeout>(call_id, timeout_ms, promise_);
+}
+
+} /* namespace hbase */
diff --git a/hbase-native-client/connection/async-request-timeout.h b/hbase-native-client/connection/async-request-timeout.h
new file mode 100644
index 0000000000..3b9792c603
--- /dev/null
+++ b/hbase-native-client/connection/async-request-timeout.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF)
under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+#include <folly/io/async/AsyncTimeout.h>
+
+#include <chrono>
+#include "connection/request.h"
+#include "utils/concurrent-map.h"
+
+namespace hbase {
+/** Timer for one RPC: expiry after the request is sent fails its promise with TimeoutException. */
+class AsyncRequestTimeout : public folly::AsyncTimeout {
+ public:
+  AsyncRequestTimeout(uint32_t call_id, std::chrono::milliseconds timeout_ms,
+                      std::shared_ptr<folly::Promise<std::unique_ptr<Response>>> req_promise);
+  virtual ~AsyncRequestTimeout();
+  void timeoutExpired() noexcept override;
+  void set_request_sent(bool request_sent);
+
+ private:
+  folly::EventBase event_base_{};  // NOTE(review): constructed after the AsyncTimeout base
+  uint32_t call_id_;
+  std::chrono::milliseconds timeout_ms_;
+  std::atomic_bool request_sent_{false};
+  std::shared_ptr<folly::Promise<std::unique_ptr<Response>>> req_promise_;
+};
+
+/** Owns the response promise of one call together with the AsyncRequestTimeout watching it. */
+class TimeoutRequest {
+ public:
+  TimeoutRequest() {}
+  TimeoutRequest(uint32_t call_id, std::chrono::milliseconds timeout_ms);
+  virtual ~TimeoutRequest() = default;
+  void Init(uint32_t call_id, std::chrono::milliseconds timeout_ms);
+  std::shared_ptr<folly::Promise<std::unique_ptr<Response>>> promise() { return promise_; }
+  std::shared_ptr<AsyncRequestTimeout> timeout() { return timeout_; }
+
+ private:
+  std::shared_ptr<AsyncRequestTimeout> timeout_;
+  std::shared_ptr<folly::Promise<std::unique_ptr<Response>>> promise_;
+};
+
+} /* namespace hbase */
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index d5d7f5fe54..127b91f635 100644
--- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -21,46 +21,57 @@ #include #include #include +#include "core/connection-configuration.h" #include "exceptions/exception.h" using std::unique_ptr; namespace hbase { -ClientDispatcher::ClientDispatcher() : current_call_id_(9), requests_(5000) {} +ClientDispatcher::ClientDispatcher(std::shared_ptr conf) + : conf_(conf), current_call_id_(9), requests_(5000) {} void ClientDispatcher::read(Context *ctx, unique_ptr in) { auto call_id = in->call_id(); - auto p = requests_.find_and_erase(call_id); + auto req = requests_.find_and_erase(call_id); VLOG(3) << folly::sformat("Read hbase::Response, call_id: {}, hasException: {}, what: {}", in->call_id(), bool(in->exception()), in->exception().what()); if (in->exception()) { - p.setException(in->exception()); + req.promise()->setException(in->exception()); } else { - p.setValue(std::move(in)); + req.promise()->setValue(std::move(in)); } } folly::Future> ClientDispatcher::operator()(unique_ptr arg) { auto call_id = current_call_id_++; arg->set_call_id(call_id); + int read_rpc_timeout = conf_->GetLong(ConnectionConfiguration::kRpcReadTimeout, + ConnectionConfiguration::kDefaultRpcTimeout); // TODO: if the map is full (or we have more than hbase.client.perserver.requests.threshold) // then throw ServerTooBusyException so that upper layers will retry. - auto &p = requests_[call_id]; + auto &req = requests_[call_id]; + req.Init(call_id, std::chrono::milliseconds(read_rpc_timeout)); - auto f = p.getFuture(); - p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { + auto f = req.promise()->getFuture(); + req.promise()->setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { LOG(ERROR) << "e = " << call_id; this->requests_.erase(call_id); }); try { this->pipeline_->write(std::move(arg)); + /* set timeout status and schedule timeout of read rpc. 
*/ + req.timeout()->set_request_sent(true); + if (read_rpc_timeout > 0) { + req.timeout()->scheduleTimeout(read_rpc_timeout); + } } catch (const folly::AsyncSocketException &e) { - p.setException(folly::exception_wrapper{ConnectionException{folly::exception_wrapper{e}}}); + req.promise()->setException( + folly::exception_wrapper{ConnectionException{folly::exception_wrapper{e}}}); /* clear folly::Promise to avoid overflow. */ requests_.erase(call_id); } diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 1f8e6b3b7b..eaedca14ca 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -27,6 +27,7 @@ #include #include +#include "async-request-timeout.h" #include "connection/pipeline.h" #include "connection/request.h" #include "connection/response.h" @@ -42,7 +43,7 @@ class ClientDispatcher std::unique_ptr> { public: /** Create a new ClientDispatcher */ - ClientDispatcher(); + ClientDispatcher(std::shared_ptr conf); /** Read a response off the pipeline. */ void read(Context *ctx, std::unique_ptr in) override; /** Take a request as a call and send it down the pipeline. */ @@ -53,7 +54,7 @@ class ClientDispatcher folly::Future close() override; private: - concurrent_map>> requests_; + concurrent_map requests_; // Start at some number way above what could // be there for un-initialized call id counters. // @@ -63,5 +64,6 @@ class ClientDispatcher // uint32_t has a max of 4Billion so 10 more or less is // not a big deal. 
std::atomic current_call_id_; + std::shared_ptr conf_; }; } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index e763c038bd..7737c14c38 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -69,7 +69,7 @@ std::shared_ptr ConnectionFactory::Connect( ->connect(folly::SocketAddress(hostname, port, true), std::chrono::duration_cast(connect_timeout_)) .get(); - auto dispatcher = std::make_shared(); + auto dispatcher = std::make_shared(conf_); dispatcher->setPipeline(pipeline); return dispatcher; } catch (const folly::AsyncSocketException &e) { diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc index 8e405ef879..a18a292b80 100644 --- a/hbase-native-client/connection/rpc-test-server-handler.cc +++ b/hbase-native-client/connection/rpc-test-server-handler.cc @@ -75,6 +75,9 @@ std::unique_ptr RpcTestServerSerializeHandler::CreateReceivedRequest( } else if (method_name == "socketNotOpen") { result = std::make_unique(std::make_shared(), std::make_shared(), method_name); + } else if (method_name == "timeout") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); } return result; } diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc index f350d6a5b8..bd5bf30cf9 100644 --- a/hbase-native-client/connection/rpc-test-server.cc +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -68,6 +68,7 @@ Future> RpcTestService::operator()(std::unique_ptr(); response->set_resp_msg(pb_resp_msg); + } else if (method_name == "echo") { auto pb_resp_msg = std::make_shared(); /* get msg from client */ @@ -89,8 +90,18 @@ Future> RpcTestService::operator()(std::unique_ptr(); + response->set_resp_msg(pb_resp_msg); + + } else if (method_name == "timeout") { + 
VLOG(1) << "RPC server:" + << " timeout called."; auto pb_resp_msg = std::make_shared(); response->set_resp_msg(pb_resp_msg); + /* sleep for 10 seconds before replying */ + std::this_thread::sleep_for(std::chrono::seconds(10)); } return folly::makeFuture>(std::move(response)); diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc index e7f678dd14..93fe72ee29 100644 --- a/hbase-native-client/connection/rpc-test.cc +++ b/hbase-native-client/connection/rpc-test.cc @@ -203,3 +203,31 @@ TEST_F(RpcTest, SocketNotOpen) { })); }); } + +/** +* test timeout: calls the "timeout" method; receiving a response fails the test, an error is logged +*/ +TEST_F(RpcTest, Timeout) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto method = "timeout"; + auto request = std::make_unique(std::make_shared(), + std::make_shared(), method); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr response) { + FAIL() << folly::sformat(FLAGS_fail_format, method); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + }); + + server->stop(); + server->join(); +} diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index bc3b2913d9..4865a22a24 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -127,6 +127,18 @@ class ConnectionException : public IOException { : IOException(what, cause) {} }; +class TimeoutException : public IOException { + public: + TimeoutException() {} + + TimeoutException(const std::string& what) : IOException(what) {} + + TimeoutException(const folly::exception_wrapper& cause) : IOException("", cause) {} + + TimeoutException(const std::string& what, const folly::exception_wrapper& cause) + : IOException(what, 
cause) {} +}; + class RemoteException : public IOException { public: RemoteException() : IOException(), port_(0) {} diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto index 2730403b8e..3d9ee81436 100644 --- a/hbase-native-client/if/test_rpc_service.proto +++ b/hbase-native-client/if/test_rpc_service.proto @@ -33,4 +33,5 @@ service TestProtobufRpcProto { rpc pause(PauseRequestProto) returns (EmptyResponseProto); rpc addr(EmptyRequestProto) returns (AddrResponseProto); rpc socketNotOpen(EmptyRequestProto) returns (EmptyResponseProto); + rpc timeout(EmptyRequestProto) returns (EmptyResponseProto); } -- 2.11.0 (Apple Git-81)