diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index 027e880..3e79821 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -21,6 +21,8 @@ cxx_library( exported_headers=[ "async-connection.h", "async-region-locator.h", + "async-rpc-retrying-caller-factory.h", + "async-rpc-retrying-caller.h", "client.h", "cell.h", "hbase-macros.h", @@ -42,16 +44,17 @@ cxx_library( "response-converter.h", "table.h", "raw-async-table.h", - "async-rpc-retrying-caller-factory.h", - "async-rpc-retrying-caller.h", "hbase-rpc-controller.h", "time-range.h", "zk-util.h", ], srcs=[ "async-connection.cc", + "async-rpc-retrying-caller-factory.cc", + "async-rpc-retrying-caller.cc", "cell.cc", "client.cc", + "hbase-rpc-controller.cc", "keyvalue-codec.cc", "location-cache.cc", "meta-utils.cc", diff --git hbase-native-client/core/async-connection.h hbase-native-client/core/async-connection.h index 6a61124..ff11577 100644 --- hbase-native-client/core/async-connection.h +++ hbase-native-client/core/async-connection.h @@ -26,12 +26,12 @@ #include #include +#include #include "connection/rpc-client.h" #include "core/async-region-locator.h" #include "core/configuration.h" #include "core/connection-configuration.h" -#include "core/connection-configuration.h" #include "core/hbase-configuration-loader.h" #include "core/hbase-rpc-controller.h" #include "core/keyvalue-codec.h" @@ -45,8 +45,8 @@ class AsyncRpcRetryingCallerFactory; class AsyncConnection { public: - AsyncConnection(){}; - virtual ~AsyncConnection(){}; + AsyncConnection() {} + virtual ~AsyncConnection() {} virtual std::shared_ptr conf() = 0; virtual std::shared_ptr connection_conf() = 0; virtual std::shared_ptr caller_factory() = 0; @@ -82,7 +82,7 @@ class AsyncConnectionImpl : public AsyncConnection, return std::make_shared(); } - virtual void Close() override; + void Close() override; protected: AsyncConnectionImpl() {} @@ -105,7 +105,7 @@ class AsyncConnectionImpl : public AsyncConnection, bool is_closed_ = false; private: - AsyncConnectionImpl(std::shared_ptr conf) : conf_(conf) {} + explicit AsyncConnectionImpl(std::shared_ptr conf) : conf_(conf) {} void Init(); }; } // namespace hbase diff --git hbase-native-client/core/async-region-locator.h hbase-native-client/core/async-region-locator.h index b0019e0..c606dcb 100644 --- hbase-native-client/core/async-region-locator.h +++ hbase-native-client/core/async-region-locator.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include "core/region-location.h" diff --git hbase-native-client/core/async-rpc-retrying-caller.cc hbase-native-client/core/async-rpc-retrying-caller.cc index 743b6bb..965a44b 100644 --- hbase-native-client/core/async-rpc-retrying-caller.cc +++ hbase-native-client/core/async-rpc-retrying-caller.cc @@ -19,4 +19,199 @@ #include "core/async-rpc-retrying-caller.h" -namespace hbase {} /* namespace hbase */ +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/async-connection.h" +#include "core/hbase-rpc-controller.h" +#include "core/region-location.h" +#include "core/result.h" +#include "exceptions/exception.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +namespace hbase { + +template +AsyncSingleRequestRpcRetryingCaller::AsyncSingleRequestRpcRetryingCaller( + std::shared_ptr conn, std::shared_ptr table_name, + const std::string& row, RegionLocateType locate_type, Callable callable, + nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) + : conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_(pause), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + controller_ = conn_->CreateRpcController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared>(); + retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); +} + +template +AsyncSingleRequestRpcRetryingCaller::~AsyncSingleRequestRpcRetryingCaller() {} + +template +folly::Future AsyncSingleRequestRpcRetryingCaller::Call() { + auto f = promise_->getFuture(); + LocateThenCall(); + return f; +} + +template +void AsyncSingleRequestRpcRetryingCaller::LocateThenCall() { + int64_t locate_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->region_locator() + ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](std::shared_ptr loc) { Call(*loc); }) + .onError([this](const std::exception& e) { + OnError(e, + [this]() -> std::string { + return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + + table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; + }, + [](const std::exception& error) {}); + }); +} + +template +void AsyncSingleRequestRpcRetryingCaller::OnError( + const std::exception& error, Supplier err_msg, + Consumer update_cached_location) { + ThrowableWithExtraContext twec(std::make_shared(error), TimeUtil::GetNowNanos()); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf(error) || tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + int64_t delay_ns; + if (operation_timeout_nanos_.count() > 0) { + int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); + } + update_cached_location(error); + tries_++; + retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, + milliseconds(TimeUtil::ToMillis(delay_ns))); +} + +template +void AsyncSingleRequestRpcRetryingCaller::Call(const RegionLocation& loc) { + int64_t call_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count()); + } else { + call_timeout_ns = rpc_timeout_nanos_.count(); + } + + std::shared_ptr rpc_client; + try { + // TODO: There is no connection attempt happening here, no need to try-catch. + rpc_client = conn_->rpc_client(); + } catch (const IOException& e) { + OnError(e, + [&, this]() -> std::string { + return "Get async rpc_client to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; + }, + [&, this](const std::exception& error) { + conn_->region_locator()->UpdateCachedLocation(loc, error); + }); + return; + } + + ResetController(controller_, call_timeout_ns); + + callable_(controller_, std::make_shared(loc), rpc_client) + .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) + .onError([&, this](const std::exception& e) { + OnError(e, + [&, this]() -> std::string { + return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [&, this](const std::exception& error) { + conn_->region_locator()->UpdateCachedLocation(loc, error); + }); + return; + }); +} + +template +void AsyncSingleRequestRpcRetryingCaller::CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); +} + +template +int64_t AsyncSingleRequestRpcRetryingCaller::RemainingTimeNs() { + return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); +} + +template +void AsyncSingleRequestRpcRetryingCaller::ResetController( + std::shared_ptr controller, const int64_t& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->set_call_timeout( + milliseconds(std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_ns)))); + } +} + +// explicit instantiations for the linker. Otherwise, you have to #include the .cc file for the +// templetized +// class definitions. +template class AsyncSingleRequestRpcRetryingCaller>; +template class AsyncSingleRequestRpcRetryingCaller; +} /* namespace hbase */ diff --git hbase-native-client/core/async-rpc-retrying-caller.h hbase-native-client/core/async-rpc-retrying-caller.h index 6503301..6006388 100644 --- hbase-native-client/core/async-rpc-retrying-caller.h +++ hbase-native-client/core/async-rpc-retrying-caller.h @@ -18,11 +18,10 @@ */ #pragma once -#include -#include #include #include #include + #include #include #include @@ -31,15 +30,11 @@ #include #include #include -#include "connection/rpc-client.h" #include "core/async-connection.h" #include "core/hbase-rpc-controller.h" #include "core/region-location.h" #include "exceptions/exception.h" #include "if/HBase.pb.h" -#include "utils/connection-util.h" -#include "utils/sys-util.h" -#include "utils/time-util.h" using std::chrono::nanoseconds; using std::chrono::milliseconds; @@ -80,168 +75,26 @@ class AsyncSingleRequestRpcRetryingCaller { Callable callable, nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos, - uint32_t start_log_errors_count) - : conn_(conn), - table_name_(table_name), - row_(row), - locate_type_(locate_type), - callable_(callable), - pause_(pause), - max_retries_(max_retries), - operation_timeout_nanos_(operation_timeout_nanos), - rpc_timeout_nanos_(rpc_timeout_nanos), - start_log_errors_count_(start_log_errors_count), - promise_(std::make_shared>()), - tries_(1) { - controller_ = conn_->CreateRpcController(); - start_ns_ = TimeUtil::GetNowNanos(); - max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); - exceptions_ = std::make_shared>(); - retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); - } + uint32_t start_log_errors_count); - virtual ~AsyncSingleRequestRpcRetryingCaller() {} + virtual ~AsyncSingleRequestRpcRetryingCaller(); - folly::Future Call() { - auto f = promise_->getFuture(); - LocateThenCall(); - return f; - } + folly::Future Call(); private: - void LocateThenCall() { - int64_t locate_timeout_ns; - if (operation_timeout_nanos_.count() > 0) { - locate_timeout_ns = RemainingTimeNs(); - if (locate_timeout_ns <= 0) { - CompleteExceptionally(); - return; - } - } else { - locate_timeout_ns = -1L; - } - - conn_->region_locator() - ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) - .then([this](std::shared_ptr loc) { Call(*loc); }) - .onError([this](const std::exception& e) { - OnError(e, - [this]() -> std::string { - return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + - table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + - ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + - " ms"; - }, - [](const std::exception& error) {}); - }); - } + void LocateThenCall(); void OnError(const std::exception& error, Supplier err_msg, - Consumer update_cached_location) { - ThrowableWithExtraContext twec(std::make_shared(error), - TimeUtil::GetNowNanos()); - exceptions_->push_back(twec); - if (SysUtil::InstanceOf(error) || - tries_ >= max_retries_) { - CompleteExceptionally(); - return; - } - - int64_t delay_ns; - if (operation_timeout_nanos_.count() > 0) { - int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; - if (max_delay_ns <= 0) { - CompleteExceptionally(); - return; - } - delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); - } else { - delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); - } - update_cached_location(error); - tries_++; - retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, - milliseconds(TimeUtil::ToMillis(delay_ns))); - } - - void Call(const RegionLocation& loc) { - int64_t call_timeout_ns; - if (operation_timeout_nanos_.count() > 0) { - call_timeout_ns = this->RemainingTimeNs(); - if (call_timeout_ns <= 0) { - this->CompleteExceptionally(); - return; - } - call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count()); - } else { - call_timeout_ns = rpc_timeout_nanos_.count(); - } + Consumer update_cached_location); - std::shared_ptr rpc_client; - try { - // TODO: There is no connection attempt happening here, no need to try-catch. - rpc_client = conn_->rpc_client(); - } catch (const IOException& e) { - OnError(e, - [&, this]() -> std::string { - return "Get async rpc_client to " + - folly::sformat("{0}:{1}", loc.server_name().host_name(), - loc.server_name().port()) + - " for '" + row_ + "' in " + loc.DebugString() + " of " + - table_name_->namespace_() + "::" + table_name_->qualifier() + - " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + - std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; - }, - [&, this](const std::exception& error) { - conn_->region_locator()->UpdateCachedLocation(loc, error); - }); - return; - } + void Call(const RegionLocation& loc); - ResetController(controller_, call_timeout_ns); + void CompleteExceptionally(); - callable_(controller_, std::make_shared(loc), rpc_client) - .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) - .onError([&, this](const std::exception& e) { - OnError(e, - [&, this]() -> std::string { - return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), - loc.server_name().port()) + - " for '" + row_ + "' in " + loc.DebugString() + " of " + - table_name_->namespace_() + "::" + table_name_->qualifier() + - " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + - std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + - " ms"; - }, - [&, this](const std::exception& error) { - conn_->region_locator()->UpdateCachedLocation(loc, error); - }); - return; - }); - } - - void CompleteExceptionally() { - this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); - } - - int64_t RemainingTimeNs() { - return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); - } + int64_t RemainingTimeNs(); static void ResetController(std::shared_ptr controller, - const int64_t& timeout_ns) { - controller->Reset(); - if (timeout_ns >= 0) { - controller->set_call_timeout( - milliseconds(std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_ns)))); - } - } + const int64_t& timeout_ns); private: folly::HHWheelTimer::UniquePtr retry_timer_; @@ -263,5 +116,4 @@ class AsyncSingleRequestRpcRetryingCaller { uint32_t max_attempts_; folly::EventBase event_base_; }; - } /* namespace hbase */ diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc index 3ed6866..4956972 100644 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -47,11 +47,23 @@ #include "test-util/test-util.h" #include "utils/time-util.h" -using namespace google::protobuf; -using namespace hbase; -using namespace hbase::pb; -using namespace std::placeholders; -using namespace testing; +using hbase::AsyncRpcRetryingCallerFactory; +using hbase::AsyncConnection; +using hbase::AsyncRegionLocator; +using hbase::ConnectionConfiguration; +using hbase::Configuration; +using hbase::HBaseRpcController; +using hbase::RegionLocation; +using hbase::RegionLocateType; +using hbase::RpcClient; +using hbase::RequestConverter; +using hbase::ResponseConverter; +using hbase::ReqConverter; +using hbase::RespConverter; +using hbase::Put; +using hbase::TimeUtil; +using hbase::Client; + using ::testing::Return; using ::testing::_; using std::chrono::nanoseconds; @@ -62,10 +74,10 @@ class MockAsyncRegionLocator : public AsyncRegionLocator { : region_location_(region_location) {} ~MockAsyncRegionLocator() = default; - folly::Future> LocateRegion(const hbase::pb::TableName&, - const std::string&, - const RegionLocateType, - const int64_t) override { + folly::Future> LocateRegion(const hbase::pb::TableName&, + const std::string&, + const RegionLocateType, + const int64_t) override { folly::Promise> promise; promise.setValue(region_location_); return promise.getFuture(); diff --git hbase-native-client/core/get-test.cc hbase-native-client/core/get-test.cc index 5a6db3c..2b51003 100644 --- hbase-native-client/core/get-test.cc +++ hbase-native-client/core/get-test.cc @@ -17,8 +17,8 @@ * */ -#include "core/cell.h" #include "core/get.h" +#include "core/cell.h" #include #include