From e835e469068cc565c8bc6922ae595147664f816a Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Wed, 9 Aug 2017 10:16:24 -0700 Subject: [PATCH] HBASE-18534. [C++] Support timeout in Rpc --- hbase-native-client/connection/BUCK | 2 + .../connection/async-request-timeout.cc | 73 ++++++++++++++++++++++ .../connection/async-request-timeout.h | 63 +++++++++++++++++++ .../connection/client-dispatcher.cc | 27 +++++--- hbase-native-client/connection/client-dispatcher.h | 6 +- .../connection/connection-factory.cc | 2 +- .../connection/rpc-test-server-handler.cc | 3 + hbase-native-client/connection/rpc-test-server.cc | 11 ++++ hbase-native-client/connection/rpc-test.cc | 28 +++++++++ hbase-native-client/core/BUCK | 1 + .../core/connection-configuration.h | 11 ++-- hbase-native-client/exceptions/exception.h | 12 ++++ hbase-native-client/if/test_rpc_service.proto | 1 + 13 files changed, 224 insertions(+), 16 deletions(-) create mode 100644 hbase-native-client/connection/async-request-timeout.cc create mode 100644 hbase-native-client/connection/async-request-timeout.h diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index a87d27ae1c..a950a189df 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -37,6 +37,7 @@ cxx_library( "rpc-test-server-handler.h", "rpc-fault-injector.h", "rpc-fault-injector-inl.h", + "async-request-timeout.h", ], srcs=[ "client-dispatcher.cc", @@ -51,6 +52,7 @@ cxx_library( "rpc-test-server.cc", "rpc-test-server-handler.cc", "rpc-fault-injector.cc", + "async-request-timeout.cc", ], deps=[ "//if:if", diff --git a/hbase-native-client/connection/async-request-timeout.cc b/hbase-native-client/connection/async-request-timeout.cc new file mode 100644 index 0000000000..4cde4139ff --- /dev/null +++ b/hbase-native-client/connection/async-request-timeout.cc @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "async-request-timeout.h" +#include +#include "exceptions/exception.h" + +namespace hbase { + +AsyncRequestTimeout::AsyncRequestTimeout( + uint32_t call_id, std::chrono::milliseconds timeout_ms, + std::shared_ptr>> req_promise) + : AsyncTimeout(&event_base_), + call_id_(call_id), + timeout_ms_(timeout_ms), + req_promise_(req_promise) {} + +AsyncRequestTimeout::~AsyncRequestTimeout() {} + +void AsyncRequestTimeout::set_request_sent(bool request_sent) { request_sent_ = request_sent; } + +/** + * TODO: make it thread safe. This function has race condition of write accessing the promise (i.e. + * req_promise_). + */ +void AsyncRequestTimeout::timeoutExpired() noexcept { + /* fulfilled by ClientDispatcher::read */ + if (req_promise_.get() == nullptr) { + return; + } + + if (request_sent_) { + /* request is sent to server, timeout before getting any result. */ + if (req_promise_.get() != nullptr && !req_promise_->isFulfilled()) { + req_promise_->setException(folly::exception_wrapper{ + TimeoutException{folly::sformat("request {} timed out", call_id_)}}); + } + } else { + /** + * request is not sent to server, cancel current timeout and schedule another one to wait for + * result.*/ + this->cancelTimeout(); + if (timeout_ms_.count() > 0) { + this->scheduleTimeout(timeout_ms_); + } + } +} + +TimeoutRequest::TimeoutRequest(uint32_t call_id, std::chrono::milliseconds timeout_ms) { + Init(call_id, timeout_ms); +} + +void TimeoutRequest::Init(uint32_t call_id, std::chrono::milliseconds timeout_ms) { + promise_ = std::make_shared>>(); + timeout_ = std::make_shared(call_id, timeout_ms, promise_); +} + +} /* namespace hbase */ diff --git a/hbase-native-client/connection/async-request-timeout.h b/hbase-native-client/connection/async-request-timeout.h new file mode 100644 index 0000000000..8bd621ddd0 --- /dev/null +++ b/hbase-native-client/connection/async-request-timeout.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include + +#include +#include +#include +#include "connection/request.h" +#include "response.h" +#include "utils/concurrent-map.h" + +using namespace folly; + +namespace hbase { +class AsyncRequestTimeout : public folly::AsyncTimeout { + public: + AsyncRequestTimeout(uint32_t call_id, std::chrono::milliseconds timeout_ms, + std::shared_ptr>> req_promise); + virtual ~AsyncRequestTimeout(); + void timeoutExpired() noexcept override; + void set_request_sent(bool request_sent); + + private: + EventBase event_base_{}; + uint32_t call_id_; + std::chrono::milliseconds timeout_ms_; + std::atomic_bool request_sent_{false}; + std::shared_ptr>> req_promise_; +}; + +class TimeoutRequest { + public: + TimeoutRequest() {} + TimeoutRequest(uint32_t call_id, std::chrono::milliseconds timeout_ms); + virtual ~TimeoutRequest() = default; + void Init(uint32_t call_id, std::chrono::milliseconds timeout_ms); + std::shared_ptr>> promise() { return promise_; } + std::shared_ptr timeout() { return timeout_; } + + private: + std::shared_ptr timeout_; + std::shared_ptr>> promise_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index d5d7f5fe54..1a0368e75a 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -21,46 +21,57 @@ #include #include #include +#include "core/connection-configuration.h" #include "exceptions/exception.h" using std::unique_ptr; namespace hbase { -ClientDispatcher::ClientDispatcher() : current_call_id_(9), requests_(5000) {} +ClientDispatcher::ClientDispatcher(std::shared_ptr conf) + : conf_(conf), current_call_id_(9), requests_(5000) {} void ClientDispatcher::read(Context *ctx, unique_ptr in) { auto call_id = in->call_id(); - auto p = requests_.find_and_erase(call_id); + auto req = requests_.find_and_erase(call_id); VLOG(3) << folly::sformat("Read hbase::Response, call_id: {}, hasException: {}, what: {}", in->call_id(), bool(in->exception()), in->exception().what()); if (in->exception()) { - p.setException(in->exception()); + req.promise()->setException(in->exception()); } else { - p.setValue(std::move(in)); + req.promise()->setValue(std::move(in)); } } folly::Future> ClientDispatcher::operator()(unique_ptr arg) { auto call_id = current_call_id_++; arg->set_call_id(call_id); + int read_rpc_timeout_ms = conf_->GetInt(ConnectionConfiguration::kRpcReadTimeout, + ConnectionConfiguration::kDefaultRpcTimeout); // TODO: if the map is full (or we have more than hbase.client.perserver.requests.threshold) // then throw ServerTooBusyException so that upper layers will retry. - auto &p = requests_[call_id]; + auto &req = requests_[call_id]; + req.Init(call_id, std::chrono::milliseconds(read_rpc_timeout_ms)); - auto f = p.getFuture(); - p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { + auto f = req.promise()->getFuture(); + req.promise()->setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { LOG(ERROR) << "e = " << call_id; this->requests_.erase(call_id); }); try { this->pipeline_->write(std::move(arg)); + /* set timeout status and schedule timeout of read rpc. */ + req.timeout()->set_request_sent(true); + if (read_rpc_timeout_ms > 0) { + req.timeout()->scheduleTimeout(std::chrono::milliseconds(read_rpc_timeout_ms)); + } } catch (const folly::AsyncSocketException &e) { - p.setException(folly::exception_wrapper{ConnectionException{folly::exception_wrapper{e}}}); + req.promise()->setException( + folly::exception_wrapper{ConnectionException{folly::exception_wrapper{e}}}); /* clear folly::Promise to avoid overflow. */ requests_.erase(call_id); } diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 1f8e6b3b7b..eaedca14ca 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -27,6 +27,7 @@ #include #include +#include "async-request-timeout.h" #include "connection/pipeline.h" #include "connection/request.h" #include "connection/response.h" @@ -42,7 +43,7 @@ class ClientDispatcher std::unique_ptr> { public: /** Create a new ClientDispatcher */ - ClientDispatcher(); + ClientDispatcher(std::shared_ptr conf); /** Read a response off the pipeline. */ void read(Context *ctx, std::unique_ptr in) override; /** Take a request as a call and send it down the pipeline. */ @@ -53,7 +54,7 @@ class ClientDispatcher folly::Future close() override; private: - concurrent_map>> requests_; + concurrent_map requests_; // Start at some number way above what could // be there for un-initialized call id counters. // @@ -63,5 +64,6 @@ class ClientDispatcher // uint32_t has a max of 4Billion so 10 more or less is // not a big deal. std::atomic current_call_id_; + std::shared_ptr conf_; }; } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index e763c038bd..7737c14c38 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -69,7 +69,7 @@ std::shared_ptr ConnectionFactory::Connect( ->connect(folly::SocketAddress(hostname, port, true), std::chrono::duration_cast(connect_timeout_)) .get(); - auto dispatcher = std::make_shared(); + auto dispatcher = std::make_shared(conf_); dispatcher->setPipeline(pipeline); return dispatcher; } catch (const folly::AsyncSocketException &e) { diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc index 8e405ef879..a18a292b80 100644 --- a/hbase-native-client/connection/rpc-test-server-handler.cc +++ b/hbase-native-client/connection/rpc-test-server-handler.cc @@ -75,6 +75,9 @@ std::unique_ptr RpcTestServerSerializeHandler::CreateReceivedRequest( } else if (method_name == "socketNotOpen") { result = std::make_unique(std::make_shared(), std::make_shared(), method_name); + } else if (method_name == "timeout") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); } return result; } diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc index f350d6a5b8..bd5bf30cf9 100644 --- a/hbase-native-client/connection/rpc-test-server.cc +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -68,6 +68,7 @@ Future> RpcTestService::operator()(std::unique_ptr(); response->set_resp_msg(pb_resp_msg); + } else if (method_name == "echo") { auto pb_resp_msg = std::make_shared(); /* get msg from client */ @@ -89,8 +90,18 @@ Future> RpcTestService::operator()(std::unique_ptr(); + response->set_resp_msg(pb_resp_msg); + + } else if (method_name == "timeout") { + VLOG(1) << "RPC server:" + << " timeout called."; auto pb_resp_msg = std::make_shared(); response->set_resp_msg(pb_resp_msg); + /* sleeping one hour */ + std::this_thread::sleep_for(std::chrono::seconds(10)); } return folly::makeFuture>(std::move(response)); diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc index e7f678dd14..93fe72ee29 100644 --- a/hbase-native-client/connection/rpc-test.cc +++ b/hbase-native-client/connection/rpc-test.cc @@ -203,3 +203,31 @@ TEST_F(RpcTest, SocketNotOpen) { })); }); } + +/** +* test timeout +*/ +TEST_F(RpcTest, Timeout) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto method = "timeout"; + auto request = std::make_unique(std::make_shared(), + std::make_shared(), method); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr response) { + FAIL() << folly::sformat(FLAGS_fail_format, method); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + }); + + server->stop(); + server->join(); +} diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 76c836bcd9..23d03e67c4 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -116,6 +116,7 @@ cxx_library( exported_headers=[ "configuration.h", "hbase-configuration-loader.h", + "connection-configuration.h", ], srcs=[ "configuration.cc", diff --git a/hbase-native-client/core/connection-configuration.h b/hbase-native-client/core/connection-configuration.h index 995798ece7..9b37ba389f 100644 --- a/hbase-native-client/core/connection-configuration.h +++ b/hbase-native-client/core/connection-configuration.h @@ -101,6 +101,12 @@ class ConnectionConfiguration { uint64_t scanner_max_result_size() const { return scanner_max_result_size_; } + public: + /** timeout for each read RPC */ + static constexpr const char* kRpcReadTimeout = "hbase.rpc.read.timeout"; + + static constexpr const uint32_t kDefaultRpcTimeout = 60000; + private: /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ static constexpr const char* kClientSocketConnectTimeout = @@ -120,14 +126,9 @@ class ConnectionConfiguration { /** timeout for each RPC */ static constexpr const char* kRpcTimeout = "hbase.rpc.timeout"; - /** timeout for each read RPC */ - static constexpr const char* kRpcReadTimeout = "hbase.rpc.read.timeout"; - /** timeout for each write RPC */ static constexpr const char* kRpcWriteTimeout = "hbase.rpc.write.timeout"; - static constexpr const uint32_t kDefaultRpcTimeout = 60000; - /** * Parameter name for client pause value, used mostly as value to wait * before running a retry of a failed get, region lookup, etc. diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index bc3b2913d9..4865a22a24 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -127,6 +127,18 @@ class ConnectionException : public IOException { : IOException(what, cause) {} }; +class TimeoutException : public IOException { + public: + TimeoutException() {} + + TimeoutException(const std::string& what) : IOException(what) {} + + TimeoutException(const folly::exception_wrapper& cause) : IOException("", cause) {} + + TimeoutException(const std::string& what, const folly::exception_wrapper& cause) + : IOException(what, cause) {} +}; + class RemoteException : public IOException { public: RemoteException() : IOException(), port_(0) {} diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto index 2730403b8e..3d9ee81436 100644 --- a/hbase-native-client/if/test_rpc_service.proto +++ b/hbase-native-client/if/test_rpc_service.proto @@ -33,4 +33,5 @@ service TestProtobufRpcProto { rpc pause(PauseRequestProto) returns (EmptyResponseProto); rpc addr(EmptyRequestProto) returns (AddrResponseProto); rpc socketNotOpen(EmptyRequestProto) returns (EmptyResponseProto); + rpc timeout(EmptyRequestProto) returns (EmptyResponseProto); } -- 2.11.0 (Apple Git-81)