From ca9a282d021cf8a8412f0bece0806e2a7960d152 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Fri, 3 Mar 2017 17:18:22 -0800 Subject: [PATCH] HBASE-17465. [C++] implement request retry mechanism over RPC --- hbase-native-client/Makefile | 2 +- hbase-native-client/bin/start-docker.sh | 2 +- hbase-native-client/connection/BUCK | 12 +- .../connection/connection-factory.cc | 7 +- .../connection/connection-factory.h | 3 +- hbase-native-client/connection/connection-pool.cc | 4 +- hbase-native-client/connection/rpc-client.cc | 39 +--- hbase-native-client/connection/rpc-client.h | 42 +--- hbase-native-client/core/BUCK | 96 ++++++-- .../core/async-rpc-retrying-caller-factory.cc | 22 ++ .../core/async-rpc-retrying-caller-factory.h | 121 ++++++++++ .../core/async-rpc-retrying-caller.cc | 22 ++ .../core/async-rpc-retrying-caller.h | 252 ++++++++++++++++++++ .../core/async-rpc-retrying-test.cc | 255 +++++++++++++++++++++ hbase-native-client/core/client.cc | 3 +- hbase-native-client/core/client.h | 2 +- hbase-native-client/core/filter.h | 2 +- hbase-native-client/core/hbase-rpc-controller.cc | 22 ++ hbase-native-client/core/hbase-rpc-controller.h | 54 +++++ hbase-native-client/core/location-cache.cc | 1 + hbase-native-client/core/region-location.h | 2 + hbase-native-client/core/table.cc | 4 + hbase-native-client/core/table.h | 5 + hbase-native-client/exceptions/BUCK | 24 ++ hbase-native-client/exceptions/exception.h | 104 +++++++++ hbase-native-client/security/BUCK | 4 +- hbase-native-client/serde/BUCK | 52 +++-- hbase-native-client/third-party/BUCK | 4 +- hbase-native-client/utils/BUCK | 25 +- hbase-native-client/utils/connection-util.cc | 26 +++ hbase-native-client/utils/connection-util.h | 71 ++++++ hbase-native-client/utils/sys-util.h | 39 ++++ hbase-native-client/utils/time-util.h | 52 +++++ 33 files changed, 1242 insertions(+), 133 deletions(-) create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.h create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.h create mode 100644 hbase-native-client/core/async-rpc-retrying-test.cc create mode 100644 hbase-native-client/core/hbase-rpc-controller.cc create mode 100644 hbase-native-client/core/hbase-rpc-controller.h create mode 100644 hbase-native-client/exceptions/BUCK create mode 100644 hbase-native-client/exceptions/exception.h create mode 100644 hbase-native-client/utils/connection-util.cc create mode 100644 hbase-native-client/utils/connection-util.h create mode 100644 hbase-native-client/utils/sys-util.h create mode 100644 hbase-native-client/utils/time-util.h diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile index 84ae556..b926220 100644 --- a/hbase-native-client/Makefile +++ b/hbase-native-client/Makefile @@ -22,7 +22,7 @@ LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils security +MODULES = connection core serde test-util utils security exceptions SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) diff --git a/hbase-native-client/bin/start-docker.sh b/hbase-native-client/bin/start-docker.sh index 1380cdf..8b017a0 100755 --- a/hbase-native-client/bin/start-docker.sh +++ b/hbase-native-client/bin/start-docker.sh @@ -56,7 +56,7 @@ docker build -t hbase_native . # After the image is built run the thing docker run -p 16050:16050/tcp \ - -v ${BASE_DIR}/..:/usr/src/hbase \ + -v ${BASE_DIR}/..:/usr/src/hbase \ -v ~/.m2:/root/.m2 \ -it hbase_native /bin/bash popd diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index 19536d5..bc05be0 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -50,8 +50,14 @@ cxx_library( "//third-party:wangle", ], compiler_flags=['-Weffc++'], - visibility=['//core/...',],) + visibility=[ + '//core/...', + ],) cxx_test( name="connection-pool-test", - srcs=["connection-pool-test.cc",], - deps=[":connection",],) + srcs=[ + "connection-pool-test.cc", + ], + deps=[ + ":connection", + ],) diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 2f7e75c..832b00f 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -31,11 +31,10 @@ using std::chrono::milliseconds; using std::chrono::nanoseconds; ConnectionFactory::ConnectionFactory(std::shared_ptr io_pool, - std::shared_ptr codec, - nanoseconds connect_timeout) + std::shared_ptr codec, nanoseconds connect_timeout) : connect_timeout_(connect_timeout), - io_pool_(io_pool), - pipeline_factory_(std::make_shared(codec)) {} + io_pool_(io_pool), + pipeline_factory_(std::make_shared(codec)) {} std::shared_ptr> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared>(); diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index fbcb6ef..32d0bf7 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -44,8 +44,7 @@ class ConnectionFactory { * There should only be one ConnectionFactory per client. */ ConnectionFactory(std::shared_ptr io_pool, - std::shared_ptr codec, - nanoseconds connect_timeout = nanoseconds(0)); + std::shared_ptr codec, nanoseconds connect_timeout = nanoseconds(0)); /** Default Destructor */ virtual ~ConnectionFactory() = default; diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index b18ee89..4fe4610 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -34,8 +35,7 @@ using folly::SharedMutexWritePriority; using folly::SocketAddress; ConnectionPool::ConnectionPool(std::shared_ptr io_executor, - std::shared_ptr codec, - nanoseconds connect_timeout) + std::shared_ptr codec, nanoseconds connect_timeout) : cf_(std::make_shared(io_executor, codec, connect_timeout)), clients_(), connections_(), diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index c61a73e..3f86aba 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -18,27 +18,14 @@ */ #include "connection/rpc-client.h" +#include #include #include using hbase::RpcClient; -using hbase::AbstractRpcChannel; namespace hbase { -class RpcChannelImplementation : public AbstractRpcChannel { - public: - RpcChannelImplementation(std::shared_ptr rpc_client, const std::string& host, - uint16_t port, std::shared_ptr ticket, int rpc_timeout) - : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {} - - void CallMethod(const MethodDescriptor* method, RpcController* controller, const Message* request, - Message* response, Closure* done) override { - rpc_client_->CallMethod(method, controller, request, response, done, host_, port_, ticket_); - } -}; -} // namespace hbase - RpcClient::RpcClient(std::shared_ptr io_executor, std::shared_ptr codec, nanoseconds connect_timeout) : io_executor_(io_executor) { @@ -80,26 +67,4 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { return cp_->GetConnection(remote_id); } - -std::shared_ptr RpcClient::CreateRpcChannel(const std::string& host, uint16_t port, - std::shared_ptr ticket, - int rpc_timeout) { - std::shared_ptr channel = std::make_shared( - shared_from_this(), host, port, ticket, rpc_timeout); - - /* static_pointer_cast is safe since RpcChannelImplementation derives - * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */ - return std::static_pointer_cast(channel); -} - -void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* controller, - const Message* req_msg, Message* resp_msg, Closure* done, - const std::string& host, uint16_t port, std::shared_ptr ticket) { - std::shared_ptr shared_req(const_cast(req_msg)); - std::shared_ptr shared_resp(resp_msg); - - std::unique_ptr req = std::make_unique(shared_req, shared_resp, method->name()); - - AsyncCall(host, port, std::move(req), ticket, method->service()->name()) - .then([done, this](std::unique_ptr resp) { done->Run(); }); -} +} // namespace hbase diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index 5c11ab5..d416ceb 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -38,24 +38,15 @@ using hbase::ConnectionPool; using hbase::RpcConnection; using hbase::security::User; -using google::protobuf::MethodDescriptor; -using google::protobuf::RpcChannel; using google::protobuf::Message; -using google::protobuf::RpcController; -using google::protobuf::Closure; - using std::chrono::nanoseconds; -class RpcChannelImplementation; - namespace hbase { -class RpcClient : public std::enable_shared_from_this { - friend class RpcChannelImplementation; - +class RpcClient { public: - RpcClient(std::shared_ptr io_executor, - std::shared_ptr codec, nanoseconds connect_timeout); + RpcClient(std::shared_ptr io_executor, std::shared_ptr codec, + nanoseconds connect_timeout = nanoseconds(0)); virtual ~RpcClient() { Close(); } @@ -79,40 +70,13 @@ class RpcClient : public std::enable_shared_from_this { virtual void Close(); - virtual std::shared_ptr CreateRpcChannel(const std::string &host, uint16_t port, - std::shared_ptr ticket, - int rpc_timeout); - std::shared_ptr connection_pool() const { return cp_; } private: - void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg, - Message *resp_msg, Closure *done, const std::string &host, uint16_t port, - std::shared_ptr ticket); std::shared_ptr GetConnection(std::shared_ptr remote_id); private: std::shared_ptr cp_; std::shared_ptr io_executor_; }; - -class AbstractRpcChannel : public RpcChannel { - public: - AbstractRpcChannel(std::shared_ptr rpc_client, const std::string &host, uint16_t port, - std::shared_ptr ticket, int rpc_timeout) - : rpc_client_(rpc_client), - host_(host), - port_(port), - ticket_(ticket), - rpc_timeout_(rpc_timeout) {} - - virtual ~AbstractRpcChannel() = default; - - protected: - std::shared_ptr rpc_client_; - std::string host_; - uint16_t port_; - std::shared_ptr ticket_; - int rpc_timeout_; -}; } // namespace hbase diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index e541d8f..2d20806 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -40,6 +40,9 @@ cxx_library( "request_converter.h", "response_converter.h", "table.h", + "async-rpc-retrying-caller-factory.h", + "async-rpc-retrying-caller.h", + "hbase-rpc-controller.h", ], srcs=[ "cell.cc", @@ -58,6 +61,8 @@ cxx_library( "table.cc", ], deps=[ + "//exceptions:exceptions", + "//utils:utils", "//connection:connection", "//if:if", "//serde:serde", @@ -66,10 +71,14 @@ cxx_library( "//third-party:zookeeper_mt", ], compiler_flags=['-Weffc++'], - visibility=['PUBLIC',],) + visibility=[ + 'PUBLIC', + ],) cxx_test( name="location-cache-test", - srcs=["location-cache-test.cc",], + srcs=[ + "location-cache-test.cc", + ], deps=[ ":core", "//test-util:test-util", @@ -77,12 +86,18 @@ cxx_test( run_test_separately=True,) cxx_test( name="cell-test", - srcs=["cell-test.cc",], - deps=[":core",], + srcs=[ + "cell-test.cc", + ], + deps=[ + ":core", + ], run_test_separately=True,) cxx_test( name="filter-test", - srcs=["filter-test.cc",], + srcs=[ + "filter-test.cc", + ], deps=[ ":core", "//if:if", @@ -92,37 +107,74 @@ cxx_test( run_test_separately=True,) cxx_test( name="get-test", - srcs=["get-test.cc",], - deps=[":core",], + srcs=[ + "get-test.cc", + ], + deps=[ + ":core", + ], + run_test_separately=True,) +cxx_test( + name="retry-test", + srcs=[ + "async-rpc-retrying-test.cc", + ], + deps=[ + ":core", + "//test-util:test-util", + "//exceptions:exceptions", + ], run_test_separately=True,) cxx_test( name="time_range-test", - srcs=["time_range-test.cc",], - deps=[":core",], + srcs=[ + "time_range-test.cc", + ], + deps=[ + ":core", + ], run_test_separately=True,) cxx_test( name="configuration-test", - srcs=["configuration-test.cc",], - deps=[":core",], + srcs=[ + "configuration-test.cc", + ], + deps=[ + ":core", + ], run_test_separately=True,) cxx_test( name="hbase_configuration-test", - srcs=["hbase_configuration-test.cc",], - deps=[":core",], + srcs=[ + "hbase_configuration-test.cc", + ], + deps=[ + ":core", + ], run_test_separately=True,) cxx_test( name="scan-test", - srcs=["scan-test.cc",], - deps=[":core",], + srcs=[ + "scan-test.cc", + ], + deps=[ + ":core", + ], run_test_separately=True,) cxx_test( name="result-test", - srcs=["result-test.cc",], - deps=[":core",], + srcs=[ + "result-test.cc", + ], + deps=[ + ":core", + ], run_test_separately=True,) cxx_test( name="request_converter-test", - srcs=["request_converter-test.cc",], + srcs=[ + "request_converter-test.cc", + ], deps=[ ":core", "//connection:connection", @@ -131,7 +183,9 @@ cxx_test( run_test_separately=True,) cxx_test( name="client-test", - srcs=["client-test.cc",], + srcs=[ + "client-test.cc", + ], deps=[ ":core", "//if:if", @@ -141,5 +195,7 @@ cxx_test( run_test_separately=True,) cxx_binary( name="simple-client", - srcs=["simple-client.cc",], + srcs=[ + "simple-client.cc", + ], deps=[":core", "//connection:connection"],) diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc new file mode 100644 index 0000000..d9ceb0f --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller-factory.h" + +namespace hbase {} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..14fa52d --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include "async-rpc-retrying-caller.h" +#include "connection/rpc-client.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +using namespace folly; +using hbase::pb::TableName; + +namespace hbase { + +template +class SingleRequestCallerBuilder + : public std::enable_shared_from_this> { + public: + SingleRequestCallerBuilder(std::shared_ptr conn) + : conn_(conn), + table_name_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0), + locate_type_(RegionLocateType::kCurrent) {} + + virtual ~SingleRequestCallerBuilder() = default; + + typedef SingleRequestCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr table(std::shared_ptr table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr rpc_timeout(long rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr operation_timeout(long operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr row(const std::string& row) { + row_ = row; + return shared_this(); + } + + SharedThisPtr locate_type(RegionLocateType locate_type) { + locate_type_ = locate_type; + return shared_this(); + } + + SharedThisPtr action(Callable callable) { + callable_ = callable; + return shared_this(); + } + + folly::Future Call() { return Build()->Call(); } + + std::shared_ptr> Build() { + return std::make_shared>( + conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(), + conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_, + conn_->get_conn_conf()->GetStartLogErrorsCount()); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr table_name_; + long rpc_timeout_nanos_; + long operation_timeout_nanos_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; +}; // end of SingleRequestCallerBuilder + +template +class AsyncRpcRetryingCallerFactory { + private: + std::shared_ptr conn_; + + public: + AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} + + virtual ~AsyncRpcRetryingCallerFactory() = default; + + template + std::shared_ptr> Single() { + return std::make_shared>(conn_); + } +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc new file mode 100644 index 0000000..318122a --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller.h" + +namespace hbase {} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h new file mode 100644 index 0000000..6dd1ecf --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "connection/rpc-client.h" +#include "exceptions/exception.h" +#include "hbase-rpc-controller.h" +#include "if/HBase.pb.h" +#include "region-location.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +template +using Supplier = std::function; + +template +using Consumer = std::function; + +template +using ReqConverter = std::function; + +template +using RespConverter = std::function; + +template +using RpcCallback = std::function; + +template +using RpcCall = std::function>( + std::shared_ptr, std::shared_ptr, + std::shared_ptr, std::unique_ptr)>; + +template +using Callable = std::function(std::shared_ptr, + std::shared_ptr, + std::shared_ptr)>; + +template +class AsyncSingleRequestRpcRetryingCaller { + public: + AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, + std::shared_ptr table_name, + const std::string& row, RegionLocateType locate_type, + Callable callable, long pause_ns, + int max_retries, long operation_timeout_nanos, + long rpc_timeout_nanos, int start_log_errors_count) + : conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_ns_(pause_ns), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + controller_ = conn_->get_rpc_controller_factory()->NewController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared>(); + retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); + } + + virtual ~AsyncSingleRequestRpcRetryingCaller() {} + + folly::Future Call() { + auto f = promise_->getFuture(); + LocateThenCall(); + return f; + } + + private: + void LocateThenCall() { + long locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->get_locator() + ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](RegionLocation& loc) { Call(loc); }) + .onError([this](const std::exception& e) { + OnError(e, + [this]() -> std::string { + return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + + table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [](const std::exception& error) {}); + }); + } + + void OnError(const std::exception& error, Supplier err_msg, + Consumer update_cached_location) { + ThrowableWithExtraContext twec(std::make_shared(error), + TimeUtil::GetNowNanos()); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf(error) || + tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + long delay_ns; + if (operation_timeout_nanos_ > 0) { + long max_delay_ns = RemainingTimeNs() - ConnectionUtils::SLEEP_DELTA_NS; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1); + } + update_cached_location(error); + tries_++; + retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, + milliseconds(TimeUtil::ToMillis(delay_ns))); + } + + void Call(RegionLocation& loc) { + long call_timeout_ns; + if (operation_timeout_nanos_ > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_); + } else { + call_timeout_ns = rpc_timeout_nanos_; + } + + std::shared_ptr rpc_client; + try { + rpc_client = conn_->GetRpcClient(); + } catch (const IOException& e) { + OnError(e, + [&, this]() -> std::string { + return "Get async rpc_client to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; + }, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error); + }); + return; + } + + ConnectionUtils::ResetController(controller_, call_timeout_ns); + + callable_(controller_, std::make_shared(loc), rpc_client) + .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) + .onError([&, this](const std::exception& e) { + OnError(e, + [&, this]() -> std::string { + return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error); + }); + return; + }); + } + + void CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); + } + + long RemainingTimeNs() { + return operation_timeout_nanos_ - (TimeUtil::GetNowNanos() - start_ns_); + } + + private: + folly::HHWheelTimer::UniquePtr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; + long pause_ns_; + int max_retries_; + long operation_timeout_nanos_; + long rpc_timeout_nanos_; + int start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_; + long start_ns_; + int tries_; + std::shared_ptr> exceptions_; + int max_attempts_; + folly::EventBase event_base_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc new file mode 100644 index 0000000..c42cf0e --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-test.cc @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include "hbase-rpc-controller.h" +#include "if/HBase.pb.h" + +#include +#include +#include +#include +#include +#include "async-rpc-retrying-caller-factory.h" +#include "async-rpc-retrying-caller.h" +#include "client.h" +#include "connection/request.h" +#include "connection/request.h" +#include "connection/response.h" +#include "connection/rpc-client.h" +#include "connection/rpc-client.h" +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "core/response_converter.h" +#include "core/result.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "keyvalue-codec.h" +#include "region-location.h" +#include "test-util/test-util.h" + +using namespace google::protobuf; +using namespace hbase; +using namespace hbase::pb; +using namespace std::placeholders; +using namespace testing; +using ::testing::Return; +using ::testing::_; +using std::chrono::nanoseconds; + +class MockRpcControllerFactory { + public: + MOCK_METHOD0(NewController, std::shared_ptr()); +}; + +class MockAsyncConnectionConfiguration { + public: + MOCK_METHOD0(GetPauseNs, long()); + MOCK_METHOD0(GetMaxRetries, int()); + MOCK_METHOD0(GetStartLogErrorsCount, int()); + MOCK_METHOD0(GetReadRpcTimeoutNs, long()); + MOCK_METHOD0(GetOperationTimeoutNs, long()); +}; + +class AsyncRegionLocator { + public: + AsyncRegionLocator(std::shared_ptr region_location) + : region_location_(region_location) {} + ~AsyncRegionLocator() = default; + + folly::Future GetRegionLocation(std::shared_ptr, + const std::string&, RegionLocateType, long) { + folly::Promise promise; + promise.setValue(*region_location_); + return promise.getFuture(); + } + + void UpdateCachedLocation(RegionLocation&, const std::exception&) {} + + private: + std::shared_ptr region_location_; +}; + +class MockAsyncConnection { + public: + MOCK_METHOD0(get_conn_conf, std::shared_ptr()); + MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr()); + MOCK_METHOD0(get_locator, std::shared_ptr()); + MOCK_METHOD0(GetRpcClient, std::shared_ptr()); +}; + +template +class MockRawAsyncTableImpl { + public: + MockRawAsyncTableImpl(std::shared_ptr conn) + : conn_(conn), promise_(std::make_shared>()) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + + /* in real RawAsyncTableImpl, this should be private. */ + folly::Future GetCall(std::shared_ptr rpc_client, + std::shared_ptr controller, + std::shared_ptr loc, const hbase::Get& get) { + hbase::RpcCall rpc_call = []( + std::shared_ptr rpc_client, std::shared_ptr loc, + std::shared_ptr controller, + std::unique_ptr preq) -> folly::Future> { + return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), + std::move(preq), User::defaultUser(), "ClientService"); + }; + + return Call( + rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call, + &hbase::ResponseConverter::FromGetResponse); + } + + /* in real RawAsyncTableImpl, this should be private. */ + template + folly::Future Call( + std::shared_ptr rpc_client, std::shared_ptr controller, + std::shared_ptr loc, const REQ& req, + const ReqConverter, REQ, std::string>& req_converter, + const hbase::RpcCall& rpc_call, + const RespConverter, PRESP>& resp_converter) { + rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) + .then([&, this](std::unique_ptr presp) { + std::unique_ptr result = hbase::ResponseConverter::FromGetResponse(*presp); + promise_->setValue(std::move(*result)); + }) + .onError([this](const std::exception& e) { promise_->setException(e); }); + return promise_->getFuture(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr> promise_; +}; + +TEST(AsyncRpcRetryTest, TestGetBasic) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + + // Using TestUtil to populate test data + hbase::TestUtil* test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::Configuration conf; + + // Create a client + Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + /* init region location and rpc channel */ + auto region_location = table->GetRegionLocation(row); + + auto io_executor_ = std::make_shared(1); + auto codec = std::make_shared(); + auto rpc_client = std::make_shared(io_executor_, codec); + + /* init rpc controller */ + auto controller = std::make_shared(); + + /* init rpc controller factory */ + auto controller_factory = std::make_shared(); + EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller)); + + /* init connection configuration */ + auto connection_conf = std::make_shared(); + EXPECT_CALL((*connection_conf), GetPauseNs()).Times(1).WillRepeatedly(Return(100000000)); + EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31)); + EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9)); + EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs()) + .Times(1) + .WillRepeatedly(Return(60000000000)); + EXPECT_CALL((*connection_conf), GetOperationTimeoutNs()) + .Times(1) + .WillRepeatedly(Return(1200000000000)); + + /* init region locator */ + auto region_locator = std::make_shared(region_location); + + /* init hbase client connection */ + auto conn = std::make_shared(); + EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf)); + EXPECT_CALL((*conn), get_rpc_controller_factory()) + .Times(AtLeast(1)) + .WillRepeatedly(Return(controller_factory)); + EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator)); + EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client)); + + /* init retry caller factory */ + auto tableImpl = std::make_shared>(conn); + AsyncRpcRetryingCallerFactory caller_factory(conn); + + /* init request caller builder */ + auto builder = caller_factory.Single(); + + /* call with retry to get result */ + try { + auto async_caller = + builder->table(std::make_shared(tn)) + ->row(row) + ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs()) + ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs()) + ->action( + [=, &get]( + std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) -> folly::Future { + return tableImpl->GetCall(rpc_client, controller, loc, get); + }) + ->Build(); + + hbase::Result result = async_caller->Call().get(); + + /*Stopping the connection as we are getting segfault due to some folly issue + The connection stays open and we don't want that. + So we are stopping the connection. + We can remove this once we have fixed the folly part */ + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result.Row()); + EXPECT_EQ("value2", *(result.Value("d", "2"))); + EXPECT_EQ("value for extra", *(result.Value("d", "extra"))); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + throw e; + } + + table->Close(); + client.Close(); +} diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 240da72..f0483ef 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -57,7 +57,8 @@ void Client::init(const hbase::Configuration &conf) { } else { LOG(WARNING) << "Not using RPC Cell Codec"; } - rpc_client_ = std::make_shared(io_executor_, codec, conn_conf_->connect_timeout()); + rpc_client_ = + std::make_shared(io_executor_, codec, conn_conf_->connect_timeout()); location_cache_ = std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); } diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index a96d6f3..e73ab70 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -89,7 +89,7 @@ class Client { bool is_closed_ = false; /** Methods */ - void init(const hbase::Configuration &conf); + void init(const hbase::Configuration& conf); }; } // namespace hbase diff --git a/hbase-native-client/core/filter.h b/hbase-native-client/core/filter.h index b5b7133..10accaa 100644 --- a/hbase-native-client/core/filter.h +++ b/hbase-native-client/core/filter.h @@ -20,9 +20,9 @@ #pragma once #include +#include #include #include -#include #include #include "if/Comparator.pb.h" diff --git a/hbase-native-client/core/hbase-rpc-controller.cc b/hbase-native-client/core/hbase-rpc-controller.cc new file mode 100644 index 0000000..ed3bd6e --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase-rpc-controller.h" + +namespace hbase {} /* namespace hbase */ diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h new file mode 100644 index 0000000..3f76c59 --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include + +using google::protobuf::RpcController; +using google::protobuf::Closure; +using namespace std; + +namespace hbase { + +class HBaseRpcController : public RpcController { + public: + HBaseRpcController() {} + virtual ~HBaseRpcController() = default; + + void set_call_timeout(const long& call_timeout) { + // TODO: + } + + void Reset() override {} + + bool Failed() const override { return false; } + + string ErrorText() const override { return ""; } + + void StartCancel() override {} + + void SetFailed(const string& reason) override {} + + bool IsCanceled() const override { return false; } + + void NotifyOnCancel(Closure* callback) override {} +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index da9f64a..17032fe 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -25,6 +25,7 @@ #include +#include #include "connection/response.h" #include "connection/rpc-connection.h" #include "if/Client.pb.h" diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index b0411cb..53b7a3b 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -26,6 +26,8 @@ namespace hbase { +enum RegionLocateType { kBefore, kCurrent, kAfter }; + /** * @brief class to hold where a region is located. * diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index 4e30d4b..ba4dc29 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -71,4 +71,8 @@ void Table::Close() { is_closed_ = true; } +std::shared_ptr Table::GetRegionLocation(const std::string &row) { + return location_cache_->LocateRegion(*table_name_, row).get(); +} + } /* namespace hbase */ diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 0e98cd2..f82382e 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -57,6 +57,11 @@ class Table { */ void Close(); + /** + * @brief - Get region location for a row in current table. + */ + std::shared_ptr GetRegionLocation(const std::string &row); + private: std::shared_ptr table_name_; std::shared_ptr location_cache_; diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK new file mode 100644 index 0000000..a23654c --- /dev/null +++ b/hbase-native-client/exceptions/BUCK @@ -0,0 +1,24 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cxx_library( + name="exceptions", + exported_headers=["exception.h",], + srcs=[], + deps=["//third-party:folly",], + compiler_flags=['-Weffc++'], + visibility=['//core/...'],) \ No newline at end of file diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h new file mode 100644 index 0000000..c0c4142 --- /dev/null +++ b/hbase-native-client/exceptions/exception.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +namespace hbase { + +class ThrowableWithExtraContext { +public: + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when) : + cause_(cause), when_(when), extras_("") { + } + + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when, const std::string& extras) : + cause_(cause), when_(when), extras_(extras) { + } + + std::string ToString() { + // TODO: + // return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); + return extras_ + ", " + cause_->what(); + } + + std::shared_ptr cause() { + return cause_; + } +private: + std::shared_ptr cause_; + long when_; + std::string extras_; +}; + +class IOException: public std::logic_error { +public: + IOException( + const std::string& what) : + logic_error(what), cause_(nullptr) {} + IOException( + const std::string& what, + std::shared_ptr cause) : + logic_error(what), cause_(cause) {} + virtual ~IOException() = default; + + std::shared_ptr cause() { + return cause_; + } +private: + const std::shared_ptr cause_; +}; + +class RetriesExhaustedException: public IOException { +public: + RetriesExhaustedException( + const int& num_retries, + std::shared_ptr> exceptions) : + IOException( + GetMessage(num_retries, exceptions), + exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){ + } + virtual ~RetriesExhaustedException() = default; + +private: + std::string GetMessage( + const int& num_retries, + std::shared_ptr> exceptions) { + std::string buffer("Failed after attempts="); + buffer.append(std::to_string(num_retries + 1)); + buffer.append(", exceptions:\n"); + for (auto it = exceptions->begin(); it != exceptions->end(); it++) { + buffer.append(it->ToString()); + buffer.append("\n"); + } + return buffer; + } +}; + +class HBaseIOException : public IOException { +}; + +class DoNotRetryIOException : public HBaseIOException { +}; +} // namespace hbase diff --git a/hbase-native-client/security/BUCK b/hbase-native-client/security/BUCK index e176c90..d602ff3 100644 --- a/hbase-native-client/security/BUCK +++ b/hbase-native-client/security/BUCK @@ -19,7 +19,9 @@ # to a single server. cxx_library( name="security", - exported_headers=["user.h",], + exported_headers=[ + "user.h", + ], srcs=[], deps=[], compiler_flags=['-Weffc++'], diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK index 38e7b4d..76d1c72 100644 --- a/hbase-native-client/serde/BUCK +++ b/hbase-native-client/serde/BUCK @@ -41,28 +41,54 @@ cxx_library( ":region-info-deserializer-test", ], compiler_flags=['-Weffc++'], - visibility=['PUBLIC',],) + visibility=[ + 'PUBLIC', + ],) cxx_test( name="table-name-test", - srcs=["table-name-test.cc",], - deps=[":serde",],) + srcs=[ + "table-name-test.cc", + ], + deps=[ + ":serde", + ],) cxx_test( name="server-name-test", - srcs=["server-name-test.cc",], - deps=[":serde",],) + srcs=[ + "server-name-test.cc", + ], + deps=[ + ":serde", + ],) cxx_test( name="client-serializer-test", - srcs=["client-serializer-test.cc",], - deps=[":serde",],) + srcs=[ + "client-serializer-test.cc", + ], + deps=[ + ":serde", + ],) cxx_test( name="client-deserializer-test", - srcs=["client-deserializer-test.cc",], - deps=[":serde",],) + srcs=[ + "client-deserializer-test.cc", + ], + deps=[ + ":serde", + ],) cxx_test( name="zk-deserializer-test", - srcs=["zk-deserializer-test.cc",], - deps=[":serde",],) + srcs=[ + "zk-deserializer-test.cc", + ], + deps=[ + ":serde", + ],) cxx_test( name="region-info-deserializer-test", - srcs=["region-info-deserializer-test.cc",], - deps=[":serde",],) + srcs=[ + "region-info-deserializer-test.cc", + ], + deps=[ + ":serde", + ],) diff --git a/hbase-native-client/third-party/BUCK b/hbase-native-client/third-party/BUCK index f37eb4e..a55a6fb 100644 --- a/hbase-native-client/third-party/BUCK +++ b/hbase-native-client/third-party/BUCK @@ -112,4 +112,6 @@ cxx_library( ('googletest/googlemock', 'src/*.cc'), ]), exported_deps=dynamic_rules, - visibility=['PUBLIC',],) + visibility=[ + 'PUBLIC', + ],) diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK index 796f2f5..683094f 100644 --- a/hbase-native-client/utils/BUCK +++ b/hbase-native-client/utils/BUCK @@ -17,13 +17,26 @@ cxx_library( name="utils", - exported_headers=["user-util.h", "version.h"], - srcs=["user-util.cc",], - deps=['//third-party:folly',], + exported_headers=[ + "user-util.h", "version.h", "connection-util.h", "sys-util.h", + "time-util.h" + ], + srcs=[ + "user-util.cc", + ], + deps=[ + '//third-party:folly', + ], tests=[":user-util-test"], - visibility=['PUBLIC',], + visibility=[ + 'PUBLIC', + ], compiler_flags=['-Weffc++'],) cxx_test( name="user-util-test", - srcs=["user-util-test.cc",], - deps=[":utils",],) + srcs=[ + "user-util-test.cc", + ], + deps=[ + ":utils", + ],) diff --git a/hbase-native-client/utils/connection-util.cc b/hbase-native-client/utils/connection-util.cc new file mode 100644 index 0000000..1451655 --- /dev/null +++ b/hbase-native-client/utils/connection-util.cc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "connection-util.h" + +namespace hbase { + +const std::vector ConnectionUtils::RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, + 100, 100, 100, 100, 200, 200}; +} /* namespace hbase */ diff --git a/hbase-native-client/utils/connection-util.h b/hbase-native-client/utils/connection-util.h new file mode 100644 index 0000000..0e86ce4 --- /dev/null +++ b/hbase-native-client/utils/connection-util.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "core/hbase-rpc-controller.h" +#include "utils/time-util.h" + +namespace hbase { +class ConnectionUtils { + public: + static int Retries2Attempts(const int& retries) { + return std::max(1, retries == INT_MAX ? INT_MAX : retries + 1); + } + + /* Add a delta to avoid timeout immediately after a retry sleeping. */ + static const long SLEEP_DELTA_NS = 1000000; + + static const std::vector RETRY_BACKOFF; + /** + * Calculate pause time. Built on {@link RETRY_BACKOFF}. + * @param pause time to pause + * @param tries amount of tries + * @return How long to wait after tries retries + */ + static long GetPauseTime(const long& pause, const int& tries) { + int ntries = tries; + if (ntries >= RETRY_BACKOFF.size()) { + ntries = RETRY_BACKOFF.size() - 1; + } + if (ntries < 0) { + ntries = 0; + } + + long normal_pause = pause * RETRY_BACKOFF[ntries]; + // 1% possible jitter + float r = static_cast(std::rand()) / static_cast(RAND_MAX); + long jitter = (long)(normal_pause * r * 0.01f); + return normal_pause + jitter; + } + + static void ResetController(std::shared_ptr controller, + const long& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->set_call_timeout( + std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_ns))); + } + } +}; +} /* namespace hbase */ diff --git a/hbase-native-client/utils/sys-util.h b/hbase-native-client/utils/sys-util.h new file mode 100644 index 0000000..68f00d7 --- /dev/null +++ b/hbase-native-client/utils/sys-util.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include + +namespace hbase { + +class SysUtil { + public: + template + static constexpr bool InstanceOf(const DERIVED& object) { + return !dynamic_cast(&object); + } + + template + static constexpr bool InstanceOf() { + return std::is_base_of(); + } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h new file mode 100644 index 0000000..b466edb --- /dev/null +++ b/hbase-native-client/utils/time-util.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +class TimeUtil { + public: + static long ToMillis(const long& nanos) { + std::chrono::duration_cast(nanoseconds(nanos)).count(); + } + + static std::string ToMillisStr(const long& nanos) { + return std::to_string(std::chrono::duration_cast(nanoseconds(nanos)).count()); + } + + static long GetNowNanos() { + auto duration = std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + static long ElapsedMillis(const long& start_ns) { + std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns)).count(); + } + + static std::string ElapsedMillisStr(const long& start_ns) { + return std::to_string( + std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns)).count()); + } +}; +} /* namespace hbase */ -- 2.10.1 (Apple Git-78)