diff --git hbase-native-client/Makefile hbase-native-client/Makefile index 84ae556..b926220 100644 --- hbase-native-client/Makefile +++ hbase-native-client/Makefile @@ -22,7 +22,7 @@ LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils security +MODULES = connection core serde test-util utils security exceptions SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) diff --git hbase-native-client/bin/start-docker.sh hbase-native-client/bin/start-docker.sh index 1380cdf..8b017a0 100755 --- hbase-native-client/bin/start-docker.sh +++ hbase-native-client/bin/start-docker.sh @@ -56,7 +56,7 @@ docker build -t hbase_native . # After the image is built run the thing docker run -p 16050:16050/tcp \ - -v ${BASE_DIR}/..:/usr/src/hbase \ + -v ${BASE_DIR}/..:/usr/src/hbase \ -v ~/.m2:/root/.m2 \ -it hbase_native /bin/bash popd diff --git hbase-native-client/connection/connection-factory.cc hbase-native-client/connection/connection-factory.cc index 2f7e75c..832b00f 100644 --- hbase-native-client/connection/connection-factory.cc +++ hbase-native-client/connection/connection-factory.cc @@ -31,11 +31,10 @@ using std::chrono::milliseconds; using std::chrono::nanoseconds; ConnectionFactory::ConnectionFactory(std::shared_ptr io_pool, - std::shared_ptr codec, - nanoseconds connect_timeout) + std::shared_ptr codec, nanoseconds connect_timeout) : connect_timeout_(connect_timeout), - io_pool_(io_pool), - pipeline_factory_(std::make_shared(codec)) {} + io_pool_(io_pool), + pipeline_factory_(std::make_shared(codec)) {} std::shared_ptr> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared>(); diff --git hbase-native-client/connection/connection-factory.h hbase-native-client/connection/connection-factory.h index fbcb6ef..32d0bf7 100644 --- hbase-native-client/connection/connection-factory.h +++ hbase-native-client/connection/connection-factory.h @@ -44,8 +44,7 @@ class ConnectionFactory { * There should only be one ConnectionFactory per client. */ ConnectionFactory(std::shared_ptr io_pool, - std::shared_ptr codec, - nanoseconds connect_timeout = nanoseconds(0)); + std::shared_ptr codec, nanoseconds connect_timeout = nanoseconds(0)); /** Default Destructor */ virtual ~ConnectionFactory() = default; diff --git hbase-native-client/connection/connection-pool.cc hbase-native-client/connection/connection-pool.cc index b18ee89..4fe4610 100644 --- hbase-native-client/connection/connection-pool.cc +++ hbase-native-client/connection/connection-pool.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -34,8 +35,7 @@ using folly::SharedMutexWritePriority; using folly::SocketAddress; ConnectionPool::ConnectionPool(std::shared_ptr io_executor, - std::shared_ptr codec, - nanoseconds connect_timeout) + std::shared_ptr codec, nanoseconds connect_timeout) : cf_(std::make_shared(io_executor, codec, connect_timeout)), clients_(), connections_(), diff --git hbase-native-client/connection/rpc-client.cc hbase-native-client/connection/rpc-client.cc index c61a73e..5fa1138 100644 --- hbase-native-client/connection/rpc-client.cc +++ hbase-native-client/connection/rpc-client.cc @@ -18,27 +18,17 @@ */ #include "connection/rpc-client.h" + +#include #include #include +#include +#include using hbase::RpcClient; -using hbase::AbstractRpcChannel; namespace hbase { -class RpcChannelImplementation : public AbstractRpcChannel { - public: - RpcChannelImplementation(std::shared_ptr rpc_client, const std::string& host, - uint16_t port, std::shared_ptr ticket, int rpc_timeout) - : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {} - - void CallMethod(const MethodDescriptor* method, RpcController* controller, const Message* request, - Message* response, Closure* done) override { - rpc_client_->CallMethod(method, controller, request, response, done, host_, port_, ticket_); - } -}; -} // namespace hbase - RpcClient::RpcClient(std::shared_ptr io_executor, std::shared_ptr codec, nanoseconds connect_timeout) : io_executor_(io_executor) { @@ -80,26 +70,4 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { return cp_->GetConnection(remote_id); } - -std::shared_ptr RpcClient::CreateRpcChannel(const std::string& host, uint16_t port, - std::shared_ptr ticket, - int rpc_timeout) { - std::shared_ptr channel = std::make_shared( - shared_from_this(), host, port, ticket, rpc_timeout); - - /* static_pointer_cast is safe since RpcChannelImplementation derives - * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */ - return std::static_pointer_cast(channel); -} - -void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* controller, - const Message* req_msg, Message* resp_msg, Closure* done, - const std::string& host, uint16_t port, std::shared_ptr ticket) { - std::shared_ptr shared_req(const_cast(req_msg)); - std::shared_ptr shared_resp(resp_msg); - - std::unique_ptr req = std::make_unique(shared_req, shared_resp, method->name()); - - AsyncCall(host, port, std::move(req), ticket, method->service()->name()) - .then([done, this](std::unique_ptr resp) { done->Run(); }); -} +} // namespace hbase diff --git hbase-native-client/connection/rpc-client.h hbase-native-client/connection/rpc-client.h index 5c11ab5..d416ceb 100644 --- hbase-native-client/connection/rpc-client.h +++ hbase-native-client/connection/rpc-client.h @@ -38,24 +38,15 @@ using hbase::ConnectionPool; using hbase::RpcConnection; using hbase::security::User; -using google::protobuf::MethodDescriptor; -using google::protobuf::RpcChannel; using google::protobuf::Message; -using google::protobuf::RpcController; -using google::protobuf::Closure; - using std::chrono::nanoseconds; -class RpcChannelImplementation; - namespace hbase { -class RpcClient : public std::enable_shared_from_this { - friend class RpcChannelImplementation; - +class RpcClient { public: - RpcClient(std::shared_ptr io_executor, - std::shared_ptr codec, nanoseconds connect_timeout); + RpcClient(std::shared_ptr io_executor, std::shared_ptr codec, + nanoseconds connect_timeout = nanoseconds(0)); virtual ~RpcClient() { Close(); } @@ -79,40 +70,13 @@ class RpcClient : public std::enable_shared_from_this { virtual void Close(); - virtual std::shared_ptr CreateRpcChannel(const std::string &host, uint16_t port, - std::shared_ptr ticket, - int rpc_timeout); - std::shared_ptr connection_pool() const { return cp_; } private: - void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg, - Message *resp_msg, Closure *done, const std::string &host, uint16_t port, - std::shared_ptr ticket); std::shared_ptr GetConnection(std::shared_ptr remote_id); private: std::shared_ptr cp_; std::shared_ptr io_executor_; }; - -class AbstractRpcChannel : public RpcChannel { - public: - AbstractRpcChannel(std::shared_ptr rpc_client, const std::string &host, uint16_t port, - std::shared_ptr ticket, int rpc_timeout) - : rpc_client_(rpc_client), - host_(host), - port_(port), - ticket_(ticket), - rpc_timeout_(rpc_timeout) {} - - virtual ~AbstractRpcChannel() = default; - - protected: - std::shared_ptr rpc_client_; - std::string host_; - uint16_t port_; - std::shared_ptr ticket_; - int rpc_timeout_; -}; } // namespace hbase diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index e541d8f..2f4f6c1 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -40,6 +40,9 @@ cxx_library( "request_converter.h", "response_converter.h", "table.h", + "async-rpc-retrying-caller-factory.h", + "async-rpc-retrying-caller.h", + "hbase-rpc-controller.h", ], srcs=[ "cell.cc", @@ -58,6 +61,8 @@ cxx_library( "table.cc", ], deps=[ + "//exceptions:exceptions", + "//utils:utils", "//connection:connection", "//if:if", "//serde:serde", @@ -96,6 +101,15 @@ cxx_test( deps=[":core",], run_test_separately=True,) cxx_test( + name="retry-test", + srcs=["async-rpc-retrying-test.cc",], + deps=[ + ":core", + "//test-util:test-util", + "//exceptions:exceptions", + ], + run_test_separately=True,) +cxx_test( name="time_range-test", srcs=["time_range-test.cc",], deps=[":core",], diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.cc hbase-native-client/core/async-rpc-retrying-caller-factory.cc new file mode 100644 index 0000000..0ac9cac --- /dev/null +++ hbase-native-client/core/async-rpc-retrying-caller-factory.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-rpc-retrying-caller-factory.h" + +namespace hbase {} // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.h hbase-native-client/core/async-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..3342e29 --- /dev/null +++ hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/async-rpc-retrying-caller.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +using hbase::pb::TableName; +using std::chrono::nanoseconds; + +namespace hbase { + +template +class SingleRequestCallerBuilder + : public std::enable_shared_from_this> { + public: + explicit SingleRequestCallerBuilder(std::shared_ptr conn) + : conn_(conn), + table_name_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0), + locate_type_(RegionLocateType::kCurrent) {} + + virtual ~SingleRequestCallerBuilder() = default; + + typedef SingleRequestCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr table(std::shared_ptr table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr operation_timeout(nanoseconds operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr row(const std::string& row) { + row_ = row; + return shared_this(); + } + + SharedThisPtr locate_type(RegionLocateType locate_type) { + locate_type_ = locate_type; + return shared_this(); + } + + SharedThisPtr action(Callable callable) { + callable_ = callable; + return shared_this(); + } + + folly::Future Call() { return Build()->Call(); } + + std::shared_ptr> Build() { + return std::make_shared>( + conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(), + conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_, + conn_->get_conn_conf()->GetStartLogErrorsCount()); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr table_name_; + nanoseconds rpc_timeout_nanos_; + nanoseconds operation_timeout_nanos_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; +}; // end of SingleRequestCallerBuilder + +template +class AsyncRpcRetryingCallerFactory { + private: + std::shared_ptr conn_; + + public: + explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} + + virtual ~AsyncRpcRetryingCallerFactory() = default; + + template + std::shared_ptr> Single() { + return std::make_shared>(conn_); + } +}; + +} // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller.cc hbase-native-client/core/async-rpc-retrying-caller.cc new file mode 100644 index 0000000..743b6bb --- /dev/null +++ hbase-native-client/core/async-rpc-retrying-caller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-rpc-retrying-caller.h" + +namespace hbase {} /* namespace hbase */ diff --git hbase-native-client/core/async-rpc-retrying-caller.h hbase-native-client/core/async-rpc-retrying-caller.h new file mode 100644 index 0000000..f7a1523 --- /dev/null +++ hbase-native-client/core/async-rpc-retrying-caller.h @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "connection/rpc-client.h" +#include "core/hbase-rpc-controller.h" +#include "core/region-location.h" +#include "exceptions/exception.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +template +using Supplier = std::function; + +template +using Consumer = std::function; + +template +using ReqConverter = std::function; + +template +using RespConverter = std::function; + +template +using RpcCallback = std::function; + +template +using RpcCall = std::function>( + std::shared_ptr, std::shared_ptr, + std::shared_ptr, std::unique_ptr)>; + +template +using Callable = std::function(std::shared_ptr, + std::shared_ptr, + std::shared_ptr)>; + +template +class AsyncSingleRequestRpcRetryingCaller { + public: + AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, + std::shared_ptr table_name, + const std::string& row, RegionLocateType locate_type, + Callable callable, nanoseconds pause_ns, + uint32_t max_retries, nanoseconds operation_timeout_nanos, + nanoseconds rpc_timeout_nanos, + uint32_t start_log_errors_count) + : conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_ns_(pause_ns), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + controller_ = conn_->get_rpc_controller_factory()->NewController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared>(); + retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); + } + + virtual ~AsyncSingleRequestRpcRetryingCaller() {} + + folly::Future Call() { + auto f = promise_->getFuture(); + LocateThenCall(); + return f; + } + + private: + void LocateThenCall() { + int64_t locate_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->get_locator() + ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](RegionLocation& loc) { Call(loc); }) + .onError([this](const std::exception& e) { + OnError(e, + [this]() -> std::string { + return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + + table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [](const std::exception& error) {}); + }); + } + + void OnError(const std::exception& error, Supplier err_msg, + Consumer update_cached_location) { + ThrowableWithExtraContext twec(std::make_shared(error), + TimeUtil::GetNowNanos()); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf(error) || + tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + int64_t delay_ns; + if (operation_timeout_nanos_.count() > 0) { + int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = + std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1); + } + update_cached_location(error); + tries_++; + retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, + milliseconds(TimeUtil::ToMillis(delay_ns))); + } + + void Call(const RegionLocation& loc) { + int64_t call_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count()); + } else { + call_timeout_ns = rpc_timeout_nanos_.count(); + } + + std::shared_ptr rpc_client; + try { + rpc_client = conn_->GetRpcClient(); + } catch (const IOException& e) { + OnError(e, + [&, this]() -> std::string { + return "Get async rpc_client to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; + }, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error); + }); + return; + } + + ResetController(controller_, call_timeout_ns); + + callable_(controller_, std::make_shared(loc), rpc_client) + .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) + .onError([&, this](const std::exception& e) { + OnError(e, + [&, this]() -> std::string { + return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error); + }); + return; + }); + } + + void CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); + } + + int64_t RemainingTimeNs() { + return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); + } + + static void ResetController(std::shared_ptr controller, + const int64_t& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->set_call_timeout( + milliseconds(std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_ns)))); + } + } + + private: + folly::HHWheelTimer::UniquePtr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; + nanoseconds pause_ns_; + uint32_t max_retries_; + nanoseconds operation_timeout_nanos_; + nanoseconds rpc_timeout_nanos_; + uint32_t start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_; + uint64_t start_ns_; + uint32_t tries_; + std::shared_ptr> exceptions_; + uint32_t max_attempts_; + folly::EventBase event_base_; +}; + +} /* namespace hbase */ diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc new file mode 100644 index 0000000..a9b0017 --- /dev/null +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "connection/request.h" +#include "connection/response.h" +#include "connection/rpc-client.h" +#include "core/async-rpc-retrying-caller-factory.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/client.h" +#include "core/hbase-rpc-controller.h" +#include "core/keyvalue-codec.h" +#include "core/region-location.h" +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "core/result.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "test-util/test-util.h" + +using namespace google::protobuf; +using namespace hbase; +using namespace hbase::pb; +using namespace std::placeholders; +using namespace testing; +using ::testing::Return; +using ::testing::_; +using std::chrono::nanoseconds; + +class MockRpcControllerFactory { + public: + MOCK_METHOD0(NewController, std::shared_ptr()); +}; + +class MockAsyncConnectionConfiguration { + public: + MOCK_METHOD0(GetPauseNs, nanoseconds()); + MOCK_METHOD0(GetMaxRetries, int32_t()); + MOCK_METHOD0(GetStartLogErrorsCount, int32_t()); + MOCK_METHOD0(GetReadRpcTimeoutNs, nanoseconds()); + MOCK_METHOD0(GetOperationTimeoutNs, nanoseconds()); +}; + +class AsyncRegionLocator { + public: + explicit AsyncRegionLocator(std::shared_ptr region_location) + : region_location_(region_location) {} + ~AsyncRegionLocator() = default; + + folly::Future GetRegionLocation(std::shared_ptr, + const std::string&, RegionLocateType, int64_t) { + folly::Promise promise; + promise.setValue(*region_location_); + return promise.getFuture(); + } + + void UpdateCachedLocation(const RegionLocation&, const std::exception&) {} + + private: + std::shared_ptr region_location_; +}; + +class MockAsyncConnection { + public: + MOCK_METHOD0(get_conn_conf, std::shared_ptr()); + MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr()); + MOCK_METHOD0(get_locator, std::shared_ptr()); + MOCK_METHOD0(GetRpcClient, std::shared_ptr()); +}; + +template +class MockRawAsyncTableImpl { + public: + explicit MockRawAsyncTableImpl(std::shared_ptr conn) + : conn_(conn), promise_(std::make_shared>()) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + + /* in real RawAsyncTableImpl, this should be private. */ + folly::Future GetCall(std::shared_ptr rpc_client, + std::shared_ptr controller, + std::shared_ptr loc, const hbase::Get& get) { + hbase::RpcCall rpc_call = []( + std::shared_ptr rpc_client, std::shared_ptr loc, + std::shared_ptr controller, + std::unique_ptr preq) -> folly::Future> { + return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), + std::move(preq), User::defaultUser(), "ClientService"); + }; + + return Call( + rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call, + &hbase::ResponseConverter::FromGetResponse); + } + + /* in real RawAsyncTableImpl, this should be private. */ + template + folly::Future Call( + std::shared_ptr rpc_client, std::shared_ptr controller, + std::shared_ptr loc, const REQ& req, + const ReqConverter, REQ, std::string>& req_converter, + const hbase::RpcCall& rpc_call, + const RespConverter, PRESP>& resp_converter) { + rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) + .then([&, this](std::unique_ptr presp) { + std::unique_ptr result = hbase::ResponseConverter::FromGetResponse(*presp); + promise_->setValue(std::move(*result)); + }) + .onError([this](const std::exception& e) { promise_->setException(e); }); + return promise_->getFuture(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr> promise_; +}; + +TEST(AsyncRpcRetryTest, TestGetBasic) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + + // Using TestUtil to populate test data + hbase::TestUtil* test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::Configuration conf; + + // Create a client + Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + /* init region location and rpc channel */ + auto region_location = table->GetRegionLocation(row); + + auto io_executor_ = std::make_shared(1); + auto codec = std::make_shared(); + auto rpc_client = std::make_shared(io_executor_, codec); + + /* init rpc controller */ + auto controller = std::make_shared(); + + /* init rpc controller factory */ + auto controller_factory = std::make_shared(); + EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller)); + + /* init connection configuration */ + auto connection_conf = std::make_shared(); + EXPECT_CALL((*connection_conf), GetPauseNs()) + .Times(1) + .WillRepeatedly(Return(nanoseconds(100000000))); + EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31)); + EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9)); + EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs()) + .Times(1) + .WillRepeatedly(Return(nanoseconds(60000000000))); + EXPECT_CALL((*connection_conf), GetOperationTimeoutNs()) + .Times(1) + .WillRepeatedly(Return(nanoseconds(1200000000000))); + + /* init region locator */ + auto region_locator = std::make_shared(region_location); + + /* init hbase client connection */ + auto conn = std::make_shared(); + EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf)); + EXPECT_CALL((*conn), get_rpc_controller_factory()) + .Times(AtLeast(1)) + .WillRepeatedly(Return(controller_factory)); + EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator)); + EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client)); + + /* init retry caller factory */ + auto tableImpl = std::make_shared>(conn); + AsyncRpcRetryingCallerFactory caller_factory(conn); + + /* init request caller builder */ + auto builder = caller_factory.Single(); + + /* call with retry to get result */ + try { + auto async_caller = + builder->table(std::make_shared(tn)) + ->row(row) + ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs()) + ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs()) + ->action( + [=, &get]( + std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) -> folly::Future { + return tableImpl->GetCall(rpc_client, controller, loc, get); + }) + ->Build(); + + hbase::Result result = async_caller->Call().get(); + + /*Stopping the connection as we are getting segfault due to some folly issue + The connection stays open and we don't want that. + So we are stopping the connection. + We can remove this once we have fixed the folly part */ + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result.Row()); + EXPECT_EQ("value2", *(result.Value("d", "2"))); + EXPECT_EQ("value for extra", *(result.Value("d", "extra"))); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + throw e; + } + + table->Close(); + client.Close(); +} diff --git hbase-native-client/core/client.cc hbase-native-client/core/client.cc index 240da72..f0483ef 100644 --- hbase-native-client/core/client.cc +++ hbase-native-client/core/client.cc @@ -57,7 +57,8 @@ void Client::init(const hbase::Configuration &conf) { } else { LOG(WARNING) << "Not using RPC Cell Codec"; } - rpc_client_ = std::make_shared(io_executor_, codec, conn_conf_->connect_timeout()); + rpc_client_ = + std::make_shared(io_executor_, codec, conn_conf_->connect_timeout()); location_cache_ = std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); } diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h index a96d6f3..e73ab70 100644 --- hbase-native-client/core/client.h +++ hbase-native-client/core/client.h @@ -89,7 +89,7 @@ class Client { bool is_closed_ = false; /** Methods */ - void init(const hbase::Configuration &conf); + void init(const hbase::Configuration& conf); }; } // namespace hbase diff --git hbase-native-client/core/filter.h hbase-native-client/core/filter.h index b5b7133..10accaa 100644 --- hbase-native-client/core/filter.h +++ hbase-native-client/core/filter.h @@ -20,9 +20,9 @@ #pragma once #include +#include #include #include -#include #include #include "if/Comparator.pb.h" diff --git hbase-native-client/core/hbase-rpc-controller.cc hbase-native-client/core/hbase-rpc-controller.cc new file mode 100644 index 0000000..bc53781 --- /dev/null +++ hbase-native-client/core/hbase-rpc-controller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/hbase-rpc-controller.h" + +namespace hbase {} /* namespace hbase */ diff --git hbase-native-client/core/hbase-rpc-controller.h hbase-native-client/core/hbase-rpc-controller.h new file mode 100644 index 0000000..661c810 --- /dev/null +++ hbase-native-client/core/hbase-rpc-controller.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include + +using google::protobuf::RpcController; +using google::protobuf::Closure; + +using std::chrono::milliseconds; + +namespace hbase { + +class HBaseRpcController : public RpcController { + public: + HBaseRpcController() {} + virtual ~HBaseRpcController() = default; + + void set_call_timeout(const milliseconds& call_timeout) { + // TODO: + } + + void Reset() override {} + + bool Failed() const override { return false; } + + std::string ErrorText() const override { return ""; } + + void StartCancel() override {} + + void SetFailed(const std::string& reason) override {} + + bool IsCanceled() const override { return false; } + + void NotifyOnCancel(Closure* callback) override {} +}; + +} /* namespace hbase */ diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc index da9f64a..17032fe 100644 --- hbase-native-client/core/location-cache.cc +++ hbase-native-client/core/location-cache.cc @@ -25,6 +25,7 @@ #include +#include #include "connection/response.h" #include "connection/rpc-connection.h" #include "if/Client.pb.h" diff --git hbase-native-client/core/region-location.h hbase-native-client/core/region-location.h index b0411cb..4087d94 100644 --- hbase-native-client/core/region-location.h +++ hbase-native-client/core/region-location.h @@ -26,6 +26,8 @@ namespace hbase { +enum RegionLocateType { kBefore, kCurrent, kAfter }; + /** * @brief class to hold where a region is located. * @@ -49,17 +51,17 @@ class RegionLocation { /** * Get a reference to the regio info */ - const hbase::pb::RegionInfo ®ion_info() { return ri_; } + const hbase::pb::RegionInfo ®ion_info() const { return ri_; } /** * Get a reference to the server name */ - const hbase::pb::ServerName &server_name() { return sn_; } + const hbase::pb::ServerName &server_name() const { return sn_; } /** * Get a reference to the region name. */ - const std::string ®ion_name() { return region_name_; } + const std::string ®ion_name() const { return region_name_; } /** * Get a service. This could be closed or null. It's the caller's @@ -79,7 +81,7 @@ class RegionLocation { */ void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; } - const std::string DebugString() { + const std::string DebugString() const { return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString(); } diff --git hbase-native-client/core/response_converter.cc hbase-native-client/core/response_converter.cc index 19a3554..2497306 100644 --- hbase-native-client/core/response_converter.cc +++ hbase-native-client/core/response_converter.cc @@ -19,6 +19,7 @@ #include "core/response_converter.h" +#include #include #include "core/cell.h" diff --git hbase-native-client/core/response_converter.h hbase-native-client/core/response_converter.h index 859644b..759b1ce 100644 --- hbase-native-client/core/response_converter.h +++ hbase-native-client/core/response_converter.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include "connection/response.h" #include "core/result.h" #include "if/Client.pb.h" diff --git hbase-native-client/core/table.cc hbase-native-client/core/table.cc index 4e30d4b..ba4dc29 100644 --- hbase-native-client/core/table.cc +++ hbase-native-client/core/table.cc @@ -71,4 +71,8 @@ void Table::Close() { is_closed_ = true; } +std::shared_ptr Table::GetRegionLocation(const std::string &row) { + return location_cache_->LocateRegion(*table_name_, row).get(); +} + } /* namespace hbase */ diff --git hbase-native-client/core/table.h hbase-native-client/core/table.h index 0e98cd2..f82382e 100644 --- hbase-native-client/core/table.h +++ hbase-native-client/core/table.h @@ -57,6 +57,11 @@ class Table { */ void Close(); + /** + * @brief - Get region location for a row in current table. + */ + std::shared_ptr GetRegionLocation(const std::string &row); + private: std::shared_ptr table_name_; std::shared_ptr location_cache_; diff --git hbase-native-client/exceptions/BUCK hbase-native-client/exceptions/BUCK new file mode 100644 index 0000000..a23654c --- /dev/null +++ hbase-native-client/exceptions/BUCK @@ -0,0 +1,24 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cxx_library( + name="exceptions", + exported_headers=["exception.h",], + srcs=[], + deps=["//third-party:folly",], + compiler_flags=['-Weffc++'], + visibility=['//core/...'],) \ No newline at end of file diff --git hbase-native-client/exceptions/exception.h hbase-native-client/exceptions/exception.h new file mode 100644 index 0000000..c0c4142 --- /dev/null +++ hbase-native-client/exceptions/exception.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +namespace hbase { + +class ThrowableWithExtraContext { +public: + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when) : + cause_(cause), when_(when), extras_("") { + } + + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when, const std::string& extras) : + cause_(cause), when_(when), extras_(extras) { + } + + std::string ToString() { + // TODO: + // return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); + return extras_ + ", " + cause_->what(); + } + + std::shared_ptr cause() { + return cause_; + } +private: + std::shared_ptr cause_; + long when_; + std::string extras_; +}; + +class IOException: public std::logic_error { +public: + IOException( + const std::string& what) : + logic_error(what), cause_(nullptr) {} + IOException( + const std::string& what, + std::shared_ptr cause) : + logic_error(what), cause_(cause) {} + virtual ~IOException() = default; + + std::shared_ptr cause() { + return cause_; + } +private: + const std::shared_ptr cause_; +}; + +class RetriesExhaustedException: public IOException { +public: + RetriesExhaustedException( + const int& num_retries, + std::shared_ptr> exceptions) : + IOException( + GetMessage(num_retries, exceptions), + exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){ + } + virtual ~RetriesExhaustedException() = default; + +private: + std::string GetMessage( + const int& num_retries, + std::shared_ptr> exceptions) { + std::string buffer("Failed after attempts="); + buffer.append(std::to_string(num_retries + 1)); + buffer.append(", exceptions:\n"); + for (auto it = exceptions->begin(); it != exceptions->end(); it++) { + buffer.append(it->ToString()); + buffer.append("\n"); + } + return buffer; + } +}; + +class HBaseIOException : public IOException { +}; + +class DoNotRetryIOException : public HBaseIOException { +}; +} // namespace hbase diff --git hbase-native-client/utils/BUCK hbase-native-client/utils/BUCK index 796f2f5..eae929e 100644 --- hbase-native-client/utils/BUCK +++ hbase-native-client/utils/BUCK @@ -17,8 +17,11 @@ cxx_library( name="utils", - exported_headers=["user-util.h", "version.h"], - srcs=["user-util.cc",], + exported_headers=[ + "user-util.h", "version.h", "connection-util.h", "sys-util.h", + "time-util.h" + ], + srcs=["user-util.cc", "connection-util.cc"], deps=['//third-party:folly',], tests=[":user-util-test"], visibility=['PUBLIC',], diff --git hbase-native-client/utils/connection-util.cc hbase-native-client/utils/connection-util.cc new file mode 100644 index 0000000..76689bf --- /dev/null +++ hbase-native-client/utils/connection-util.cc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "utils/connection-util.h" + +namespace hbase { + +const std::vector ConnectionUtils::kRetryBackoff = {1, 2, 3, 5, 10, 20, 40, + 100, 100, 100, 100, 200, 200}; +} /* namespace hbase */ diff --git hbase-native-client/utils/connection-util.h hbase-native-client/utils/connection-util.h new file mode 100644 index 0000000..f52c2f9 --- /dev/null +++ hbase-native-client/utils/connection-util.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "utils/time-util.h" + +namespace hbase { +class ConnectionUtils { + public: + static int Retries2Attempts(const int& retries) { + return std::max(1, retries == INT_MAX ? INT_MAX : retries + 1); + } + + /* Add a delta to avoid timeout immediately after a retry sleeping. */ + static const uint64_t kSleepDeltaNs = 1000000; + + static const std::vector kRetryBackoff; + /** + * Calculate pause time. Built on {@link kRetryBackoff}. + * @param pause time to pause + * @param tries amount of tries + * @return How long to wait after tries retries + */ + static int64_t GetPauseTime(const int64_t& pause, const int32_t& tries) { + int32_t ntries = tries; + if (static_cast(ntries) >= kRetryBackoff.size()) { + ntries = kRetryBackoff.size() - 1; + } + if (ntries < 0) { + ntries = 0; + } + + int64_t normal_pause = pause * kRetryBackoff[ntries]; + // 1% possible jitter + float r = static_cast(std::rand()) / static_cast(RAND_MAX); + int64_t jitter = (int64_t)(normal_pause * r * 0.01f); + return normal_pause + jitter; + } +}; +} /* namespace hbase */ diff --git hbase-native-client/utils/sys-util.h hbase-native-client/utils/sys-util.h new file mode 100644 index 0000000..68f00d7 --- /dev/null +++ hbase-native-client/utils/sys-util.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include + +namespace hbase { + +class SysUtil { + public: + template + static constexpr bool InstanceOf(const DERIVED& object) { + return !dynamic_cast(&object); + } + + template + static constexpr bool InstanceOf() { + return std::is_base_of(); + } +}; + +} /* namespace hbase */ diff --git hbase-native-client/utils/time-util.h hbase-native-client/utils/time-util.h new file mode 100644 index 0000000..bbc3b35 --- /dev/null +++ hbase-native-client/utils/time-util.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +class TimeUtil { + public: + static int64_t ToMillis(const int64_t& nanos) { + return std::chrono::duration_cast(nanoseconds(nanos)).count(); + } + + static std::string ToMillisStr(const nanoseconds& nanos) { + return std::to_string(std::chrono::duration_cast(nanos).count()); + } + + static int64_t GetNowNanos() { + auto duration = std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + static int64_t ElapsedMillis(const int64_t& start_ns) { + return std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns)).count(); + } + + static std::string ElapsedMillisStr(const int64_t& start_ns) { + return std::to_string( + std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns)).count()); + } +}; +} /* namespace hbase */