diff --git hbase-native-client/connection/BUCK hbase-native-client/connection/BUCK index 19536d5..36111f8 100644 --- hbase-native-client/connection/BUCK +++ hbase-native-client/connection/BUCK @@ -48,6 +48,7 @@ cxx_library( "//security:security", "//third-party:folly", "//third-party:wangle", + "//exceptions:exceptions", ], compiler_flags=['-Weffc++'], visibility=['//core/...',],) diff --git hbase-native-client/connection/client-dispatcher.cc hbase-native-client/connection/client-dispatcher.cc index 626fc76..27201d2 100644 --- hbase-native-client/connection/client-dispatcher.cc +++ hbase-native-client/connection/client-dispatcher.cc @@ -17,12 +17,14 @@ * */ #include "connection/client-dispatcher.h" +#include #include using namespace folly; using namespace hbase; using namespace wangle; +using folly::exception_wrapper; ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {} @@ -35,9 +37,11 @@ void ClientDispatcher::read(Context *ctx, std::unique_ptr in) { requests_.erase(call_id); - // TODO(eclark): check if the response - // is an exception. If it is then set that. - p.setValue(std::move(in)); + if (in->exception()) { + p.setException(in->exception()); + } else { + p.setValue(std::move(in)); + } } Future> ClientDispatcher::operator()(std::unique_ptr arg) { diff --git hbase-native-client/connection/client-handler.cc hbase-native-client/connection/client-handler.cc index af84572..113ebd0 100644 --- hbase-native-client/connection/client-handler.cc +++ hbase-native-client/connection/client-handler.cc @@ -19,6 +19,7 @@ #include "connection/client-handler.h" +#include #include #include @@ -36,9 +37,11 @@ using hbase::pb::ResponseHeader; using hbase::pb::GetResponse; using google::protobuf::Message; -ClientHandler::ClientHandler(std::string user_name, std::shared_ptr codec) +ClientHandler::ClientHandler(std::string user_name, std::shared_ptr codec, + const std::string &server) : user_name_(user_name), serde_(codec), + server_(server), once_flag_(std::make_unique()), resp_msgs_( make_unique>>( @@ -51,7 +54,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { ResponseHeader header; int used_bytes = serde_.ParseDelimited(buf.get(), &header); - VLOG(1) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() + VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() << " has_exception=" << header.has_exception(); // Get the response protobuf from the map @@ -92,9 +95,31 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { } received->set_resp_msg(resp_msg); - } - // TODO: set exception in Response here + } else { + hbase::pb::ExceptionResponse exceptionResponse = header.exception(); + + std::string what; + std::string exception_class_name = exceptionResponse.has_exception_class_name() + ? exceptionResponse.exception_class_name() + : ""; + std::string stack_trace = + exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : ""; + what.append(exception_class_name).append(stack_trace); + + auto remote_exception = std::make_unique(what); + remote_exception->set_exception_class_name(exception_class_name) + ->set_stack_trace(stack_trace) + ->set_hostname(exceptionResponse.has_hostname() ? exceptionResponse.hostname() : "") + ->set_port(exceptionResponse.has_port() ? exceptionResponse.port() : 0); + if (exceptionResponse.has_do_not_retry()) { + remote_exception->set_do_not_retry(exceptionResponse.do_not_retry()); + } + VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id() + << " exception.what=" << remote_exception->what() + << ", do_not_retry=" << remote_exception->do_not_retry(); + received->set_exception(::folly::exception_wrapper{*remote_exception}); + } ctx->fireRead(std::move(received)); } } @@ -103,17 +128,19 @@ Future ClientHandler::write(Context *ctx, std::unique_ptr r) { // We need to send the header once. // So use call_once to make sure that only one thread wins this. std::call_once((*once_flag_), [ctx, this]() { + VLOG(3) << "Writing RPC connection Preamble and Header to server: " << server_; auto pre = serde_.Preamble(); auto header = serde_.Header(user_name_); pre->appendChain(std::move(header)); ctx->fireWrite(std::move(pre)); }); + VLOG(3) << "Writing RPC Request with call_id:" + << r->call_id(); // TODO: more logging for RPC Header + // Now store the call id to response. resp_msgs_->insert(r->call_id(), r->resp_msg()); - VLOG(1) << "Writing RPC Request with call_id:" << r->call_id(); - // Send the data down the pipeline. return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } diff --git hbase-native-client/connection/client-handler.h hbase-native-client/connection/client-handler.h index afb8e62..4c106e0 100644 --- hbase-native-client/connection/client-handler.h +++ hbase-native-client/connection/client-handler.h @@ -27,6 +27,7 @@ #include #include +#include "exceptions/exception.h" #include "serde/codec.h" #include "serde/rpc.h" @@ -59,7 +60,7 @@ class ClientHandler * Create the handler * @param user_name the user name of the user running this process. */ - explicit ClientHandler(std::string user_name, std::shared_ptr codec); + ClientHandler(std::string user_name, std::shared_ptr codec, const std::string &server); /** * Get bytes from the wire. @@ -77,6 +78,7 @@ class ClientHandler std::unique_ptr once_flag_; std::string user_name_; RpcSerde serde_; + std::string server_; // for logging // in flight requests std::unique_ptr>> diff --git hbase-native-client/connection/connection-factory.cc hbase-native-client/connection/connection-factory.cc index 832b00f..afa227d 100644 --- hbase-native-client/connection/connection-factory.cc +++ hbase-native-client/connection/connection-factory.cc @@ -46,9 +46,10 @@ std::shared_ptr> ConnectionFactory::M return client; } + std::shared_ptr ConnectionFactory::Connect( std::shared_ptr> client, const std::string &hostname, - int port) { + uint16_t port) { // Yes this will block however it makes dealing with connection pool soooooo // much nicer. // TODO see about using shared promise for this. diff --git hbase-native-client/connection/connection-factory.h hbase-native-client/connection/connection-factory.h index 32d0bf7..1e75571 100644 --- hbase-native-client/connection/connection-factory.h +++ hbase-native-client/connection/connection-factory.h @@ -61,7 +61,7 @@ class ConnectionFactory { */ virtual std::shared_ptr Connect( std::shared_ptr> client, - const std::string &hostname, int port); + const std::string &hostname, uint16_t port); private: nanoseconds connect_timeout_; diff --git hbase-native-client/connection/connection-pool-test.cc hbase-native-client/connection/connection-pool-test.cc index 623ce3c..8ecdf29 100644 --- hbase-native-client/connection/connection-pool-test.cc +++ hbase-native-client/connection/connection-pool-test.cc @@ -40,7 +40,7 @@ class MockConnectionFactory : public ConnectionFactory { MOCK_METHOD0(MakeBootstrap, std::shared_ptr>()); MOCK_METHOD3(Connect, std::shared_ptr( std::shared_ptr>, - const std::string &hostname, int port)); + const std::string &hostname, uint16_t port)); }; class MockBootstrap : public wangle::ClientBootstrap {}; diff --git hbase-native-client/connection/connection-pool.cc hbase-native-client/connection/connection-pool.cc index 4fe4610..3121294 100644 --- hbase-native-client/connection/connection-pool.cc +++ hbase-native-client/connection/connection-pool.cc @@ -19,10 +19,11 @@ #include "connection/connection-pool.h" +#include +#include #include #include -#include #include #include @@ -89,7 +90,6 @@ std::shared_ptr ConnectionPool::GetNewConnection( /* create new connection */ auto clientBootstrap = cf_->MakeBootstrap(); auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); - auto connection = std::make_shared(remote_id, dispatcher); connections_.insert(std::make_pair(remote_id, connection)); @@ -101,6 +101,8 @@ std::shared_ptr ConnectionPool::GetNewConnection( void ConnectionPool::Close(std::shared_ptr remote_id) { SharedMutexWritePriority::WriteHolder holder{map_mutex_}; + DLOG(INFO) << "Closing RPC Connection to host:" << remote_id->host() + << ", port:" << folly::to(remote_id->port()); auto found = connections_.find(remote_id); if (found == connections_.end() || found->second == nullptr) { @@ -108,6 +110,7 @@ void ConnectionPool::Close(std::shared_ptr remote_id) { } found->second->Close(); connections_.erase(found); + // TODO: erase the client as well? } void ConnectionPool::Close() { diff --git hbase-native-client/connection/pipeline.cc hbase-native-client/connection/pipeline.cc index 00dc05c..edada52 100644 --- hbase-native-client/connection/pipeline.cc +++ hbase-native-client/connection/pipeline.cc @@ -35,11 +35,14 @@ RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr codec) SerializePipeline::Ptr RpcPipelineFactory::newPipeline( std::shared_ptr sock) { + SocketAddress addr; // for logging + sock->getPeerAddress(&addr); + auto pipeline = SerializePipeline::create(); pipeline->addBack(AsyncSocketHandler{sock}); pipeline->addBack(EventBaseHandler{}); pipeline->addBack(LengthFieldBasedFrameDecoder{}); - pipeline->addBack(ClientHandler{user_util_.user_name(), codec_}); + pipeline->addBack(ClientHandler{user_util_.user_name(), codec_, addr.describe()}); pipeline->finalize(); return pipeline; } diff --git hbase-native-client/connection/response.h hbase-native-client/connection/response.h index 1d60fed..c5472b0 100644 --- hbase-native-client/connection/response.h +++ hbase-native-client/connection/response.h @@ -22,6 +22,8 @@ #include #include +#include + #include "serde/cell-scanner.h" // Forward @@ -44,7 +46,7 @@ class Response { * Constructor. * Initinalizes the call id to 0. 0 should never be a valid call id. */ - Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {} + Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr), exception_(nullptr) {} /** Get the call_id */ uint32_t call_id() { return call_id_; } @@ -70,9 +72,14 @@ class Response { const std::unique_ptr& cell_scanner() const { return cell_scanner_; } + folly::exception_wrapper exception() { return exception_; } + + void set_exception(folly::exception_wrapper value) { exception_ = value; } + private: uint32_t call_id_; std::shared_ptr resp_msg_; std::unique_ptr cell_scanner_; + folly::exception_wrapper exception_; }; } // namespace hbase diff --git hbase-native-client/core/async-connection.cc hbase-native-client/core/async-connection.cc index b945e38..4642c61 100644 --- hbase-native-client/core/async-connection.cc +++ hbase-native-client/core/async-connection.cc @@ -29,6 +29,13 @@ void AsyncConnectionImpl::Init() { auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN)); cpu_executor_ = std::make_shared(cpu_threads); io_executor_ = std::make_shared(io_threads); + /* + * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly. + * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments + * in async-rpc-retrying-caller.cc. + */ + retry_executor_ = std::make_shared(1); + retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); std::shared_ptr codec = nullptr; if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == @@ -41,22 +48,21 @@ void AsyncConnectionImpl::Init() { std::make_shared(io_executor_, codec, connection_conf_->connect_timeout()); location_cache_ = std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); - caller_factory_ = std::make_shared(shared_from_this()); + caller_factory_ = + std::make_shared(shared_from_this(), retry_timer_); } // We can't have the threads continue running after everything is done // that leads to an error. -AsyncConnectionImpl::~AsyncConnectionImpl() { - cpu_executor_->stop(); - io_executor_->stop(); - if (rpc_client_.get()) rpc_client_->Close(); -} +AsyncConnectionImpl::~AsyncConnectionImpl() { Close(); } void AsyncConnectionImpl::Close() { if (is_closed_) return; cpu_executor_->stop(); io_executor_->stop(); + retry_executor_->stop(); + retry_timer_->destroy(); if (rpc_client_.get()) rpc_client_->Close(); is_closed_ = true; } diff --git hbase-native-client/core/async-connection.h hbase-native-client/core/async-connection.h index ff11577..7b260a5 100644 --- hbase-native-client/core/async-connection.h +++ hbase-native-client/core/async-connection.h @@ -53,6 +53,9 @@ class AsyncConnection { virtual std::shared_ptr rpc_client() = 0; virtual std::shared_ptr region_locator() = 0; virtual std::shared_ptr CreateRpcController() = 0; + virtual std::shared_ptr cpu_executor() = 0; + virtual std::shared_ptr io_executor() = 0; + virtual std::shared_ptr retry_executor() = 0; virtual void Close() = 0; }; @@ -81,6 +84,11 @@ class AsyncConnectionImpl : public AsyncConnection, std::shared_ptr CreateRpcController() override { return std::make_shared(); } + std::shared_ptr cpu_executor() override { return cpu_executor_; } + std::shared_ptr io_executor() override { return io_executor_; } + std::shared_ptr retry_executor() override { + return retry_executor_; + } void Close() override; @@ -98,8 +106,10 @@ class AsyncConnectionImpl : public AsyncConnection, std::shared_ptr conf_; std::shared_ptr connection_conf_; std::shared_ptr caller_factory_; + std::shared_ptr retry_timer_; std::shared_ptr cpu_executor_; std::shared_ptr io_executor_; + std::shared_ptr retry_executor_; std::shared_ptr location_cache_; std::shared_ptr rpc_client_; bool is_closed_ = false; diff --git hbase-native-client/core/async-region-locator.h hbase-native-client/core/async-region-locator.h index c606dcb..f75cb7e 100644 --- hbase-native-client/core/async-region-locator.h +++ hbase-native-client/core/async-region-locator.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -57,7 +58,8 @@ class AsyncRegionLocator { /** * Update cached region location, possibly using the information from exception. */ - virtual void UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) = 0; + virtual void UpdateCachedLocation(const RegionLocation &loc, + const folly::exception_wrapper &error) = 0; }; } // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.h hbase-native-client/core/async-rpc-retrying-caller-factory.h index 5bcad6c..5a80a06 100644 --- hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -41,8 +41,10 @@ template class SingleRequestCallerBuilder : public std::enable_shared_from_this> { public: - explicit SingleRequestCallerBuilder(std::shared_ptr conn) + explicit SingleRequestCallerBuilder(std::shared_ptr conn, + std::shared_ptr retry_timer) : conn_(conn), + retry_timer_(retry_timer), table_name_(nullptr), rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), pause_(conn->connection_conf()->pause()), @@ -105,7 +107,7 @@ class SingleRequestCallerBuilder std::shared_ptr> Build() { return std::make_shared>( - conn_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, + conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); } @@ -116,6 +118,7 @@ class SingleRequestCallerBuilder private: std::shared_ptr conn_; + std::shared_ptr retry_timer_; std::shared_ptr table_name_; nanoseconds rpc_timeout_nanos_; nanoseconds operation_timeout_nanos_; @@ -130,15 +133,18 @@ class SingleRequestCallerBuilder class AsyncRpcRetryingCallerFactory { private: std::shared_ptr conn_; + std::shared_ptr retry_timer_; public: - explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} + explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} virtual ~AsyncRpcRetryingCallerFactory() = default; template std::shared_ptr> Single() { - return std::make_shared>(conn_); + return std::make_shared>(conn_, retry_timer_); } }; diff --git hbase-native-client/core/async-rpc-retrying-caller.cc hbase-native-client/core/async-rpc-retrying-caller.cc index 965a44b..7e211f7 100644 --- hbase-native-client/core/async-rpc-retrying-caller.cc +++ hbase-native-client/core/async-rpc-retrying-caller.cc @@ -19,6 +19,8 @@ #include "core/async-rpc-retrying-caller.h" +#include +#include #include #include #include @@ -34,15 +36,19 @@ #include "utils/sys-util.h" #include "utils/time-util.h" +using folly::exception_wrapper; + namespace hbase { template AsyncSingleRequestRpcRetryingCaller::AsyncSingleRequestRpcRetryingCaller( - std::shared_ptr conn, std::shared_ptr table_name, - const std::string& row, RegionLocateType locate_type, Callable callable, - nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos, - nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) + std::shared_ptr conn, std::shared_ptr retry_timer, + std::shared_ptr table_name, const std::string& row, + RegionLocateType locate_type, Callable callable, nanoseconds pause, uint32_t max_retries, + nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos, + uint32_t start_log_errors_count) : conn_(conn), + retry_timer_(retry_timer), table_name_(table_name), row_(row), locate_type_(locate_type), @@ -58,7 +64,6 @@ AsyncSingleRequestRpcRetryingCaller::AsyncSingleRequestRpcRetryingCaller( start_ns_ = TimeUtil::GetNowNanos(); max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); exceptions_ = std::make_shared>(); - retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); } template @@ -87,7 +92,7 @@ void AsyncSingleRequestRpcRetryingCaller::LocateThenCall() { conn_->region_locator() ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) .then([this](std::shared_ptr loc) { Call(*loc); }) - .onError([this](const std::exception& e) { + .onError([this](const exception_wrapper& e) { OnError(e, [this]() -> std::string { return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + @@ -96,17 +101,17 @@ void AsyncSingleRequestRpcRetryingCaller::LocateThenCall() { TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; }, - [](const std::exception& error) {}); + [](const exception_wrapper& error) {}); }); } template void AsyncSingleRequestRpcRetryingCaller::OnError( - const std::exception& error, Supplier err_msg, - Consumer update_cached_location) { - ThrowableWithExtraContext twec(std::make_shared(error), TimeUtil::GetNowNanos()); + const exception_wrapper& error, Supplier err_msg, + Consumer update_cached_location) { + ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos()); exceptions_->push_back(twec); - if (SysUtil::InstanceOf(error) || tries_ >= max_retries_) { + if (!ShouldRetry(error) || tries_ >= max_retries_) { CompleteExceptionally(); return; } @@ -124,8 +129,33 @@ void AsyncSingleRequestRpcRetryingCaller::OnError( } update_cached_location(error); tries_++; - retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, - milliseconds(TimeUtil::ToMillis(delay_ns))); + + /* + * The HHWheelTimer::scheduleTimeout() fails with an assertion from + * EventBase::isInEventBaseThread() if we execute the schedule in a random thread, or one of + * the IOThreadPool threads (with num threads > 1). I think there is a bug there in using retry + * timer from IOThreadPool threads. It only works when executed from a single-thread pool + * (retry_executor() is). However, the scheduled "work" which is the LocateThenCall() should + * still happen in a thread pool, that is why we are submitting the work to the CPUThreadPool. + * IOThreadPool cannot be used without fixing the blocking call that we do at TCP connection + * establishment time (see ConnectionFactory::Connect()), otherwise, the IOThreadPool thread + * just hangs because it deadlocks itself. + */ + conn_->retry_executor()->add([&]() { + retry_timer_->scheduleTimeoutFn( + [this]() { + conn_->cpu_executor()->add([&]() { LocateThenCall(); }); + }, + milliseconds(TimeUtil::ToMillis(delay_ns))); + }); +} + +template +bool AsyncSingleRequestRpcRetryingCaller::ShouldRetry(const exception_wrapper& error) { + bool do_not_retry = false; + error.with_exception( + [&](const RemoteException& remote_ex) { do_not_retry &= remote_ex.do_not_retry(); }); + return !do_not_retry; } template @@ -143,33 +173,14 @@ void AsyncSingleRequestRpcRetryingCaller::Call(const RegionLocation& loc) } std::shared_ptr rpc_client; - try { - // TODO: There is no connection attempt happening here, no need to try-catch. - rpc_client = conn_->rpc_client(); - } catch (const IOException& e) { - OnError(e, - [&, this]() -> std::string { - return "Get async rpc_client to " + - folly::sformat("{0}:{1}", loc.server_name().host_name(), - loc.server_name().port()) + - " for '" + row_ + "' in " + loc.DebugString() + " of " + - table_name_->namespace_() + "::" + table_name_->qualifier() + - " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + - std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; - }, - [&, this](const std::exception& error) { - conn_->region_locator()->UpdateCachedLocation(loc, error); - }); - return; - } + + rpc_client = conn_->rpc_client(); ResetController(controller_, call_timeout_ns); callable_(controller_, std::make_shared(loc), rpc_client) .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) - .onError([&, this](const std::exception& e) { + .onError([&, this](const exception_wrapper& e) { OnError(e, [&, this]() -> std::string { return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), @@ -182,10 +193,9 @@ void AsyncSingleRequestRpcRetryingCaller::Call(const RegionLocation& loc) " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; }, - [&, this](const std::exception& error) { + [&, this](const exception_wrapper& error) { conn_->region_locator()->UpdateCachedLocation(loc, error); }); - return; }); } diff --git hbase-native-client/core/async-rpc-retrying-caller.h hbase-native-client/core/async-rpc-retrying-caller.h index 6006388..c86ad0b5 100644 --- hbase-native-client/core/async-rpc-retrying-caller.h +++ hbase-native-client/core/async-rpc-retrying-caller.h @@ -18,6 +18,7 @@ */ #pragma once +#include #include #include #include @@ -70,6 +71,7 @@ template class AsyncSingleRequestRpcRetryingCaller { public: AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, + std::shared_ptr retry_timer, std::shared_ptr table_name, const std::string& row, RegionLocateType locate_type, Callable callable, nanoseconds pause, @@ -84,8 +86,10 @@ class AsyncSingleRequestRpcRetryingCaller { private: void LocateThenCall(); - void OnError(const std::exception& error, Supplier err_msg, - Consumer update_cached_location); + void OnError(const folly::exception_wrapper& error, Supplier err_msg, + Consumer update_cached_location); + + bool ShouldRetry(const folly::exception_wrapper& error); void Call(const RegionLocation& loc); @@ -97,8 +101,8 @@ class AsyncSingleRequestRpcRetryingCaller { const int64_t& timeout_ns); private: - folly::HHWheelTimer::UniquePtr retry_timer_; std::shared_ptr conn_; + std::shared_ptr retry_timer_; std::shared_ptr table_name_; std::string row_; RegionLocateType locate_type_; @@ -114,6 +118,5 @@ class AsyncSingleRequestRpcRetryingCaller { uint32_t tries_; std::shared_ptr> exceptions_; uint32_t max_attempts_; - folly::EventBase event_base_; }; } /* namespace hbase */ diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc index 4956972..ff28e79 100644 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -20,10 +20,13 @@ #include #include #include +#include +#include #include #include #include +#include #include #include @@ -67,15 +70,33 @@ using hbase::Client; using ::testing::Return; using ::testing::_; using std::chrono::nanoseconds; +using std::chrono::milliseconds; -class MockAsyncRegionLocator : public AsyncRegionLocator { +using namespace hbase; + +using folly::exception_wrapper; + +class AsyncRpcRetryTest : public ::testing::Test { public: - explicit MockAsyncRegionLocator(std::shared_ptr region_location) + static std::unique_ptr test_util; + + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util = std::make_unique(); + test_util->StartMiniCluster(2); + } +}; +std::unique_ptr AsyncRpcRetryTest::test_util = nullptr; + +class AsyncRegionLocatorBase : public AsyncRegionLocator { + public: + AsyncRegionLocatorBase() {} + explicit AsyncRegionLocatorBase(std::shared_ptr region_location) : region_location_(region_location) {} - ~MockAsyncRegionLocator() = default; + virtual ~AsyncRegionLocatorBase() = default; - folly::Future> LocateRegion(const hbase::pb::TableName&, - const std::string&, + folly::Future> LocateRegion(const hbase::pb::TableName &, + const std::string &, const RegionLocateType, const int64_t) override { folly::Promise> promise; @@ -83,22 +104,102 @@ class MockAsyncRegionLocator : public AsyncRegionLocator { return promise.getFuture(); } - void UpdateCachedLocation(const RegionLocation&, const std::exception&) override {} + virtual void set_region_location(std::shared_ptr region_location) { + region_location_ = region_location; + } + + void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {} - private: + protected: std::shared_ptr region_location_; }; +class MockAsyncRegionLocator : public AsyncRegionLocatorBase { + public: + MockAsyncRegionLocator() : AsyncRegionLocatorBase() {} + explicit MockAsyncRegionLocator(std::shared_ptr region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockAsyncRegionLocator() {} +}; + +class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase { + private: + uint32_t tries_ = 0; + uint32_t num_fails_ = 0; + + public: + explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockWrongRegionAsyncRegionLocator() {} + + folly::Future> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + // Fail for num_fails_ times, then delegate to the super class which will give the correct + // region location. + if (tries_++ > num_fails_) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + folly::Promise> promise; + /* set random region name, simulating invalid region */ + auto result = std::make_shared( + "whatever-region-name", region_location_->region_info(), region_location_->server_name(), + region_location_->service()); + promise.setValue(result); + return promise.getFuture(); + } +}; + +class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase { + private: + uint32_t tries_ = 0; + uint32_t num_fails_ = 0; + + public: + explicit MockFailingAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockFailingAsyncRegionLocator(std::shared_ptr region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockFailingAsyncRegionLocator() {} + folly::Future> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + // Fail for num_fails_ times, then delegate to the super class which will give the correct + // region location. + if (tries_++ > num_fails_) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + folly::Promise> promise; + promise.setException(std::runtime_error{"Failed to look up region location"}); + return promise.getFuture(); + } +}; + class MockAsyncConnection : public AsyncConnection, public std::enable_shared_from_this { public: MockAsyncConnection(std::shared_ptr conn_conf, + std::shared_ptr retry_timer, + std::shared_ptr cpu_executor, + std::shared_ptr io_executor, + std::shared_ptr retry_executor, std::shared_ptr rpc_client, std::shared_ptr region_locator) - : conn_conf_(conn_conf), rpc_client_(rpc_client), region_locator_(region_locator) {} + : conn_conf_(conn_conf), + retry_timer_(retry_timer), + cpu_executor_(cpu_executor), + io_executor_(io_executor), + retry_executor_(retry_executor), + rpc_client_(rpc_client), + region_locator_(region_locator) {} ~MockAsyncConnection() {} void Init() { - caller_factory_ = std::make_shared(shared_from_this()); + caller_factory_ = + std::make_shared(shared_from_this(), retry_timer_); } std::shared_ptr conf() override { return nullptr; } @@ -108,6 +209,11 @@ class MockAsyncConnection : public AsyncConnection, } std::shared_ptr rpc_client() override { return rpc_client_; } std::shared_ptr region_locator() override { return region_locator_; } + std::shared_ptr cpu_executor() override { return cpu_executor_; } + std::shared_ptr io_executor() override { return io_executor_; } + std::shared_ptr retry_executor() override { + return retry_executor_; + } void Close() override {} std::shared_ptr CreateRpcController() override { @@ -115,17 +221,20 @@ class MockAsyncConnection : public AsyncConnection, } private: + std::shared_ptr retry_timer_; std::shared_ptr conn_conf_; std::shared_ptr caller_factory_; std::shared_ptr rpc_client_; std::shared_ptr region_locator_; + std::shared_ptr cpu_executor_; + std::shared_ptr io_executor_; + std::shared_ptr retry_executor_; }; template class MockRawAsyncTableImpl { public: - explicit MockRawAsyncTableImpl(std::shared_ptr conn) - : conn_(conn), promise_(std::make_shared>>()) {} + explicit MockRawAsyncTableImpl(std::shared_ptr conn) : conn_(conn) {} virtual ~MockRawAsyncTableImpl() = default; /* implement this in real RawAsyncTableImpl. */ @@ -133,11 +242,13 @@ class MockRawAsyncTableImpl { /* in real RawAsyncTableImpl, this should be private. */ folly::Future> GetCall( std::shared_ptr rpc_client, std::shared_ptr controller, - std::shared_ptr loc, const hbase::Get& get) { + std::shared_ptr loc, const hbase::Get &get) { hbase::RpcCall rpc_call = []( std::shared_ptr rpc_client, std::shared_ptr loc, std::shared_ptr controller, std::unique_ptr preq) -> folly::Future> { + VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:" + << loc->DebugString(); return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), User::defaultUser(), "ClientService"); }; @@ -151,17 +262,25 @@ class MockRawAsyncTableImpl { template folly::Future Call(std::shared_ptr rpc_client, std::shared_ptr controller, - std::shared_ptr loc, const REQ& req, + std::shared_ptr loc, const REQ &req, ReqConverter, REQ, std::string> req_converter, - const hbase::RpcCall& rpc_call, + hbase::RpcCall rpc_call, RespConverter resp_converter) { + promise_ = std::make_shared>>(); + auto f = promise_->getFuture(); + VLOG(1) << "calling rpc_call"; rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) .then([&, this, resp_converter](std::unique_ptr presp) { + VLOG(1) << "MockRawAsyncTableImpl#call succeded: "; RESP result = resp_converter(*presp); promise_->setValue(result); }) - .onError([this](const std::exception& e) { promise_->setException(e); }); - return promise_->getFuture(); + .onError([this](const exception_wrapper &e) { + VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what(); + VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name(); + promise_->setException(e); + }); + return f; } private: @@ -169,22 +288,19 @@ class MockRawAsyncTableImpl { std::shared_ptr>> promise_; }; -TEST(AsyncRpcRetryTest, TestGetBasic) { - // Using TestUtil to populate test data - auto test_util = std::make_unique(); - test_util->StartMiniCluster(2); - - test_util->CreateTable("t", "d"); +void runTest(std::shared_ptr region_locator, std::string tableName, + uint32_t operation_timeout_millis = 1200000) { + AsyncRpcRetryTest::test_util->CreateTable(tableName, "d"); // Create TableName and Row to be fetched from HBase - auto tn = folly::to("t"); + auto tn = folly::to(tableName); auto row = "test2"; // Get to be performed on above HBase Table hbase::Get get(row); // Create a client - Client client(*(test_util->conf())); + Client client(*(AsyncRpcRetryTest::test_util->conf())); // Get connection to HBase Table auto table = client.Table(tn); @@ -196,24 +312,32 @@ TEST(AsyncRpcRetryTest, TestGetBasic) { /* init region location and rpc channel */ auto region_location = table->GetRegionLocation(row); - auto io_executor_ = std::make_shared(1); + // auto io_executor_ = std::make_shared(4); + auto cpu_executor_ = std::make_shared(4); + auto io_executor_ = client.async_connection()->io_executor(); + auto retry_executor_ = std::make_shared(1); auto codec = std::make_shared(); auto rpc_client = std::make_shared(io_executor_, codec); + // auto retry_event_base_ = std::make_shared(true); + std::shared_ptr retry_timer = + folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); /* init connection configuration */ auto connection_conf = std::make_shared( - TimeUtil::SecondsToNanos(20), // connect_timeout - TimeUtil::SecondsToNanos(1200), // operation_timeout - TimeUtil::SecondsToNanos(60), // rpc_timeout - TimeUtil::MillisToNanos(100), // pause - 31, // max retries - 9); // start log errors count + TimeUtil::SecondsToNanos(20), // connect_timeout + TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout + TimeUtil::SecondsToNanos(60), // rpc_timeout + TimeUtil::MillisToNanos(100), // pause + 5, // max retries + 9); // start log errors count - /* init region locator */ - auto region_locator = std::make_shared(region_location); + /* set region locator */ + region_locator->set_region_location(region_location); /* init hbase client connection */ - auto conn = std::make_shared(connection_conf, rpc_client, region_locator); + auto conn = std::make_shared(connection_conf, retry_timer, cpu_executor_, + io_executor_, retry_executor_, rpc_client, + region_locator); conn->Init(); /* init retry caller factory */ @@ -237,7 +361,9 @@ TEST(AsyncRpcRetryTest, TestGetBasic) { }) ->Build(); - auto result = async_caller->Call().get(); + auto promise = std::make_shared>>(); + + auto result = async_caller->Call().get(milliseconds(500000)); // Test the values, should be same as in put executed on hbase shell ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; @@ -245,6 +371,50 @@ TEST(AsyncRpcRetryTest, TestGetBasic) { EXPECT_EQ("value2", *(result->Value("d", "2"))); EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + retry_timer->destroy(); table->Close(); client.Close(); + retry_executor_->stop(); +} + +// Test successful case +TEST_F(AsyncRpcRetryTest, TestGetBasic) { + std::shared_ptr region_locator( + std::make_shared()); + runTest(region_locator, "table1"); +} + +// Tests the RPC failing 3 times, then succeeding +TEST_F(AsyncRpcRetryTest, TestHandleException) { + std::shared_ptr region_locator( + std::make_shared(3)); + runTest(region_locator, "table2"); +} + +// Tests the RPC failing 5 times, throwing an exception +TEST_F(AsyncRpcRetryTest, TestFailWithException) { + std::shared_ptr region_locator( + std::make_shared(5)); + EXPECT_ANY_THROW(runTest(region_locator, "table3")); +} + +// Tests the region location lookup failing 3 times, then succeeding +TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) { + std::shared_ptr region_locator( + std::make_shared(3)); + runTest(region_locator, "table4"); +} + +// Tests the region location lookup failing 5 times, throwing an exception +TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) { + std::shared_ptr region_locator( + std::make_shared(5)); + EXPECT_ANY_THROW(runTest(region_locator, "table5")); +} + +// Tests hitting operation timeout, thus not retrying anymore +TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) { + std::shared_ptr region_locator( + std::make_shared(3)); + EXPECT_ANY_THROW(runTest(region_locator, "table6", 200)); } diff --git hbase-native-client/core/client-test.cc hbase-native-client/core/client-test.cc index ff4879a..274168f 100644 --- hbase-native-client/core/client-test.cc +++ hbase-native-client/core/client-test.cc @@ -156,6 +156,7 @@ TEST_F(ClientTest, GetForNonExistentTable) { // Get to be performed on above HBase Table hbase::Get get(row); + ClientTest::test_util->conf()->SetInt("hbase.client.retries.number", 5); // Create a client hbase::Client client(*ClientTest::test_util->conf()); diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h index 0e11278..2719470 100644 --- hbase-native-client/core/client.h +++ hbase-native-client/core/client.h @@ -61,6 +61,11 @@ class Client { */ void Close(); + /** + * @brief Internal. DO NOT USE. + */ + std::shared_ptr async_connection() { return async_connection_; } + private: /** Data */ std::shared_ptr async_connection_; diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc index 07c3d61..e0afcfb 100644 --- hbase-native-client/core/location-cache.cc +++ hbase-native-client/core/location-cache.cc @@ -284,7 +284,8 @@ void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const st table_locs->erase(row); } -void LocationCache::UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) { +void LocationCache::UpdateCachedLocation(const RegionLocation &loc, + const folly::exception_wrapper &error) { // TODO: just clears the location for now. We can inspect RegionMovedExceptions, etc later. ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key()); } diff --git hbase-native-client/core/location-cache.h hbase-native-client/core/location-cache.h index 5e79213..a3c15cb 100644 --- hbase-native-client/core/location-cache.h +++ hbase-native-client/core/location-cache.h @@ -18,6 +18,7 @@ */ #pragma once +#include #include #include #include @@ -180,7 +181,7 @@ class LocationCache : public AsyncRegionLocator { * Update cached region location, possibly using the information from exception. */ virtual void UpdateCachedLocation(const RegionLocation &loc, - const std::exception &error) override; + const folly::exception_wrapper &error) override; const std::string &zk_quorum() { return zk_quorum_; } diff --git hbase-native-client/exceptions/BUCK hbase-native-client/exceptions/BUCK index a23654c..eef4437 100644 --- hbase-native-client/exceptions/BUCK +++ hbase-native-client/exceptions/BUCK @@ -21,4 +21,4 @@ cxx_library( srcs=[], deps=["//third-party:folly",], compiler_flags=['-Weffc++'], - visibility=['//core/...'],) \ No newline at end of file + visibility=['//core/...','//connection//...'],) \ No newline at end of file diff --git hbase-native-client/exceptions/exception.h hbase-native-client/exceptions/exception.h index c0c4142..2943d57 100644 --- hbase-native-client/exceptions/exception.h +++ hbase-native-client/exceptions/exception.h @@ -22,52 +22,55 @@ #include #include #include +#include namespace hbase { class ThrowableWithExtraContext { public: - ThrowableWithExtraContext(std::shared_ptr cause, + ThrowableWithExtraContext(folly::exception_wrapper cause, const long& when) : cause_(cause), when_(when), extras_("") { } - ThrowableWithExtraContext(std::shared_ptr cause, + ThrowableWithExtraContext(folly::exception_wrapper cause, const long& when, const std::string& extras) : cause_(cause), when_(when), extras_(extras) { } - std::string ToString() { + virtual std::string ToString() { // TODO: // return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); - return extras_ + ", " + cause_->what(); + return extras_ + ", " + cause_.what().toStdString(); } - std::shared_ptr cause() { + virtual folly::exception_wrapper cause() { return cause_; } private: - std::shared_ptr cause_; + folly::exception_wrapper cause_; long when_; std::string extras_; }; class IOException: public std::logic_error { public: + IOException() : logic_error("") {} + IOException( const std::string& what) : - logic_error(what), cause_(nullptr) {} + logic_error(what) {} IOException( const std::string& what, - std::shared_ptr cause) : + folly::exception_wrapper cause) : logic_error(what), cause_(cause) {} virtual ~IOException() = default; - std::shared_ptr cause() { + virtual folly::exception_wrapper cause() { return cause_; } private: - const std::shared_ptr cause_; + folly::exception_wrapper cause_; }; class RetriesExhaustedException: public IOException { @@ -77,7 +80,7 @@ public: std::shared_ptr> exceptions) : IOException( GetMessage(num_retries, exceptions), - exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){ + exceptions->empty() ? folly::exception_wrapper{} : (*exceptions)[exceptions->size() - 1].cause()){ } virtual ~RetriesExhaustedException() = default; @@ -99,6 +102,71 @@ private: class HBaseIOException : public IOException { }; -class DoNotRetryIOException : public HBaseIOException { +class RemoteException : public IOException { +public: + + RemoteException() : port_(0), do_not_retry_(false) {} + + RemoteException(const std::string& what) : + IOException(what), port_(0), do_not_retry_(false) {} + + RemoteException( + const std::string& what, + folly::exception_wrapper cause) : + IOException(what, cause), port_(0), do_not_retry_(false) {} + + virtual ~RemoteException() = default; + + std::string exception_class_name() const { + return exception_class_name_; + } + + RemoteException* set_exception_class_name(const std::string& value) { + exception_class_name_ = value; + return this; + } + + std::string stack_trace() const { + return stack_trace_; + } + + RemoteException* set_stack_trace(const std::string& value) { + stack_trace_ = value; + return this; + } + + std::string hostname() const { + return hostname_; + } + + RemoteException* set_hostname(const std::string& value) { + hostname_ = value; + return this; + } + + int port() const { + return port_; + } + + RemoteException* set_port(int value) { + port_ = value; + return this; + } + + bool do_not_retry() const { + return do_not_retry_; + } + + RemoteException* set_do_not_retry(bool value) { + do_not_retry_ = value; + return this; + } + +private: + std::string exception_class_name_; + std::string stack_trace_; + std::string hostname_; + int port_; + bool do_not_retry_; }; } // namespace hbase