diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index 3df48d6..2d77f2d 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -19,6 +19,8 @@ cxx_library( name="core", exported_headers=[ + "async-connection.h", + "async-region-locator.h", "client.h", "cell.h", "hbase-macros.h", @@ -38,12 +40,14 @@ cxx_library( "request-converter.h", "response-converter.h", "table.h", + "raw-async-table.h", "async-rpc-retrying-caller-factory.h", "async-rpc-retrying-caller.h", "hbase-rpc-controller.h", "zk-util.h", ], srcs=[ + "async-connection.cc", "cell.cc", "client.cc", "keyvalue-codec.cc", @@ -52,6 +56,7 @@ cxx_library( "get.cc", "time-range.cc", "scan.cc", + "raw-async-table.cc", "result.cc", "request-converter.cc", "response-converter.cc", @@ -70,9 +75,7 @@ cxx_library( "//third-party:zookeeper_mt", ], compiler_flags=['-Weffc++', '-ggdb'], - visibility=[ - 'PUBLIC', - ],) + visibility=['PUBLIC',],) cxx_library( name="conf", exported_headers=[ diff --git hbase-native-client/core/async-connection.cc hbase-native-client/core/async-connection.cc new file mode 100644 index 0000000..b945e38 --- /dev/null +++ hbase-native-client/core/async-connection.cc @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-connection.h" +#include "core/async-rpc-retrying-caller-factory.h" + +namespace hbase { + +void AsyncConnectionImpl::Init() { + connection_conf_ = std::make_shared(*conf_); + // start thread pools + auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN)); + auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN)); + cpu_executor_ = std::make_shared(cpu_threads); + io_executor_ = std::make_shared(io_threads); + + std::shared_ptr codec = nullptr; + if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == + std::string(KeyValueCodec::kJavaClassName)) { + codec = std::make_shared(); + } else { + LOG(WARNING) << "Not using RPC Cell Codec"; + } + rpc_client_ = + std::make_shared(io_executor_, codec, connection_conf_->connect_timeout()); + location_cache_ = + std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); + caller_factory_ = std::make_shared(shared_from_this()); +} + +// We can't have the threads continue running after everything is done +// that leads to an error. +AsyncConnectionImpl::~AsyncConnectionImpl() { + cpu_executor_->stop(); + io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); +} + +void AsyncConnectionImpl::Close() { + if (is_closed_) return; + + cpu_executor_->stop(); + io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; +} + +} // namespace hbase diff --git hbase-native-client/core/async-connection.h hbase-native-client/core/async-connection.h new file mode 100644 index 0000000..6a61124 --- /dev/null +++ hbase-native-client/core/async-connection.h @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "connection/rpc-client.h" +#include "core/async-region-locator.h" +#include "core/configuration.h" +#include "core/connection-configuration.h" +#include "core/connection-configuration.h" +#include "core/hbase-configuration-loader.h" +#include "core/hbase-rpc-controller.h" +#include "core/keyvalue-codec.h" +#include "core/location-cache.h" +#include "if/Cell.pb.h" +#include "serde/table-name.h" + +namespace hbase { + +class AsyncRpcRetryingCallerFactory; + +class AsyncConnection { + public: + AsyncConnection(){}; + virtual ~AsyncConnection(){}; + virtual std::shared_ptr conf() = 0; + virtual std::shared_ptr connection_conf() = 0; + virtual std::shared_ptr caller_factory() = 0; + virtual std::shared_ptr rpc_client() = 0; + virtual std::shared_ptr region_locator() = 0; + virtual std::shared_ptr CreateRpcController() = 0; + virtual void Close() = 0; +}; + +class AsyncConnectionImpl : public AsyncConnection, + public std::enable_shared_from_this { + public: + virtual ~AsyncConnectionImpl(); + + // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/ + template + static std::shared_ptr Create(T&&... all) { + auto conn = + std::shared_ptr(new AsyncConnectionImpl(std::forward(all)...)); + conn->Init(); + return conn; + } + + std::shared_ptr conf() override { return conf_; } + std::shared_ptr connection_conf() override { return connection_conf_; } + std::shared_ptr caller_factory() override { + return caller_factory_; + } + std::shared_ptr rpc_client() override { return rpc_client_; } + std::shared_ptr location_cache() { return location_cache_; } + std::shared_ptr region_locator() override { return location_cache_; } + std::shared_ptr CreateRpcController() override { + return std::make_shared(); + } + + virtual void Close() override; + + protected: + AsyncConnectionImpl() {} + + private: + /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */ + static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; + /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ + static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; + /** The RPC codec to encode cells. For now it is KeyValueCodec */ + static constexpr const char* kRpcCodec = "hbase.client.rpc.codec"; + + std::shared_ptr conf_; + std::shared_ptr connection_conf_; + std::shared_ptr caller_factory_; + std::shared_ptr cpu_executor_; + std::shared_ptr io_executor_; + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_; + bool is_closed_ = false; + + private: + AsyncConnectionImpl(std::shared_ptr conf) : conf_(conf) {} + void Init(); +}; +} // namespace hbase diff --git hbase-native-client/core/async-region-locator.h hbase-native-client/core/async-region-locator.h new file mode 100644 index 0000000..b0019e0 --- /dev/null +++ hbase-native-client/core/async-region-locator.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include + +#include "core/region-location.h" +#include "if/Client.pb.h" +#include "serde/region-info.h" +#include "serde/server-name.h" +#include "serde/table-name.h" + +namespace hbase { + +class AsyncRegionLocator { + public: + AsyncRegionLocator() {} + virtual ~AsyncRegionLocator() = default; + + /** + * The only method clients should use for meta lookups. If corresponding + * location is cached, it's returned from the cache, otherwise lookup + * in meta table is done, location is cached and then returned. + * It's expected that tiny fraction of invocations incurs meta scan. + * This method is to look up non-meta regions; use LocateMeta() to get the + * location of hbase:meta region. + * + * @param tn Table name of the table to look up. This object must live until + * after the future is returned + * + * @param row of the table to look up. This object must live until after the + * future is returned + */ + virtual folly::Future> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) = 0; + /** + * Update cached region location, possibly using the information from exception. + */ + virtual void UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) = 0; +}; + +} // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.h hbase-native-client/core/async-rpc-retrying-caller-factory.h index 3342e29..5bcad6c 100644 --- hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -35,20 +35,25 @@ using std::chrono::nanoseconds; namespace hbase { -template +class AsyncConnection; + +template class SingleRequestCallerBuilder - : public std::enable_shared_from_this> { + : public std::enable_shared_from_this> { public: - explicit SingleRequestCallerBuilder(std::shared_ptr conn) + explicit SingleRequestCallerBuilder(std::shared_ptr conn) : conn_(conn), table_name_(nullptr), - rpc_timeout_nanos_(0), - operation_timeout_nanos_(0), + rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), + pause_(conn->connection_conf()->pause()), + operation_timeout_nanos_(conn->connection_conf()->operation_timeout()), + max_retries_(conn->connection_conf()->max_retries()), + start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), locate_type_(RegionLocateType::kCurrent) {} virtual ~SingleRequestCallerBuilder() = default; - typedef SingleRequestCallerBuilder GenenericThisType; + typedef SingleRequestCallerBuilder GenenericThisType; typedef std::shared_ptr SharedThisPtr; SharedThisPtr table(std::shared_ptr table_name) { @@ -66,6 +71,21 @@ class SingleRequestCallerBuilder return shared_this(); } + SharedThisPtr pause(nanoseconds pause) { + pause_ = pause; + return shared_this(); + } + + SharedThisPtr max_retries(uint32_t max_retries) { + max_retries_ = max_retries; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + SharedThisPtr row(const std::string& row) { row_ = row; return shared_this(); @@ -76,18 +96,17 @@ class SingleRequestCallerBuilder return shared_this(); } - SharedThisPtr action(Callable callable) { + SharedThisPtr action(Callable callable) { callable_ = callable; return shared_this(); } folly::Future Call() { return Build()->Call(); } - std::shared_ptr> Build() { - return std::make_shared>( - conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(), - conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_, - conn_->get_conn_conf()->GetStartLogErrorsCount()); + std::shared_ptr> Build() { + return std::make_shared>( + conn_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, + operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); } private: @@ -96,28 +115,30 @@ class SingleRequestCallerBuilder } private: - std::shared_ptr conn_; + std::shared_ptr conn_; std::shared_ptr table_name_; nanoseconds rpc_timeout_nanos_; nanoseconds operation_timeout_nanos_; + nanoseconds pause_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; std::string row_; RegionLocateType locate_type_; - Callable callable_; + Callable callable_; }; // end of SingleRequestCallerBuilder -template class AsyncRpcRetryingCallerFactory { private: - std::shared_ptr conn_; + std::shared_ptr conn_; public: - explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} + explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} virtual ~AsyncRpcRetryingCallerFactory() = default; - template - std::shared_ptr> Single() { - return std::make_shared>(conn_); + template + std::shared_ptr> Single() { + return std::make_shared>(conn_); } }; diff --git hbase-native-client/core/async-rpc-retrying-caller.h hbase-native-client/core/async-rpc-retrying-caller.h index f7a1523..6503301 100644 --- hbase-native-client/core/async-rpc-retrying-caller.h +++ hbase-native-client/core/async-rpc-retrying-caller.h @@ -32,6 +32,7 @@ #include #include #include "connection/rpc-client.h" +#include "core/async-connection.h" #include "core/hbase-rpc-controller.h" #include "core/region-location.h" #include "exceptions/exception.h" @@ -60,23 +61,23 @@ using RespConverter = std::function; template using RpcCallback = std::function; -template +template using RpcCall = std::function>( - std::shared_ptr, std::shared_ptr, + std::shared_ptr, std::shared_ptr, std::shared_ptr, std::unique_ptr)>; -template -using Callable = std::function(std::shared_ptr, - std::shared_ptr, - std::shared_ptr)>; +template +using Callable = + std::function(std::shared_ptr, + std::shared_ptr, std::shared_ptr)>; -template +template class AsyncSingleRequestRpcRetryingCaller { public: - AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, + AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, std::shared_ptr table_name, const std::string& row, RegionLocateType locate_type, - Callable callable, nanoseconds pause_ns, + Callable callable, nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) @@ -85,14 +86,14 @@ class AsyncSingleRequestRpcRetryingCaller { row_(row), locate_type_(locate_type), callable_(callable), - pause_ns_(pause_ns), + pause_(pause), max_retries_(max_retries), operation_timeout_nanos_(operation_timeout_nanos), rpc_timeout_nanos_(rpc_timeout_nanos), start_log_errors_count_(start_log_errors_count), promise_(std::make_shared>()), tries_(1) { - controller_ = conn_->get_rpc_controller_factory()->NewController(); + controller_ = conn_->CreateRpcController(); start_ns_ = TimeUtil::GetNowNanos(); max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); exceptions_ = std::make_shared>(); @@ -120,9 +121,9 @@ class AsyncSingleRequestRpcRetryingCaller { locate_timeout_ns = -1L; } - conn_->get_locator() - ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) - .then([this](RegionLocation& loc) { Call(loc); }) + conn_->region_locator() + ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](std::shared_ptr loc) { Call(*loc); }) .onError([this](const std::exception& e) { OnError(e, [this]() -> std::string { @@ -155,10 +156,9 @@ class AsyncSingleRequestRpcRetryingCaller { CompleteExceptionally(); return; } - delay_ns = - std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1)); + delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); } else { - delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1); + delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); } update_cached_location(error); tries_++; @@ -179,9 +179,10 @@ class AsyncSingleRequestRpcRetryingCaller { call_timeout_ns = rpc_timeout_nanos_.count(); } - std::shared_ptr rpc_client; + std::shared_ptr rpc_client; try { - rpc_client = conn_->GetRpcClient(); + // TODO: There is no connection attempt happening here, no need to try-catch. + rpc_client = conn_->rpc_client(); } catch (const IOException& e) { OnError(e, [&, this]() -> std::string { @@ -196,7 +197,7 @@ class AsyncSingleRequestRpcRetryingCaller { " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; }, [&, this](const std::exception& error) { - conn_->get_locator()->UpdateCachedLocation(loc, error); + conn_->region_locator()->UpdateCachedLocation(loc, error); }); return; } @@ -219,7 +220,7 @@ class AsyncSingleRequestRpcRetryingCaller { " ms"; }, [&, this](const std::exception& error) { - conn_->get_locator()->UpdateCachedLocation(loc, error); + conn_->region_locator()->UpdateCachedLocation(loc, error); }); return; }); @@ -244,12 +245,12 @@ class AsyncSingleRequestRpcRetryingCaller { private: folly::HHWheelTimer::UniquePtr retry_timer_; - std::shared_ptr conn_; + std::shared_ptr conn_; std::shared_ptr table_name_; std::string row_; RegionLocateType locate_type_; - Callable callable_; - nanoseconds pause_ns_; + Callable callable_; + nanoseconds pause_; uint32_t max_retries_; nanoseconds operation_timeout_nanos_; nanoseconds rpc_timeout_nanos_; diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc index 81d726f..43d237e 100644 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -30,9 +30,11 @@ #include "connection/request.h" #include "connection/response.h" #include "connection/rpc-client.h" +#include "core/async-connection.h" #include "core/async-rpc-retrying-caller-factory.h" #include "core/async-rpc-retrying-caller.h" #include "core/client.h" +#include "core/connection-configuration.h" #include "core/hbase-rpc-controller.h" #include "core/keyvalue-codec.h" #include "core/region-location.h" @@ -43,6 +45,7 @@ #include "if/Client.pb.h" #include "if/HBase.pb.h" #include "test-util/test-util.h" +#include "utils/time-util.h" using namespace google::protobuf; using namespace hbase; @@ -53,61 +56,74 @@ using ::testing::Return; using ::testing::_; using std::chrono::nanoseconds; -class MockRpcControllerFactory { +class MockAsyncRegionLocator : public AsyncRegionLocator { public: - MOCK_METHOD0(NewController, std::shared_ptr()); -}; - -class MockAsyncConnectionConfiguration { - public: - MOCK_METHOD0(GetPauseNs, nanoseconds()); - MOCK_METHOD0(GetMaxRetries, int32_t()); - MOCK_METHOD0(GetStartLogErrorsCount, int32_t()); - MOCK_METHOD0(GetReadRpcTimeoutNs, nanoseconds()); - MOCK_METHOD0(GetOperationTimeoutNs, nanoseconds()); -}; - -class AsyncRegionLocator { - public: - explicit AsyncRegionLocator(std::shared_ptr region_location) + explicit MockAsyncRegionLocator(std::shared_ptr region_location) : region_location_(region_location) {} - ~AsyncRegionLocator() = default; - - folly::Future GetRegionLocation(std::shared_ptr, - const std::string&, RegionLocateType, int64_t) { - folly::Promise promise; - promise.setValue(*region_location_); + ~MockAsyncRegionLocator() = default; + + folly::Future> LocateRegion(const hbase::pb::TableName&, + const std::string&, + const RegionLocateType, + const int64_t) override { + folly::Promise> promise; + promise.setValue(region_location_); return promise.getFuture(); } - void UpdateCachedLocation(const RegionLocation&, const std::exception&) {} + void UpdateCachedLocation(const RegionLocation&, const std::exception&) override {} private: std::shared_ptr region_location_; }; -class MockAsyncConnection { +class MockAsyncConnection : public AsyncConnectionImpl { public: - MOCK_METHOD0(get_conn_conf, std::shared_ptr()); - MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr()); - MOCK_METHOD0(get_locator, std::shared_ptr()); - MOCK_METHOD0(GetRpcClient, std::shared_ptr()); + MockAsyncConnection(std::shared_ptr conn_conf, + std::shared_ptr rpc_client, + std::shared_ptr region_locator) + : AsyncConnectionImpl(), + conn_conf_(conn_conf), + rpc_client_(rpc_client), + region_locator_(region_locator) { + caller_factory_ = std::make_shared(shared_from_this()); + } + ~MockAsyncConnection() {} + + std::shared_ptr conf() override { return nullptr; } + std::shared_ptr connection_conf() override { return conn_conf_; } + std::shared_ptr caller_factory() override { + return caller_factory_; + } + std::shared_ptr rpc_client() override { return rpc_client_; } + std::shared_ptr region_locator() override { return region_locator_; } + + void Close() override {} + std::shared_ptr CreateRpcController() override { + return std::make_shared(); + } + + private: + std::shared_ptr conn_conf_; + std::shared_ptr caller_factory_; + std::shared_ptr rpc_client_; + std::shared_ptr region_locator_; }; template class MockRawAsyncTableImpl { public: explicit MockRawAsyncTableImpl(std::shared_ptr conn) - : conn_(conn), promise_(std::make_shared>()) {} + : conn_(conn), promise_(std::make_shared>>()) {} virtual ~MockRawAsyncTableImpl() = default; /* implement this in real RawAsyncTableImpl. */ /* in real RawAsyncTableImpl, this should be private. */ - folly::Future GetCall(std::shared_ptr rpc_client, - std::shared_ptr controller, - std::shared_ptr loc, const hbase::Get& get) { - hbase::RpcCall rpc_call = []( + folly::Future> GetCall( + std::shared_ptr rpc_client, std::shared_ptr controller, + std::shared_ptr loc, const hbase::Get& get) { + hbase::RpcCall rpc_call = []( std::shared_ptr rpc_client, std::shared_ptr loc, std::shared_ptr controller, std::unique_ptr preq) -> folly::Future> { @@ -115,7 +131,7 @@ class MockRawAsyncTableImpl { std::move(preq), User::defaultUser(), "ClientService"); }; - return Call( + return Call>( rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call, &hbase::ResponseConverter::FromGetResponse); } @@ -126,12 +142,12 @@ class MockRawAsyncTableImpl { std::shared_ptr rpc_client, std::shared_ptr controller, std::shared_ptr loc, const REQ& req, const ReqConverter, REQ, std::string>& req_converter, - const hbase::RpcCall& rpc_call, - const RespConverter, PRESP>& resp_converter) { + const hbase::RpcCall& rpc_call, + const RespConverter& resp_converter) { rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) .then([&, this](std::unique_ptr presp) { - std::unique_ptr result = hbase::ResponseConverter::FromGetResponse(*presp); - promise_->setValue(std::move(*result)); + std::shared_ptr result = hbase::ResponseConverter::FromGetResponse(*presp); + promise_->setValue(result); }) .onError([this](const std::exception& e) { promise_->setException(e); }); return promise_->getFuture(); @@ -139,7 +155,7 @@ class MockRawAsyncTableImpl { private: std::shared_ptr conn_; - std::shared_ptr> promise_; + std::shared_ptr>> promise_; }; TEST(AsyncRpcRetryTest, TestGetBasic) { @@ -172,69 +188,49 @@ TEST(AsyncRpcRetryTest, TestGetBasic) { auto codec = std::make_shared(); auto rpc_client = std::make_shared(io_executor_, codec); - /* init rpc controller */ - auto controller = std::make_shared(); - - /* init rpc controller factory */ - auto controller_factory = std::make_shared(); - EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller)); - /* init connection configuration */ - auto connection_conf = std::make_shared(); - EXPECT_CALL((*connection_conf), GetPauseNs()) - .Times(1) - .WillRepeatedly(Return(nanoseconds(100000000))); - EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31)); - EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9)); - EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs()) - .Times(1) - .WillRepeatedly(Return(nanoseconds(60000000000))); - EXPECT_CALL((*connection_conf), GetOperationTimeoutNs()) - .Times(1) - .WillRepeatedly(Return(nanoseconds(1200000000000))); + auto connection_conf = std::make_shared( + TimeUtil::SecondsToNanos(20), // connect_timeout + TimeUtil::SecondsToNanos(1200), // operation_timeout + TimeUtil::SecondsToNanos(60), // rpc_timeout + TimeUtil::MillisToNanos(100), // pause + 31, // max retries + 9); // start log errors count /* init region locator */ - auto region_locator = std::make_shared(region_location); + auto region_locator = std::make_shared(region_location); /* init hbase client connection */ - auto conn = std::make_shared(); - EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf)); - EXPECT_CALL((*conn), get_rpc_controller_factory()) - .Times(AtLeast(1)) - .WillRepeatedly(Return(controller_factory)); - EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator)); - EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client)); + auto conn = std::make_shared(connection_conf, rpc_client, region_locator); /* init retry caller factory */ auto tableImpl = std::make_shared>(conn); - AsyncRpcRetryingCallerFactory caller_factory(conn); /* init request caller builder */ - auto builder = caller_factory.Single(); + auto builder = conn->caller_factory()->Single>(); /* call with retry to get result */ try { auto async_caller = builder->table(std::make_shared(tn)) ->row(row) - ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs()) - ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs()) - ->action( - [=, &get]( - std::shared_ptr controller, - std::shared_ptr loc, - std::shared_ptr rpc_client) -> folly::Future { - return tableImpl->GetCall(rpc_client, controller, loc, get); - }) + ->rpc_timeout(conn->connection_conf()->read_rpc_timeout()) + ->operation_timeout(conn->connection_conf()->operation_timeout()) + ->action([=, &get](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) + -> folly::Future> { + return tableImpl->GetCall(rpc_client, controller, loc, get); + }) ->Build(); - hbase::Result result = async_caller->Call().get(); + auto result = async_caller->Call().get(); // Test the values, should be same as in put executed on hbase shell - ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty."; - EXPECT_EQ("test2", result.Row()); - EXPECT_EQ("value2", *(result.Value("d", "2"))); - EXPECT_EQ("value for extra", *(result.Value("d", "extra"))); + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); } catch (std::exception& e) { LOG(ERROR) << e.what(); throw e; diff --git hbase-native-client/core/client-test.cc hbase-native-client/core/client-test.cc index 72680c6..184a6a7 100644 --- hbase-native-client/core/client-test.cc +++ hbase-native-client/core/client-test.cc @@ -47,6 +47,7 @@ class ClientTest : public ::testing::Test { const std::string xml_data) { // Remove temp file always boost::filesystem::remove((dir + file).c_str()); + boost::filesystem::create_directories(dir.c_str()); WriteDataToFile((dir + file), xml_data); } @@ -59,6 +60,7 @@ class ClientTest : public ::testing::Test { static std::unique_ptr test_util; static void SetUpTestCase() { + google::InstallFailureSignalHandler(); test_util = std::make_unique(); test_util->StartMiniCluster(2); } diff --git hbase-native-client/core/client.cc hbase-native-client/core/client.cc index 3f889d4..0053e19 100644 --- hbase-native-client/core/client.cc +++ hbase-native-client/core/client.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include namespace hbase { @@ -34,53 +35,19 @@ Client::Client() { "hbase-site.xml is absent in the search path or problems in XML parsing"; throw std::runtime_error("Configuration object not present."); } - init(conf.value()); + Init(conf.value()); } -Client::Client(const hbase::Configuration &conf) { init(conf); } +Client::Client(const Configuration &conf) { Init(conf); } -void Client::init(const hbase::Configuration &conf) { - conf_ = std::make_shared(conf); - - conn_conf_ = std::make_shared(*conf_); - // start thread pools - auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN)); - auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN)); - cpu_executor_ = std::make_shared(cpu_threads); - io_executor_ = std::make_shared(io_threads); - - std::shared_ptr codec = nullptr; - if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == - std::string(KeyValueCodec::kJavaClassName)) { - codec = std::make_shared(); - } else { - LOG(WARNING) << "Not using RPC Cell Codec"; - } - rpc_client_ = - std::make_shared(io_executor_, codec, conn_conf_->connect_timeout()); - location_cache_ = - std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); -} - -// We can't have the threads continue running after everything is done -// that leads to an error. -Client::~Client() { - cpu_executor_->stop(); - io_executor_->stop(); - if (rpc_client_.get()) rpc_client_->Close(); -} - -std::unique_ptr Client::Table(const TableName &table_name) { - return std::make_unique(table_name, location_cache_, rpc_client_, conf_); +void Client::Init(const Configuration &conf) { + auto conf_ = std::make_shared(conf); + async_connection_ = AsyncConnectionImpl::Create(conf_); } -void Client::Close() { - if (is_closed_) return; - - cpu_executor_->stop(); - io_executor_->stop(); - if (rpc_client_.get()) rpc_client_->Close(); - is_closed_ = true; +std::unique_ptr Client::Table(const TableName &table_name) { + return std::make_unique(table_name, async_connection_); } +void Client::Close() { async_connection_->Close(); } } // namespace hbase diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h index 040bea0..0e11278 100644 --- hbase-native-client/core/client.h +++ hbase-native-client/core/client.h @@ -19,22 +19,14 @@ #pragma once -#include -#include -#include -#include - #include #include #include "connection/rpc-client.h" +#include "core/async-connection.h" #include "core/configuration.h" -#include "core/connection-configuration.h" -#include "core/hbase-configuration-loader.h" -#include "core/keyvalue-codec.h" -#include "core/location-cache.h" + #include "core/table.h" -#include "if/Cell.pb.h" #include "serde/table-name.h" using hbase::pb::TableName; @@ -55,13 +47,14 @@ class Client { * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ Client(); - explicit Client(const hbase::Configuration& conf); - ~Client(); + explicit Client(const Configuration& conf); + ~Client() = default; + /** * @brief Retrieve a Table implementation for accessing a table. * @param - table_name */ - std::unique_ptr Table(const TableName& table_name); + std::unique_ptr<::hbase::Table> Table(const TableName& table_name); /** * @brief Close the Client connection. @@ -69,25 +62,12 @@ class Client { void Close(); private: - /** Constants */ - /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */ - static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; - /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ - static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; - /** The RPC codec to encode cells. For now it is KeyValueCodec */ - static constexpr const char* kRpcCodec = "hbase.client.rpc.codec"; - /** Data */ - std::shared_ptr cpu_executor_; - std::shared_ptr io_executor_; - std::shared_ptr location_cache_; - std::shared_ptr rpc_client_; - std::shared_ptr conf_; - std::shared_ptr conn_conf_; - bool is_closed_ = false; + std::shared_ptr async_connection_; + private: /** Methods */ - void init(const hbase::Configuration& conf); + void Init(const Configuration& conf); }; } // namespace hbase diff --git hbase-native-client/core/connection-configuration.h hbase-native-client/core/connection-configuration.h index e1e9f87..c6d1d60 100644 --- hbase-native-client/core/connection-configuration.h +++ hbase-native-client/core/connection-configuration.h @@ -56,6 +56,20 @@ class ConnectionConfiguration { conf.GetLong(kClientScannerMaxResultsSize, kDefaultClientScannerMaxResultsSize); } + // Used by tests + ConnectionConfiguration(nanoseconds connect_timeout, nanoseconds operation_timeout, + nanoseconds rpc_timeout, nanoseconds pause, uint32_t max_retries, + uint32_t start_log_errors_count) + : connect_timeout_(connect_timeout), + operation_timeout_(operation_timeout), + meta_operation_timeout_(operation_timeout), + rpc_timeout_(rpc_timeout), + read_rpc_timeout_(rpc_timeout), + write_rpc_timeout_(rpc_timeout), + pause_(pause), + max_retries_(max_retries), + start_log_errors_count_(start_log_errors_count) {} + nanoseconds connect_timeout() const { return connect_timeout_; } nanoseconds meta_operation_timeout() const { return meta_operation_timeout_; } @@ -74,7 +88,7 @@ class ConnectionConfiguration { // timeout for each write rpc request nanoseconds write_rpc_timeout() const { return write_rpc_timeout_; } - nanoseconds pause_nanos() const { return pause_; } + nanoseconds pause() const { return pause_; } uint32_t max_retries() const { return max_retries_; } diff --git hbase-native-client/core/location-cache-test.cc hbase-native-client/core/location-cache-test.cc index 4159af4..8d1ac5f 100644 --- hbase-native-client/core/location-cache-test.cc +++ hbase-native-client/core/location-cache-test.cc @@ -33,6 +33,7 @@ using namespace std::chrono; class LocationCacheTest : public ::testing::Test { protected: static void SetUpTestCase() { + google::InstallFailureSignalHandler(); test_util_ = std::make_unique(); test_util_->StartMiniCluster(2); } diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc index 505f48c..07c3d61 100644 --- hbase-native-client/core/location-cache.cc +++ hbase-native-client/core/location-cache.cc @@ -158,7 +158,10 @@ Future> LocationCache::LocateFromMeta(const Tabl } Future> LocationCache::LocateRegion(const hbase::pb::TableName &tn, - const std::string &row) { + const std::string &row, + const RegionLocateType locate_type, + const int64_t locate_ns) { + // TODO: implement region locate type and timeout auto cached_loc = this->GetCachedLocation(tn, row); if (cached_loc != nullptr) { return cached_loc; @@ -280,3 +283,8 @@ void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const st unique_lock lock(locations_lock_); table_locs->erase(row); } + +void LocationCache::UpdateCachedLocation(const RegionLocation &loc, const std::exception &error) { + // TODO: just clears the location for now. We can inspect RegionMovedExceptions, etc later. + ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key()); +} diff --git hbase-native-client/core/location-cache.h hbase-native-client/core/location-cache.h index c1c5f62..5e79213 100644 --- hbase-native-client/core/location-cache.h +++ hbase-native-client/core/location-cache.h @@ -32,6 +32,7 @@ #include #include "connection/connection-pool.h" +#include "core/async-region-locator.h" #include "core/configuration.h" #include "core/meta-utils.h" #include "core/region-location.h" @@ -75,7 +76,7 @@ typedef std::unordered_map> LocateRegion(const hbase::pb::TableName &tn, - const std::string &row); + virtual folly::Future> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override; /** * Remove the cached location of meta. @@ -173,6 +176,12 @@ class LocationCache { */ void ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row); + /** + * Update cached region location, possibly using the information from exception. + */ + virtual void UpdateCachedLocation(const RegionLocation &loc, + const std::exception &error) override; + const std::string &zk_quorum() { return zk_quorum_; } private: diff --git hbase-native-client/core/meta-utils.cc hbase-native-client/core/meta-utils.cc index 24a0360..119520f 100644 --- hbase-native-client/core/meta-utils.cc +++ hbase-native-client/core/meta-utils.cc @@ -89,7 +89,7 @@ std::unique_ptr MetaUtil::MetaRequest(const TableName tn, const std::st } std::shared_ptr MetaUtil::CreateLocation(const Response &resp) { - std::vector> results = ResponseConverter::FromScanResponse(resp); + std::vector> results = ResponseConverter::FromScanResponse(resp); if (results.size() != 1) { throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" + std::to_string(results.size())); diff --git hbase-native-client/core/raw-async-table.cc hbase-native-client/core/raw-async-table.cc new file mode 100644 index 0000000..641f3c8 --- /dev/null +++ hbase-native-client/core/raw-async-table.cc @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/raw-async-table.h" +#include "core/request-converter.h" +#include "core/response-converter.h" + +namespace hbase { + +template +std::shared_ptr> RawAsyncTable::CreateCallerBuilder( + std::string row, nanoseconds rpc_timeout) { + return connection_->caller_factory() + ->Single() + ->table(table_name_) + ->row(row) + ->rpc_timeout(rpc_timeout) + ->operation_timeout(connection_conf_->operation_timeout()) + ->pause(connection_conf_->pause()) + ->max_retries(connection_conf_->max_retries()) + ->start_log_errors_count(connection_conf_->start_log_errors_count()); +} + +template +folly::Future RawAsyncTable::Call( + std::shared_ptr rpc_client, std::shared_ptr controller, + std::shared_ptr loc, const REQ& req, + const ReqConverter, REQ, std::string>& req_converter, + const RespConverter& resp_converter) { + std::unique_ptr preq = req_converter(req, loc->region_name()); + + // No need to make take a callable argument, it is always the same + return rpc_client + ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), + User::defaultUser(), "ClientService") + .then([&](const std::unique_ptr& presp) { + return ResponseConverter::FromGetResponse(*presp); + // return resp_converter(*presp); // TODO this is causing SEGFAULT, figure out why + }); +} + +Future> RawAsyncTable::Get(const hbase::Get& get) { + auto caller = + CreateCallerBuilder>(get.Row(), connection_conf_->read_rpc_timeout()) + ->action([=, &get](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) + -> folly::Future> { + return Call>( + rpc_client, controller, loc, get, + &hbase::RequestConverter::ToGetRequest, + &hbase::ResponseConverter::FromGetResponse); + }) + ->Build(); + + // Return the Future we obtain from the call(). However, we do not want the Caller to go out of + // context and get deallocated since the caller injects a lot of closures which capture [this, &] + // which is use-after-free. We are just passing an identity closure capturing caller by value to + // ensure that the lifecycle of the Caller object is longer than the retry lambdas. + return caller->Call().then([caller](const auto r) { return r; }); +} + +} /* namespace hbase */ diff --git hbase-native-client/core/raw-async-table.h hbase-native-client/core/raw-async-table.h new file mode 100644 index 0000000..527c7be --- /dev/null +++ hbase-native-client/core/raw-async-table.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include + +#include + +#include "core/async-connection.h" +#include "core/async-rpc-retrying-caller-factory.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/connection-configuration.h" +#include "core/get.h" +#include "core/result.h" + +using folly::Future; +using hbase::pb::TableName; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +/** + * A low level asynchronous table that should not be used by user applications.The implementation + * is required to be thread safe. + */ +class RawAsyncTable { + public: + RawAsyncTable(std::shared_ptr table_name, std::shared_ptr connection) + : connection_(connection), + connection_conf_(connection->connection_conf()), + table_name_(table_name), + rpc_client_(connection->rpc_client()) {} + virtual ~RawAsyncTable() = default; + + Future> Get(const hbase::Get& get); + void Close() {} + + private: + /* Data */ + std::shared_ptr connection_; + std::shared_ptr connection_conf_; + std::shared_ptr table_name_; + std::shared_ptr rpc_client_; + + /* Methods */ + template + folly::Future Call( + std::shared_ptr rpc_client, std::shared_ptr controller, + std::shared_ptr loc, const REQ& req, + const ReqConverter, REQ, std::string>& req_converter, + const RespConverter& resp_converter); + + template + std::shared_ptr> CreateCallerBuilder(std::string row, + nanoseconds rpc_timeout); +}; + +} // namespace hbase diff --git hbase-native-client/core/response-converter.cc hbase-native-client/core/response-converter.cc index 7bb5e5d..b11856c 100644 --- hbase-native-client/core/response-converter.cc +++ hbase-native-client/core/response-converter.cc @@ -33,14 +33,17 @@ ResponseConverter::ResponseConverter() {} ResponseConverter::~ResponseConverter() {} -std::unique_ptr ResponseConverter::FromGetResponse(const Response& resp) { +// impl note: we are returning shared_ptr's instead of unique_ptr's because these +// go inside folly::Future's, making the move semantics extremely tricky. +std::shared_ptr ResponseConverter::FromGetResponse(const Response& resp) { + LOG(INFO) << "FromGetResponse"; auto get_resp = std::static_pointer_cast(resp.resp_msg()); - return ToResult(get_resp->result(), resp.cell_scanner()); } -std::unique_ptr ResponseConverter::ToResult( +std::shared_ptr ResponseConverter::ToResult( const hbase::pb::Result& result, const std::unique_ptr& cell_scanner) { + LOG(INFO) << "ToResult"; std::vector> vcells; for (auto cell : result.cell()) { std::shared_ptr pcell = @@ -56,16 +59,17 @@ std::unique_ptr ResponseConverter::ToResult( } // TODO: check associated cell count? } - return std::make_unique(vcells, result.exists(), result.stale(), result.partial()); + LOG(INFO) << "Returning Result"; + return std::make_shared(vcells, result.exists(), result.stale(), result.partial()); } -std::vector> ResponseConverter::FromScanResponse(const Response& resp) { +std::vector> ResponseConverter::FromScanResponse(const Response& resp) { auto scan_resp = std::static_pointer_cast(resp.resp_msg()); - VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString(); + LOG(INFO) << "FromScanResponse:" << scan_resp->ShortDebugString(); int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size(); - std::vector> results{static_cast(num_results)}; + std::vector> results{static_cast(num_results)}; for (int i = 0; i < num_results; i++) { if (resp.cell_scanner() != nullptr) { // Cells are out in cellblocks. Group them up again as Results. How many to read at a @@ -86,7 +90,7 @@ std::vector> ResponseConverter::FromScanResponse(const R throw std::runtime_error(msg); } // TODO: handle partial results per Result by checking partial_flag_per_result - results[i] = std::make_unique(vcells, false, scan_resp->stale(), false); + results[i] = std::make_shared(vcells, false, scan_resp->stale(), false); } else { results[i] = ToResult(scan_resp->results(i), resp.cell_scanner()); } diff --git hbase-native-client/core/response-converter.h hbase-native-client/core/response-converter.h index 759b1ce..743c14b 100644 --- hbase-native-client/core/response-converter.h +++ hbase-native-client/core/response-converter.h @@ -36,16 +36,16 @@ class ResponseConverter { public: ~ResponseConverter(); - static std::unique_ptr ToResult(const hbase::pb::Result& result, + static std::shared_ptr ToResult(const hbase::pb::Result& result, const std::unique_ptr& cell_scanner); /** * @brief Returns a Result object created by PB Message in passed Response object. * @param resp - Response object having the PB message. */ - static std::unique_ptr FromGetResponse(const Response& resp); + static std::shared_ptr FromGetResponse(const Response& resp); - static std::vector> FromScanResponse(const Response& resp); + static std::vector> FromScanResponse(const Response& resp); private: // Constructor not required. We have all static methods to extract response from PB messages. diff --git hbase-native-client/core/table.cc hbase-native-client/core/table.cc index 2ce8fcd..3c54d78 100644 --- hbase-native-client/core/table.cc +++ hbase-native-client/core/table.cc @@ -25,11 +25,13 @@ #include #include +#include "core/async-connection.h" #include "core/request-converter.h" #include "core/response-converter.h" #include "if/Client.pb.h" #include "security/user.h" #include "serde/server-name.h" +#include "utils/time-util.h" using folly::Future; using hbase::pb::TableName; @@ -38,41 +40,28 @@ using std::chrono::milliseconds; namespace hbase { -Table::Table(const TableName &table_name, - const std::shared_ptr &location_cache, - const std::shared_ptr &rpc_client, - const std::shared_ptr &conf) +Table::Table(const TableName &table_name, std::shared_ptr async_connection) : table_name_(std::make_shared(table_name)), - location_cache_(location_cache), - rpc_client_(rpc_client), - conf_(conf) { - client_retries_ = (conf_) ? conf_->GetInt("hbase.client.retries", client_retries_) : 5; + async_connection_(async_connection), + conf_(async_connection->conf()) { + async_table_ = std::make_unique(table_name_, async_connection); } Table::~Table() {} -std::unique_ptr Table::Get(const hbase::Get &get) { - auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000)); - auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name()); - auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil - - Future> f = - rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), - std::move(req), user, "ClientService"); - auto resp = f.get(); - - return hbase::ResponseConverter::FromGetResponse(*resp); +std::shared_ptr Table::Get(const hbase::Get &get) { + auto context = async_table_->Get(get); + return context.get(operation_timeout()); } -void Table::Close() { - if (is_closed_) return; - - if (rpc_client_.get()) rpc_client_->Close(); - is_closed_ = true; +milliseconds Table::operation_timeout() const { + return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout()); } +void Table::Close() { async_table_->Close(); } + std::shared_ptr Table::GetRegionLocation(const std::string &row) { - return location_cache_->LocateRegion(*table_name_, row).get(); + return async_connection_->region_locator()->LocateRegion(*table_name_, row).get(); } } /* namespace hbase */ diff --git hbase-native-client/core/table.h hbase-native-client/core/table.h index f82382e..93ab91b 100644 --- hbase-native-client/core/table.h +++ hbase-native-client/core/table.h @@ -19,15 +19,18 @@ #pragma once +#include #include #include #include #include "connection/rpc-client.h" +#include "core/async-connection.h" #include "core/client.h" #include "core/configuration.h" #include "core/get.h" #include "core/location-cache.h" +#include "core/raw-async-table.h" #include "core/result.h" #include "serde/table-name.h" @@ -41,16 +44,17 @@ class Table { /** * Constructors */ - Table(const TableName &table_name, const std::shared_ptr &location_cache, - const std::shared_ptr &rpc_client, - const std::shared_ptr &conf); + Table(const TableName &table_name, std::shared_ptr async_connection); ~Table(); /** * @brief - Returns a Result object for the constructed Get. * @param - get Get object to perform HBase Get operation. */ - std::unique_ptr Get(const hbase::Get &get); + std::shared_ptr Get(const hbase::Get &get); + + // TODO: next jira + // std::vector> Get(const std::vector &gets); /** * @brief - Close the client connection. @@ -64,11 +68,11 @@ class Table { private: std::shared_ptr table_name_; - std::shared_ptr location_cache_; - std::shared_ptr rpc_client_; + std::shared_ptr async_connection_; std::shared_ptr conf_; - bool is_closed_ = false; - // default 5 retries. over-ridden in constructor. - int client_retries_ = 5; + std::unique_ptr async_table_; + + private: + milliseconds operation_timeout() const; }; } /* namespace hbase */ diff --git hbase-native-client/security/user.h hbase-native-client/security/user.h index 372ab44..035af31 100644 --- hbase-native-client/security/user.h +++ hbase-native-client/security/user.h @@ -37,6 +37,7 @@ class User { static bool IsSecurityEnabled(const Configuration& conf) { return conf.Get("hbase.security.authentication", "").compare(kKerberos) == 0; } + private: std::string user_name_; }; diff --git hbase-native-client/utils/time-util.h hbase-native-client/utils/time-util.h index bbc3b35..183260b 100644 --- hbase-native-client/utils/time-util.h +++ hbase-native-client/utils/time-util.h @@ -23,28 +23,45 @@ #include using std::chrono::nanoseconds; using std::chrono::milliseconds; +using std::chrono::seconds; namespace hbase { class TimeUtil { public: - static int64_t ToMillis(const int64_t& nanos) { + static inline int64_t ToMillis(const int64_t& nanos) { return std::chrono::duration_cast(nanoseconds(nanos)).count(); } - static std::string ToMillisStr(const nanoseconds& nanos) { + static inline milliseconds ToMillis(const nanoseconds& nanos) { + return std::chrono::duration_cast(nanoseconds(nanos)); + } + + static inline nanoseconds ToNanos(const milliseconds& millis) { + return std::chrono::duration_cast(millis); + } + + static inline nanoseconds MillisToNanos(const int64_t& millis) { + return std::chrono::duration_cast(milliseconds(millis)); + } + + static inline nanoseconds SecondsToNanos(const int64_t& secs) { + return std::chrono::duration_cast(seconds(secs)); + } + + static inline std::string ToMillisStr(const nanoseconds& nanos) { return std::to_string(std::chrono::duration_cast(nanos).count()); } - static int64_t GetNowNanos() { + static inline int64_t GetNowNanos() { auto duration = std::chrono::high_resolution_clock::now().time_since_epoch(); return std::chrono::duration_cast(duration).count(); } - static int64_t ElapsedMillis(const int64_t& start_ns) { + static inline int64_t ElapsedMillis(const int64_t& start_ns) { return std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns)).count(); } - static std::string ElapsedMillisStr(const int64_t& start_ns) { + static inline std::string ElapsedMillisStr(const int64_t& start_ns) { return std::to_string( std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns)).count()); }