diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index 13e8361..43fb91b 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -19,6 +19,8 @@ cxx_library( name="core", exported_headers=[ + "async-connection.h", + "async-region-locator.h", "client.h", "cell.h", "hbase-macros.h", @@ -40,12 +42,14 @@ cxx_library( "request-converter.h", "response-converter.h", "table.h", + "raw-async-table.h", "async-rpc-retrying-caller-factory.h", "async-rpc-retrying-caller.h", "hbase-rpc-controller.h", "zk-util.h", ], srcs=[ + "async-connection.cc", "cell.cc", "client.cc", "keyvalue-codec.cc", @@ -56,6 +60,7 @@ cxx_library( "configuration.cc", "hbase-configuration-loader.cc", "scan.cc", + "raw-async-table.cc", "result.cc", "request-converter.cc", "response-converter.cc", @@ -103,15 +108,6 @@ cxx_test( deps=[":core",], run_test_separately=True,) cxx_test( - name="retry-test", - srcs=["async-rpc-retrying-test.cc",], - deps=[ - ":core", - "//test-util:test-util", - "//exceptions:exceptions", - ], - run_test_separately=True,) -cxx_test( name="time-range-test", srcs=["time-range-test.cc",], deps=[":core",], diff --git hbase-native-client/core/async-connection.cc hbase-native-client/core/async-connection.cc new file mode 100644 index 0000000..b945e38 --- /dev/null +++ hbase-native-client/core/async-connection.cc @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-connection.h" +#include "core/async-rpc-retrying-caller-factory.h" + +namespace hbase { + +void AsyncConnectionImpl::Init() { + connection_conf_ = std::make_shared(*conf_); + // start thread pools + auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN)); + auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN)); + cpu_executor_ = std::make_shared(cpu_threads); + io_executor_ = std::make_shared(io_threads); + + std::shared_ptr codec = nullptr; + if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == + std::string(KeyValueCodec::kJavaClassName)) { + codec = std::make_shared(); + } else { + LOG(WARNING) << "Not using RPC Cell Codec"; + } + rpc_client_ = + std::make_shared(io_executor_, codec, connection_conf_->connect_timeout()); + location_cache_ = + std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); + caller_factory_ = std::make_shared(shared_from_this()); +} + +// We can't have the threads continue running after everything is done +// that leads to an error. +AsyncConnectionImpl::~AsyncConnectionImpl() { + cpu_executor_->stop(); + io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); +} + +void AsyncConnectionImpl::Close() { + if (is_closed_) return; + + cpu_executor_->stop(); + io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; +} + +} // namespace hbase diff --git hbase-native-client/core/async-connection.h hbase-native-client/core/async-connection.h new file mode 100644 index 0000000..7ec871c --- /dev/null +++ hbase-native-client/core/async-connection.h @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "connection/rpc-client.h" +#include "core/async-region-locator.h" +#include "core/configuration.h" +#include "core/connection-configuration.h" +#include "core/connection-configuration.h" +#include "core/hbase-configuration-loader.h" +#include "core/hbase-rpc-controller.h" +#include "core/keyvalue-codec.h" +#include "core/location-cache.h" +#include "if/Cell.pb.h" +#include "serde/table-name.h" + +namespace hbase { + +class AsyncRpcRetryingCallerFactory; + +class AsyncConnection { + public: + virtual ~AsyncConnection() = default; + virtual std::shared_ptr conf() = 0; + virtual std::shared_ptr connection_conf() = 0; + virtual std::shared_ptr caller_factory() = 0; + virtual std::shared_ptr rpc_client() = 0; + virtual std::shared_ptr region_locator() = 0; + virtual std::shared_ptr CreateRpcController() = 0; + virtual void Close() = 0; +}; + +class AsyncConnectionImpl : public AsyncConnection, public std::enable_shared_from_this { + public: + virtual ~AsyncConnectionImpl(); + + // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/ + template + static std::shared_ptr Create( T&& ... all ) { + auto conn = std::shared_ptr( new AsyncConnectionImpl( std::forward(all)... ) ); + conn->Init(); + return conn; + } + + + std::shared_ptr conf() override { return conf_; } + std::shared_ptr connection_conf() override { return connection_conf_; } + std::shared_ptr caller_factory() override { + return caller_factory_; + } + std::shared_ptr rpc_client() override { return rpc_client_; } + std::shared_ptr location_cache() { return location_cache_; } + std::shared_ptr region_locator() override { return location_cache_; } + std::shared_ptr CreateRpcController() override { + return std::make_shared(); + } + virtual void Close() override; + + private: + /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */ + static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; + /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ + static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; + /** The RPC codec to encode cells. For now it is KeyValueCodec */ + static constexpr const char* kRpcCodec = "hbase.client.rpc.codec"; + + std::shared_ptr conf_; + std::shared_ptr connection_conf_; + std::shared_ptr caller_factory_; + std::shared_ptr cpu_executor_; + std::shared_ptr io_executor_; + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_; + bool is_closed_ = false; + + private: + AsyncConnectionImpl(std::shared_ptr conf): conf_(conf) { } + void Init(); +}; +} // namespace hbase diff --git hbase-native-client/core/async-region-locator.h hbase-native-client/core/async-region-locator.h new file mode 100644 index 0000000..b0019e0 --- /dev/null +++ hbase-native-client/core/async-region-locator.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include + +#include "core/region-location.h" +#include "if/Client.pb.h" +#include "serde/region-info.h" +#include "serde/server-name.h" +#include "serde/table-name.h" + +namespace hbase { + +class AsyncRegionLocator { + public: + AsyncRegionLocator() {} + virtual ~AsyncRegionLocator() = default; + + /** + * The only method clients should use for meta lookups. If corresponding + * location is cached, it's returned from the cache, otherwise lookup + * in meta table is done, location is cached and then returned. + * It's expected that tiny fraction of invocations incurs meta scan. + * This method is to look up non-meta regions; use LocateMeta() to get the + * location of hbase:meta region. + * + * @param tn Table name of the table to look up. This object must live until + * after the future is returned + * + * @param row of the table to look up. This object must live until after the + * future is returned + */ + virtual folly::Future> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) = 0; + /** + * Update cached region location, possibly using the information from exception. + */ + virtual void UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) = 0; +}; + +} // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.cc hbase-native-client/core/async-rpc-retrying-caller-factory.cc index 0ac9cac..429daa6 100644 --- hbase-native-client/core/async-rpc-retrying-caller-factory.cc +++ hbase-native-client/core/async-rpc-retrying-caller-factory.cc @@ -19,4 +19,6 @@ #include "core/async-rpc-retrying-caller-factory.h" -namespace hbase {} // namespace hbase +namespace hbase { + +} // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.h hbase-native-client/core/async-rpc-retrying-caller-factory.h index 3342e29..5bcad6c 100644 --- hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -35,20 +35,25 @@ using std::chrono::nanoseconds; namespace hbase { -template +class AsyncConnection; + +template class SingleRequestCallerBuilder - : public std::enable_shared_from_this> { + : public std::enable_shared_from_this> { public: - explicit SingleRequestCallerBuilder(std::shared_ptr conn) + explicit SingleRequestCallerBuilder(std::shared_ptr conn) : conn_(conn), table_name_(nullptr), - rpc_timeout_nanos_(0), - operation_timeout_nanos_(0), + rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), + pause_(conn->connection_conf()->pause()), + operation_timeout_nanos_(conn->connection_conf()->operation_timeout()), + max_retries_(conn->connection_conf()->max_retries()), + start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), locate_type_(RegionLocateType::kCurrent) {} virtual ~SingleRequestCallerBuilder() = default; - typedef SingleRequestCallerBuilder GenenericThisType; + typedef SingleRequestCallerBuilder GenenericThisType; typedef std::shared_ptr SharedThisPtr; SharedThisPtr table(std::shared_ptr table_name) { @@ -66,6 +71,21 @@ class SingleRequestCallerBuilder return shared_this(); } + SharedThisPtr pause(nanoseconds pause) { + pause_ = pause; + return shared_this(); + } + + SharedThisPtr max_retries(uint32_t max_retries) { + max_retries_ = max_retries; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + SharedThisPtr row(const std::string& row) { row_ = row; return shared_this(); @@ -76,18 +96,17 @@ class SingleRequestCallerBuilder return shared_this(); } - SharedThisPtr action(Callable callable) { + SharedThisPtr action(Callable callable) { callable_ = callable; return shared_this(); } folly::Future Call() { return Build()->Call(); } - std::shared_ptr> Build() { - return std::make_shared>( - conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(), - conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_, - conn_->get_conn_conf()->GetStartLogErrorsCount()); + std::shared_ptr> Build() { + return std::make_shared>( + conn_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, + operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); } private: @@ -96,28 +115,30 @@ class SingleRequestCallerBuilder } private: - std::shared_ptr conn_; + std::shared_ptr conn_; std::shared_ptr table_name_; nanoseconds rpc_timeout_nanos_; nanoseconds operation_timeout_nanos_; + nanoseconds pause_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; std::string row_; RegionLocateType locate_type_; - Callable callable_; + Callable callable_; }; // end of SingleRequestCallerBuilder -template class AsyncRpcRetryingCallerFactory { private: - std::shared_ptr conn_; + std::shared_ptr conn_; public: - explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} + explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} virtual ~AsyncRpcRetryingCallerFactory() = default; - template - std::shared_ptr> Single() { - return std::make_shared>(conn_); + template + std::shared_ptr> Single() { + return std::make_shared>(conn_); } }; diff --git hbase-native-client/core/async-rpc-retrying-caller.h hbase-native-client/core/async-rpc-retrying-caller.h index f7a1523..6503301 100644 --- hbase-native-client/core/async-rpc-retrying-caller.h +++ hbase-native-client/core/async-rpc-retrying-caller.h @@ -32,6 +32,7 @@ #include #include #include "connection/rpc-client.h" +#include "core/async-connection.h" #include "core/hbase-rpc-controller.h" #include "core/region-location.h" #include "exceptions/exception.h" @@ -60,23 +61,23 @@ using RespConverter = std::function; template using RpcCallback = std::function; -template +template using RpcCall = std::function>( - std::shared_ptr, std::shared_ptr, + std::shared_ptr, std::shared_ptr, std::shared_ptr, std::unique_ptr)>; -template -using Callable = std::function(std::shared_ptr, - std::shared_ptr, - std::shared_ptr)>; +template +using Callable = + std::function(std::shared_ptr, + std::shared_ptr, std::shared_ptr)>; -template +template class AsyncSingleRequestRpcRetryingCaller { public: - AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, + AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, std::shared_ptr table_name, const std::string& row, RegionLocateType locate_type, - Callable callable, nanoseconds pause_ns, + Callable callable, nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) @@ -85,14 +86,14 @@ class AsyncSingleRequestRpcRetryingCaller { row_(row), locate_type_(locate_type), callable_(callable), - pause_ns_(pause_ns), + pause_(pause), max_retries_(max_retries), operation_timeout_nanos_(operation_timeout_nanos), rpc_timeout_nanos_(rpc_timeout_nanos), start_log_errors_count_(start_log_errors_count), promise_(std::make_shared>()), tries_(1) { - controller_ = conn_->get_rpc_controller_factory()->NewController(); + controller_ = conn_->CreateRpcController(); start_ns_ = TimeUtil::GetNowNanos(); max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); exceptions_ = std::make_shared>(); @@ -120,9 +121,9 @@ class AsyncSingleRequestRpcRetryingCaller { locate_timeout_ns = -1L; } - conn_->get_locator() - ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) - .then([this](RegionLocation& loc) { Call(loc); }) + conn_->region_locator() + ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](std::shared_ptr loc) { Call(*loc); }) .onError([this](const std::exception& e) { OnError(e, [this]() -> std::string { @@ -155,10 +156,9 @@ class AsyncSingleRequestRpcRetryingCaller { CompleteExceptionally(); return; } - delay_ns = - std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1)); + delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); } else { - delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1); + delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); } update_cached_location(error); tries_++; @@ -179,9 +179,10 @@ class AsyncSingleRequestRpcRetryingCaller { call_timeout_ns = rpc_timeout_nanos_.count(); } - std::shared_ptr rpc_client; + std::shared_ptr rpc_client; try { - rpc_client = conn_->GetRpcClient(); + // TODO: There is no connection attempt happening here, no need to try-catch. + rpc_client = conn_->rpc_client(); } catch (const IOException& e) { OnError(e, [&, this]() -> std::string { @@ -196,7 +197,7 @@ class AsyncSingleRequestRpcRetryingCaller { " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; }, [&, this](const std::exception& error) { - conn_->get_locator()->UpdateCachedLocation(loc, error); + conn_->region_locator()->UpdateCachedLocation(loc, error); }); return; } @@ -219,7 +220,7 @@ class AsyncSingleRequestRpcRetryingCaller { " ms"; }, [&, this](const std::exception& error) { - conn_->get_locator()->UpdateCachedLocation(loc, error); + conn_->region_locator()->UpdateCachedLocation(loc, error); }); return; }); @@ -244,12 +245,12 @@ class AsyncSingleRequestRpcRetryingCaller { private: folly::HHWheelTimer::UniquePtr retry_timer_; - std::shared_ptr conn_; + std::shared_ptr conn_; std::shared_ptr table_name_; std::string row_; RegionLocateType locate_type_; - Callable callable_; - nanoseconds pause_ns_; + Callable callable_; + nanoseconds pause_; uint32_t max_retries_; nanoseconds operation_timeout_nanos_; nanoseconds rpc_timeout_nanos_; diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc deleted file mode 100644 index 81d726f..0000000 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "connection/request.h" -#include "connection/response.h" -#include "connection/rpc-client.h" -#include "core/async-rpc-retrying-caller-factory.h" -#include "core/async-rpc-retrying-caller.h" -#include "core/client.h" -#include "core/hbase-rpc-controller.h" -#include "core/keyvalue-codec.h" -#include "core/region-location.h" -#include "core/request-converter.h" -#include "core/response-converter.h" -#include "core/result.h" -#include "exceptions/exception.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" -#include "test-util/test-util.h" - -using namespace google::protobuf; -using namespace hbase; -using namespace hbase::pb; -using namespace std::placeholders; -using namespace testing; -using ::testing::Return; -using ::testing::_; -using std::chrono::nanoseconds; - -class MockRpcControllerFactory { - public: - MOCK_METHOD0(NewController, std::shared_ptr()); -}; - -class MockAsyncConnectionConfiguration { - public: - MOCK_METHOD0(GetPauseNs, nanoseconds()); - MOCK_METHOD0(GetMaxRetries, int32_t()); - MOCK_METHOD0(GetStartLogErrorsCount, int32_t()); - MOCK_METHOD0(GetReadRpcTimeoutNs, nanoseconds()); - MOCK_METHOD0(GetOperationTimeoutNs, nanoseconds()); -}; - -class AsyncRegionLocator { - public: - explicit AsyncRegionLocator(std::shared_ptr region_location) - : region_location_(region_location) {} - ~AsyncRegionLocator() = default; - - folly::Future GetRegionLocation(std::shared_ptr, - const std::string&, RegionLocateType, int64_t) { - folly::Promise promise; - promise.setValue(*region_location_); - return promise.getFuture(); - } - - void UpdateCachedLocation(const RegionLocation&, const std::exception&) {} - - private: - std::shared_ptr region_location_; -}; - -class MockAsyncConnection { - public: - MOCK_METHOD0(get_conn_conf, std::shared_ptr()); - MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr()); - MOCK_METHOD0(get_locator, std::shared_ptr()); - MOCK_METHOD0(GetRpcClient, std::shared_ptr()); -}; - -template -class MockRawAsyncTableImpl { - public: - explicit MockRawAsyncTableImpl(std::shared_ptr conn) - : conn_(conn), promise_(std::make_shared>()) {} - virtual ~MockRawAsyncTableImpl() = default; - - /* implement this in real RawAsyncTableImpl. */ - - /* in real RawAsyncTableImpl, this should be private. */ - folly::Future GetCall(std::shared_ptr rpc_client, - std::shared_ptr controller, - std::shared_ptr loc, const hbase::Get& get) { - hbase::RpcCall rpc_call = []( - std::shared_ptr rpc_client, std::shared_ptr loc, - std::shared_ptr controller, - std::unique_ptr preq) -> folly::Future> { - return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), - std::move(preq), User::defaultUser(), "ClientService"); - }; - - return Call( - rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call, - &hbase::ResponseConverter::FromGetResponse); - } - - /* in real RawAsyncTableImpl, this should be private. */ - template - folly::Future Call( - std::shared_ptr rpc_client, std::shared_ptr controller, - std::shared_ptr loc, const REQ& req, - const ReqConverter, REQ, std::string>& req_converter, - const hbase::RpcCall& rpc_call, - const RespConverter, PRESP>& resp_converter) { - rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) - .then([&, this](std::unique_ptr presp) { - std::unique_ptr result = hbase::ResponseConverter::FromGetResponse(*presp); - promise_->setValue(std::move(*result)); - }) - .onError([this](const std::exception& e) { promise_->setException(e); }); - return promise_->getFuture(); - } - - private: - std::shared_ptr conn_; - std::shared_ptr> promise_; -}; - -TEST(AsyncRpcRetryTest, TestGetBasic) { - // Using TestUtil to populate test data - auto test_util = std::make_unique(); - test_util->StartMiniCluster(2); - - test_util->CreateTable("t", "d"); - test_util->TablePut("t", "test2", "d", "2", "value2"); - test_util->TablePut("t", "test2", "d", "extra", "value for extra"); - - // Create TableName and Row to be fetched from HBase - auto tn = folly::to("t"); - auto row = "test2"; - - // Get to be performed on above HBase Table - hbase::Get get(row); - - // Create a client - Client client(*(test_util->conf())); - - // Get connection to HBase Table - auto table = client.Table(tn); - ASSERT_TRUE(table) << "Unable to get connection to Table."; - - /* init region location and rpc channel */ - auto region_location = table->GetRegionLocation(row); - - auto io_executor_ = std::make_shared(1); - auto codec = std::make_shared(); - auto rpc_client = std::make_shared(io_executor_, codec); - - /* init rpc controller */ - auto controller = std::make_shared(); - - /* init rpc controller factory */ - auto controller_factory = std::make_shared(); - EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller)); - - /* init connection configuration */ - auto connection_conf = std::make_shared(); - EXPECT_CALL((*connection_conf), GetPauseNs()) - .Times(1) - .WillRepeatedly(Return(nanoseconds(100000000))); - EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31)); - EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9)); - EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs()) - .Times(1) - .WillRepeatedly(Return(nanoseconds(60000000000))); - EXPECT_CALL((*connection_conf), GetOperationTimeoutNs()) - .Times(1) - .WillRepeatedly(Return(nanoseconds(1200000000000))); - - /* init region locator */ - auto region_locator = std::make_shared(region_location); - - /* init hbase client connection */ - auto conn = std::make_shared(); - EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf)); - EXPECT_CALL((*conn), get_rpc_controller_factory()) - .Times(AtLeast(1)) - .WillRepeatedly(Return(controller_factory)); - EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator)); - EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client)); - - /* init retry caller factory */ - auto tableImpl = std::make_shared>(conn); - AsyncRpcRetryingCallerFactory caller_factory(conn); - - /* init request caller builder */ - auto builder = caller_factory.Single(); - - /* call with retry to get result */ - try { - auto async_caller = - builder->table(std::make_shared(tn)) - ->row(row) - ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs()) - ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs()) - ->action( - [=, &get]( - std::shared_ptr controller, - std::shared_ptr loc, - std::shared_ptr rpc_client) -> folly::Future { - return tableImpl->GetCall(rpc_client, controller, loc, get); - }) - ->Build(); - - hbase::Result result = async_caller->Call().get(); - - // Test the values, should be same as in put executed on hbase shell - ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty."; - EXPECT_EQ("test2", result.Row()); - EXPECT_EQ("value2", *(result.Value("d", "2"))); - EXPECT_EQ("value for extra", *(result.Value("d", "extra"))); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - throw e; - } - - table->Close(); - client.Close(); -} diff --git hbase-native-client/core/client-test.cc hbase-native-client/core/client-test.cc index 72680c6..1b14a94 100644 --- hbase-native-client/core/client-test.cc +++ hbase-native-client/core/client-test.cc @@ -59,6 +59,7 @@ class ClientTest : public ::testing::Test { static std::unique_ptr test_util; static void SetUpTestCase() { + google::InstallFailureSignalHandler(); test_util = std::make_unique(); test_util->StartMiniCluster(2); } diff --git hbase-native-client/core/client.cc hbase-native-client/core/client.cc index 3f889d4..0053e19 100644 --- hbase-native-client/core/client.cc +++ hbase-native-client/core/client.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include namespace hbase { @@ -34,53 +35,19 @@ Client::Client() { "hbase-site.xml is absent in the search path or problems in XML parsing"; throw std::runtime_error("Configuration object not present."); } - init(conf.value()); + Init(conf.value()); } -Client::Client(const hbase::Configuration &conf) { init(conf); } +Client::Client(const Configuration &conf) { Init(conf); } -void Client::init(const hbase::Configuration &conf) { - conf_ = std::make_shared(conf); - - conn_conf_ = std::make_shared(*conf_); - // start thread pools - auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN)); - auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN)); - cpu_executor_ = std::make_shared(cpu_threads); - io_executor_ = std::make_shared(io_threads); - - std::shared_ptr codec = nullptr; - if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == - std::string(KeyValueCodec::kJavaClassName)) { - codec = std::make_shared(); - } else { - LOG(WARNING) << "Not using RPC Cell Codec"; - } - rpc_client_ = - std::make_shared(io_executor_, codec, conn_conf_->connect_timeout()); - location_cache_ = - std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); -} - -// We can't have the threads continue running after everything is done -// that leads to an error. -Client::~Client() { - cpu_executor_->stop(); - io_executor_->stop(); - if (rpc_client_.get()) rpc_client_->Close(); -} - -std::unique_ptr Client::Table(const TableName &table_name) { - return std::make_unique(table_name, location_cache_, rpc_client_, conf_); +void Client::Init(const Configuration &conf) { + auto conf_ = std::make_shared(conf); + async_connection_ = AsyncConnectionImpl::Create(conf_); } -void Client::Close() { - if (is_closed_) return; - - cpu_executor_->stop(); - io_executor_->stop(); - if (rpc_client_.get()) rpc_client_->Close(); - is_closed_ = true; +std::unique_ptr Client::Table(const TableName &table_name) { + return std::make_unique(table_name, async_connection_); } +void Client::Close() { async_connection_->Close(); } } // namespace hbase diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h index 040bea0..46fba7a 100644 --- hbase-native-client/core/client.h +++ hbase-native-client/core/client.h @@ -19,22 +19,14 @@ #pragma once -#include -#include -#include -#include - #include #include #include "connection/rpc-client.h" +#include "core/async-connection.h" #include "core/configuration.h" -#include "core/connection-configuration.h" -#include "core/hbase-configuration-loader.h" -#include "core/keyvalue-codec.h" -#include "core/location-cache.h" + #include "core/table.h" -#include "if/Cell.pb.h" #include "serde/table-name.h" using hbase::pb::TableName; @@ -55,13 +47,14 @@ class Client { * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ Client(); - explicit Client(const hbase::Configuration& conf); - ~Client(); + explicit Client(const Configuration& conf); + ~Client() = default; + /** * @brief Retrieve a Table implementation for accessing a table. * @param - table_name */ - std::unique_ptr Table(const TableName& table_name); + std::unique_ptr
Table(const TableName& table_name); /** * @brief Close the Client connection. @@ -69,25 +62,12 @@ class Client { void Close(); private: - /** Constants */ - /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */ - static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; - /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ - static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; - /** The RPC codec to encode cells. For now it is KeyValueCodec */ - static constexpr const char* kRpcCodec = "hbase.client.rpc.codec"; - /** Data */ - std::shared_ptr cpu_executor_; - std::shared_ptr io_executor_; - std::shared_ptr location_cache_; - std::shared_ptr rpc_client_; - std::shared_ptr conf_; - std::shared_ptr conn_conf_; - bool is_closed_ = false; + std::shared_ptr async_connection_; + private: /** Methods */ - void init(const hbase::Configuration& conf); + void Init(const Configuration& conf); }; } // namespace hbase diff --git hbase-native-client/core/connection-configuration.h hbase-native-client/core/connection-configuration.h index e1e9f87..71fe1cb 100644 --- hbase-native-client/core/connection-configuration.h +++ hbase-native-client/core/connection-configuration.h @@ -74,7 +74,7 @@ class ConnectionConfiguration { // timeout for each write rpc request nanoseconds write_rpc_timeout() const { return write_rpc_timeout_; } - nanoseconds pause_nanos() const { return pause_; } + nanoseconds pause() const { return pause_; } uint32_t max_retries() const { return max_retries_; } diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc index 505f48c..07c3d61 100644 --- hbase-native-client/core/location-cache.cc +++ hbase-native-client/core/location-cache.cc @@ -158,7 +158,10 @@ Future> LocationCache::LocateFromMeta(const Tabl } Future> LocationCache::LocateRegion(const hbase::pb::TableName &tn, - const std::string &row) { + const std::string &row, + const RegionLocateType locate_type, + const int64_t locate_ns) { + // TODO: implement region locate type and timeout auto cached_loc = this->GetCachedLocation(tn, row); if (cached_loc != nullptr) { return cached_loc; @@ -280,3 +283,8 @@ void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const st unique_lock lock(locations_lock_); table_locs->erase(row); } + +void LocationCache::UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) { + // TODO: just clears the location for now. We can inspect RegionMovedExceptions, etc later. + ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key()); +} diff --git hbase-native-client/core/location-cache.h hbase-native-client/core/location-cache.h index c1c5f62..5e79213 100644 --- hbase-native-client/core/location-cache.h +++ hbase-native-client/core/location-cache.h @@ -32,6 +32,7 @@ #include #include "connection/connection-pool.h" +#include "core/async-region-locator.h" #include "core/configuration.h" #include "core/meta-utils.h" #include "core/region-location.h" @@ -75,7 +76,7 @@ typedef std::unordered_map> LocateRegion(const hbase::pb::TableName &tn, - const std::string &row); + virtual folly::Future> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override; /** * Remove the cached location of meta. @@ -173,6 +176,12 @@ class LocationCache { */ void ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row); + /** + * Update cached region location, possibly using the information from exception. + */ + virtual void UpdateCachedLocation(const RegionLocation &loc, + const std::exception &error) override; + const std::string &zk_quorum() { return zk_quorum_; } private: diff --git hbase-native-client/core/meta-utils.cc hbase-native-client/core/meta-utils.cc index 24a0360..119520f 100644 --- hbase-native-client/core/meta-utils.cc +++ hbase-native-client/core/meta-utils.cc @@ -89,7 +89,7 @@ std::unique_ptr MetaUtil::MetaRequest(const TableName tn, const std::st } std::shared_ptr MetaUtil::CreateLocation(const Response &resp) { - std::vector> results = ResponseConverter::FromScanResponse(resp); + std::vector> results = ResponseConverter::FromScanResponse(resp); if (results.size() != 1) { throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" + std::to_string(results.size())); diff --git hbase-native-client/core/raw-async-table.cc hbase-native-client/core/raw-async-table.cc new file mode 100644 index 0000000..082be35 --- /dev/null +++ hbase-native-client/core/raw-async-table.cc @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/raw-async-table.h" +#include "core/request-converter.h" +#include "core/response-converter.h" + +namespace hbase { + +template +std::shared_ptr> RawAsyncTable::CreateCaller( + std::string row, nanoseconds rpc_timeout) { + return connection_->caller_factory() + ->Single() + ->table(table_name_) + ->row(row) + ->rpc_timeout(rpc_timeout) + ->operation_timeout(connection_conf_->operation_timeout()) + ->pause(connection_conf_->pause()) + ->max_retries(connection_conf_->max_retries()) + ->start_log_errors_count(connection_conf_->start_log_errors_count()); +} + +template +folly::Future RawAsyncTable::Call( + std::shared_ptr rpc_client, std::shared_ptr controller, + std::shared_ptr loc, const REQ& req, + const ReqConverter, REQ, std::string>& req_converter, + const RespConverter& resp_converter) { + std::unique_ptr preq = req_converter(req, loc->region_name()); + + // No need to make take a callable argument, it is always the same + return rpc_client + ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), + User::defaultUser(), "ClientService") + .then([&](const std::unique_ptr& presp) { + return ResponseConverter::FromGetResponse(*presp); + //return resp_converter(*presp); // TODO this is causing SEGFAULT, figure out why + }); +} + +std::pair>>, +Future>> RawAsyncTable::Get(const hbase::Get& get) { + auto caller = + CreateCaller>(get.Row(), connection_conf_->read_rpc_timeout()) + ->action([=, &get](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) + -> folly::Future> { + return Call>( + rpc_client, controller, loc, get, + &hbase::RequestConverter::ToGetRequest, + &hbase::ResponseConverter::FromGetResponse); + }) + ->Build(); + + return std::pair>>, + Future>>(caller, caller->Call()); +} + +} /* namespace hbase */ diff --git hbase-native-client/core/raw-async-table.h hbase-native-client/core/raw-async-table.h new file mode 100644 index 0000000..54585ea --- /dev/null +++ hbase-native-client/core/raw-async-table.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include + +#include + +#include "core/async-connection.h" +#include "core/async-rpc-retrying-caller-factory.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/connection-configuration.h" +#include "core/get.h" +#include "core/result.h" + +using folly::Future; +using hbase::pb::TableName; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +/** + * TODO + */ +class RawAsyncTable { + public: + RawAsyncTable(std::shared_ptr table_name, std::shared_ptr connection) + : connection_(connection), + connection_conf_(connection->connection_conf()), + table_name_(table_name), + rpc_client_(connection->rpc_client()) {} + virtual ~RawAsyncTable() = default; + + std::pair>>, + Future>> Get(const hbase::Get& get); + void Close() {} + + private: + /* Data */ + std::shared_ptr connection_; + std::shared_ptr connection_conf_; + std::shared_ptr table_name_; + std::shared_ptr rpc_client_; + + /* Methods */ + template + folly::Future Call( + std::shared_ptr rpc_client, std::shared_ptr controller, + std::shared_ptr loc, const REQ& req, + const ReqConverter, REQ, std::string>& req_converter, + const RespConverter& resp_converter); + + template + std::shared_ptr> CreateCaller(std::string row, + nanoseconds rpc_timeout); +}; + +} // namespace hbase diff --git hbase-native-client/core/response-converter.cc hbase-native-client/core/response-converter.cc index 7bb5e5d..b11856c 100644 --- hbase-native-client/core/response-converter.cc +++ hbase-native-client/core/response-converter.cc @@ -33,14 +33,17 @@ ResponseConverter::ResponseConverter() {} ResponseConverter::~ResponseConverter() {} -std::unique_ptr ResponseConverter::FromGetResponse(const Response& resp) { +// impl note: we are returning shared_ptr's instead of unique_ptr's because these +// go inside folly::Future's, making the move semantics extremely tricky. +std::shared_ptr ResponseConverter::FromGetResponse(const Response& resp) { + LOG(INFO) << "FromGetResponse"; auto get_resp = std::static_pointer_cast(resp.resp_msg()); - return ToResult(get_resp->result(), resp.cell_scanner()); } -std::unique_ptr ResponseConverter::ToResult( +std::shared_ptr ResponseConverter::ToResult( const hbase::pb::Result& result, const std::unique_ptr& cell_scanner) { + LOG(INFO) << "ToResult"; std::vector> vcells; for (auto cell : result.cell()) { std::shared_ptr pcell = @@ -56,16 +59,17 @@ std::unique_ptr ResponseConverter::ToResult( } // TODO: check associated cell count? } - return std::make_unique(vcells, result.exists(), result.stale(), result.partial()); + LOG(INFO) << "Returning Result"; + return std::make_shared(vcells, result.exists(), result.stale(), result.partial()); } -std::vector> ResponseConverter::FromScanResponse(const Response& resp) { +std::vector> ResponseConverter::FromScanResponse(const Response& resp) { auto scan_resp = std::static_pointer_cast(resp.resp_msg()); - VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString(); + LOG(INFO) << "FromScanResponse:" << scan_resp->ShortDebugString(); int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size(); - std::vector> results{static_cast(num_results)}; + std::vector> results{static_cast(num_results)}; for (int i = 0; i < num_results; i++) { if (resp.cell_scanner() != nullptr) { // Cells are out in cellblocks. Group them up again as Results. How many to read at a @@ -86,7 +90,7 @@ std::vector> ResponseConverter::FromScanResponse(const R throw std::runtime_error(msg); } // TODO: handle partial results per Result by checking partial_flag_per_result - results[i] = std::make_unique(vcells, false, scan_resp->stale(), false); + results[i] = std::make_shared(vcells, false, scan_resp->stale(), false); } else { results[i] = ToResult(scan_resp->results(i), resp.cell_scanner()); } diff --git hbase-native-client/core/response-converter.h hbase-native-client/core/response-converter.h index 759b1ce..743c14b 100644 --- hbase-native-client/core/response-converter.h +++ hbase-native-client/core/response-converter.h @@ -36,16 +36,16 @@ class ResponseConverter { public: ~ResponseConverter(); - static std::unique_ptr ToResult(const hbase::pb::Result& result, + static std::shared_ptr ToResult(const hbase::pb::Result& result, const std::unique_ptr& cell_scanner); /** * @brief Returns a Result object created by PB Message in passed Response object. * @param resp - Response object having the PB message. */ - static std::unique_ptr FromGetResponse(const Response& resp); + static std::shared_ptr FromGetResponse(const Response& resp); - static std::vector> FromScanResponse(const Response& resp); + static std::vector> FromScanResponse(const Response& resp); private: // Constructor not required. We have all static methods to extract response from PB messages. diff --git hbase-native-client/core/table.cc hbase-native-client/core/table.cc index 2ce8fcd..0ea21cd 100644 --- hbase-native-client/core/table.cc +++ hbase-native-client/core/table.cc @@ -25,11 +25,13 @@ #include #include +#include "core/async-connection.h" #include "core/request-converter.h" #include "core/response-converter.h" #include "if/Client.pb.h" #include "security/user.h" #include "serde/server-name.h" +#include "utils/time-util.h" using folly::Future; using hbase::pb::TableName; @@ -38,41 +40,28 @@ using std::chrono::milliseconds; namespace hbase { -Table::Table(const TableName &table_name, - const std::shared_ptr &location_cache, - const std::shared_ptr &rpc_client, - const std::shared_ptr &conf) +Table::Table(const TableName &table_name, std::shared_ptr async_connection) : table_name_(std::make_shared(table_name)), - location_cache_(location_cache), - rpc_client_(rpc_client), - conf_(conf) { - client_retries_ = (conf_) ? conf_->GetInt("hbase.client.retries", client_retries_) : 5; + async_connection_(async_connection), + conf_(async_connection->conf()) { + async_table_ = std::make_unique(table_name_, async_connection); } Table::~Table() {} -std::unique_ptr Table::Get(const hbase::Get &get) { - auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000)); - auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name()); - auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil - - Future> f = - rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), - std::move(req), user, "ClientService"); - auto resp = f.get(); - - return hbase::ResponseConverter::FromGetResponse(*resp); +std::shared_ptr Table::Get(const hbase::Get &get) { + auto context = async_table_->Get(get); + return context.second.get(operation_timeout()); } -void Table::Close() { - if (is_closed_) return; - - if (rpc_client_.get()) rpc_client_->Close(); - is_closed_ = true; +milliseconds Table::operation_timeout() const { + return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout()); } +void Table::Close() { async_table_->Close(); } + std::shared_ptr Table::GetRegionLocation(const std::string &row) { - return location_cache_->LocateRegion(*table_name_, row).get(); + return async_connection_->region_locator()->LocateRegion(*table_name_, row).get(); } } /* namespace hbase */ diff --git hbase-native-client/core/table.h hbase-native-client/core/table.h index f82382e..93ab91b 100644 --- hbase-native-client/core/table.h +++ hbase-native-client/core/table.h @@ -19,15 +19,18 @@ #pragma once +#include #include #include #include #include "connection/rpc-client.h" +#include "core/async-connection.h" #include "core/client.h" #include "core/configuration.h" #include "core/get.h" #include "core/location-cache.h" +#include "core/raw-async-table.h" #include "core/result.h" #include "serde/table-name.h" @@ -41,16 +44,17 @@ class Table { /** * Constructors */ - Table(const TableName &table_name, const std::shared_ptr &location_cache, - const std::shared_ptr &rpc_client, - const std::shared_ptr &conf); + Table(const TableName &table_name, std::shared_ptr async_connection); ~Table(); /** * @brief - Returns a Result object for the constructed Get. * @param - get Get object to perform HBase Get operation. */ - std::unique_ptr Get(const hbase::Get &get); + std::shared_ptr Get(const hbase::Get &get); + + // TODO: next jira + // std::vector> Get(const std::vector &gets); /** * @brief - Close the client connection. @@ -64,11 +68,11 @@ class Table { private: std::shared_ptr table_name_; - std::shared_ptr location_cache_; - std::shared_ptr rpc_client_; + std::shared_ptr async_connection_; std::shared_ptr conf_; - bool is_closed_ = false; - // default 5 retries. over-ridden in constructor. - int client_retries_ = 5; + std::unique_ptr async_table_; + + private: + milliseconds operation_timeout() const; }; } /* namespace hbase */ diff --git hbase-native-client/utils/time-util.h hbase-native-client/utils/time-util.h index bbc3b35..68d03ac 100644 --- hbase-native-client/utils/time-util.h +++ hbase-native-client/utils/time-util.h @@ -31,6 +31,10 @@ class TimeUtil { return std::chrono::duration_cast(nanoseconds(nanos)).count(); } + static milliseconds ToMillis(const nanoseconds& nanos) { + return std::chrono::duration_cast(nanoseconds(nanos)); + } + static std::string ToMillisStr(const nanoseconds& nanos) { return std::to_string(std::chrono::duration_cast(nanos).count()); }