diff --git hbase-native-client/connection/client-dispatcher.cc hbase-native-client/connection/client-dispatcher.cc index b9b2c34..3d3770a 100644 --- hbase-native-client/connection/client-dispatcher.cc +++ hbase-native-client/connection/client-dispatcher.cc @@ -17,17 +17,23 @@ * */ #include "connection/client-dispatcher.h" + #include #include +#include "connection/rpc-connection.h" +#include "exceptions/exception.h" + using std::unique_ptr; namespace hbase { -ClientDispatcher::ClientDispatcher() : current_call_id_(9), requests_(5000) {} +ClientDispatcher::ClientDispatcher(const std::string &server) + : current_call_id_(9), requests_(5000), server_(server), rpc_connection_(nullptr) {} void ClientDispatcher::read(Context *ctx, unique_ptr in) { + VLOG(5) << "ClientDispatcher::read()"; auto call_id = in->call_id(); auto p = requests_.find_and_erase(call_id); @@ -38,7 +44,18 @@ void ClientDispatcher::read(Context *ctx, unique_ptr in) { } } +void ClientDispatcher::readException(Context *ctx, folly::exception_wrapper e) { + VLOG(5) << "ClientDispatcher::readException()"; + CloseAndCleanUpCalls(); +} + +void ClientDispatcher::readEOF(Context *ctx) { + VLOG(5) << "ClientDispatcher::readEOF()"; + CloseAndCleanUpCalls(); +} + folly::Future> ClientDispatcher::operator()(unique_ptr arg) { + VLOG(5) << "ClientDispatcher::operator()"; auto call_id = current_call_id_++; arg->set_call_id(call_id); @@ -50,15 +67,34 @@ folly::Future> ClientDispatcher::operator()(unique_ptrClose(); + } +} + +folly::Future ClientDispatcher::close() { + CloseAndCleanUpCalls(); + return ClientDispatcherBase::close(); +} folly::Future ClientDispatcher::close(Context *ctx) { + CloseAndCleanUpCalls(); return ClientDispatcherBase::close(ctx); } } // namespace hbase diff --git hbase-native-client/connection/client-dispatcher.h hbase-native-client/connection/client-dispatcher.h index 1f8e6b3..15d895e 100644 --- hbase-native-client/connection/client-dispatcher.h +++ hbase-native-client/connection/client-dispatcher.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "connection/pipeline.h" #include "connection/request.h" @@ -33,6 +34,9 @@ #include "utils/concurrent-map.h" namespace hbase { + +class RpcConnection; + /** * Dispatcher that assigns a call_id and then routes the response back to the * future. @@ -42,9 +46,11 @@ class ClientDispatcher std::unique_ptr> { public: /** Create a new ClientDispatcher */ - ClientDispatcher(); + explicit ClientDispatcher(const std::string &server); /** Read a response off the pipeline. */ void read(Context *ctx, std::unique_ptr in) override; + void readException(Context *ctx, folly::exception_wrapper e) override; + void readEOF(Context *ctx) override; /** Take a request as a call and send it down the pipeline. */ folly::Future> operator()(std::unique_ptr arg) override; /** Close the dispatcher and the associated pipeline. */ @@ -52,6 +58,13 @@ class ClientDispatcher /** Close the dispatcher and the associated pipeline. */ folly::Future close() override; + void set_rpc_connection(std::shared_ptr rpc_connection) { + rpc_connection_ = rpc_connection; + } + + private: + void CloseAndCleanUpCalls(); + private: concurrent_map>> requests_; // Start at some number way above what could @@ -63,5 +76,7 @@ class ClientDispatcher // uint32_t has a max of 4Billion so 10 more or less is // not a big deal. std::atomic current_call_id_; + std::string server_; + std::shared_ptr rpc_connection_; }; } // namespace hbase diff --git hbase-native-client/connection/connection-factory.cc hbase-native-client/connection/connection-factory.cc index a0c7f96..e77e8c4 100644 --- hbase-native-client/connection/connection-factory.cc +++ hbase-native-client/connection/connection-factory.cc @@ -17,6 +17,7 @@ * */ +#include #include #include @@ -33,18 +34,20 @@ using std::chrono::nanoseconds; namespace hbase { -ConnectionFactory::ConnectionFactory(std::shared_ptr io_pool, +ConnectionFactory::ConnectionFactory(std::shared_ptr io_executor, + std::shared_ptr cpu_executor, std::shared_ptr codec, std::shared_ptr conf, nanoseconds connect_timeout) : connect_timeout_(connect_timeout), - io_pool_(io_pool), + io_executor_(io_executor), + cpu_executor_(cpu_executor), conf_(conf), pipeline_factory_(std::make_shared(codec, conf)) {} std::shared_ptr> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared>(); - client->group(io_pool_); + client->group(io_executor_); client->pipelineFactory(pipeline_factory_); // TODO: Opened https://github.com/facebook/wangle/issues/85 in wangle so that we can set socket @@ -54,17 +57,29 @@ std::shared_ptr> ConnectionFactory::M } std::shared_ptr ConnectionFactory::Connect( - std::shared_ptr> client, const std::string &hostname, - uint16_t port) { - // Yes this will block however it makes dealing with connection pool soooooo - // much nicer. - // TODO see about using shared promise for this. - auto pipeline = client - ->connect(folly::SocketAddress(hostname, port, true), - std::chrono::duration_cast(connect_timeout_)) - .get(); - auto dispatcher = std::make_shared(); + std::shared_ptr rpc_connection, + std::shared_ptr> client_bootstrap, + const std::string &hostname, uint16_t port) { + // connection should happen from an IO thread + auto future = via(io_executor_.get()) + .then([=]() { + VLOG(1) << "Connecting to server: " << hostname << ":" << port; + return client_bootstrap->connect( + folly::SocketAddress(hostname, port, true), + std::chrono::duration_cast(connect_timeout_)); + }) + .via(cpu_executor_.get()) // blocking call must not block the IO thread, + // otherwise it is a deadlock + .wait(); // now block on the connection establishment + + // See about using shared promise for this. + auto pipeline = future.get(); + + VLOG(1) << "Connected to server: " << hostname << ":" << port; + auto dispatcher = + std::make_shared(hostname + ":" + folly::to(port)); dispatcher->setPipeline(pipeline); + dispatcher->set_rpc_connection(rpc_connection); return dispatcher; } } // namespace hbase diff --git hbase-native-client/connection/connection-factory.h hbase-native-client/connection/connection-factory.h index c96087d..c4e63c2 100644 --- hbase-native-client/connection/connection-factory.h +++ hbase-native-client/connection/connection-factory.h @@ -18,6 +18,8 @@ */ #pragma once +#include +#include #include #include @@ -32,6 +34,8 @@ namespace hbase { +class RpcConnection; + /** * Class to create a ClientBootstrap and turn it into a connected * pipeline. @@ -42,7 +46,8 @@ class ConnectionFactory { * Constructor. * There should only be one ConnectionFactory per client. */ - ConnectionFactory(std::shared_ptr io_pool, + ConnectionFactory(std::shared_ptr io_executor, + std::shared_ptr cpu_executor, std::shared_ptr codec, std::shared_ptr conf, std::chrono::nanoseconds connect_timeout = std::chrono::nanoseconds(0)); @@ -60,13 +65,19 @@ class ConnectionFactory { * This is mostly visible so that mocks can override socket connections. */ virtual std::shared_ptr Connect( - std::shared_ptr> client, + std::shared_ptr rpc_connection, + std::shared_ptr> client_bootstrap, const std::string &hostname, uint16_t port); + std::shared_ptr io_executor() { return io_executor_; } + + std::shared_ptr cpu_executor() { return cpu_executor_; } + private: std::chrono::nanoseconds connect_timeout_; std::shared_ptr conf_; - std::shared_ptr io_pool_; + std::shared_ptr io_executor_; + std::shared_ptr cpu_executor_; std::shared_ptr pipeline_factory_; }; } // namespace hbase diff --git hbase-native-client/connection/connection-id.h hbase-native-client/connection/connection-id.h index 4f84bf8..065b484 100644 --- hbase-native-client/connection/connection-id.h +++ hbase-native-client/connection/connection-id.h @@ -18,13 +18,15 @@ */ #pragma once -#include "if/HBase.pb.h" -#include "security/user.h" - #include + #include +#include #include +#include "if/HBase.pb.h" +#include "security/user.h" + namespace hbase { class ConnectionId { diff --git hbase-native-client/connection/connection-pool-test.cc hbase-native-client/connection/connection-pool-test.cc index 63f774b..c0aa4ae 100644 --- hbase-native-client/connection/connection-pool-test.cc +++ hbase-native-client/connection/connection-pool-test.cc @@ -17,26 +17,29 @@ * */ +#include +#include + #include "connection/connection-pool.h" #include "connection/connection-factory.h" #include "connection/connection-id.h" - #include "if/HBase.pb.h" #include "serde/server-name.h" -#include -#include - -using namespace hbase; - using hbase::pb::ServerName; using ::testing::Return; using ::testing::_; +using hbase::ConnectionFactory; +using hbase::ConnectionPool; using hbase::ConnectionId; +using hbase::HBaseService; +using hbase::Request; +using hbase::Response; +using hbase::SerializePipeline; class MockConnectionFactory : public ConnectionFactory { public: - MockConnectionFactory() : ConnectionFactory(nullptr, nullptr, nullptr) {} + MockConnectionFactory() : ConnectionFactory(nullptr, nullptr, nullptr, nullptr) {} MOCK_METHOD0(MakeBootstrap, std::shared_ptr>()); MOCK_METHOD3(Connect, std::shared_ptr( std::shared_ptr>, diff --git hbase-native-client/connection/connection-pool.cc hbase-native-client/connection/connection-pool.cc index e98759d..7e51579 100644 --- hbase-native-client/connection/connection-pool.cc +++ hbase-native-client/connection/connection-pool.cc @@ -24,6 +24,7 @@ #include #include +#include #include using std::chrono::nanoseconds; @@ -31,15 +32,16 @@ using std::chrono::nanoseconds; namespace hbase { ConnectionPool::ConnectionPool(std::shared_ptr io_executor, + std::shared_ptr cpu_executor, std::shared_ptr codec, std::shared_ptr conf, nanoseconds connect_timeout) - : cf_(std::make_shared(io_executor, codec, conf, connect_timeout)), - clients_(), + : cf_(std::make_shared(io_executor, cpu_executor, codec, conf, + connect_timeout)), connections_(), map_mutex_(), conf_(conf) {} ConnectionPool::ConnectionPool(std::shared_ptr cf) - : cf_(cf), clients_(), connections_(), map_mutex_() {} + : cf_(cf), connections_(), map_mutex_() {} ConnectionPool::~ConnectionPool() { Close(); } @@ -85,12 +87,9 @@ std::shared_ptr ConnectionPool::GetNewConnection( connections_.erase(remote_id); /* create new connection */ - auto clientBootstrap = cf_->MakeBootstrap(); - auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); - auto connection = std::make_shared(remote_id, dispatcher); + auto connection = std::make_shared(remote_id, cf_); connections_.insert(std::make_pair(remote_id, connection)); - clients_.insert(std::make_pair(remote_id, clientBootstrap)); return connection; } @@ -107,7 +106,6 @@ void ConnectionPool::Close(std::shared_ptr remote_id) { } found->second->Close(); connections_.erase(found); - // TODO: erase the client as well? } void ConnectionPool::Close() { @@ -117,6 +115,5 @@ void ConnectionPool::Close() { con->Close(); } connections_.clear(); - clients_.clear(); } } // namespace hbase diff --git hbase-native-client/connection/connection-pool.h hbase-native-client/connection/connection-pool.h index c7c4246..9af1e7f 100644 --- hbase-native-client/connection/connection-pool.h +++ hbase-native-client/connection/connection-pool.h @@ -43,6 +43,7 @@ class ConnectionPool { public: /** Create connection pool wit default connection factory */ ConnectionPool(std::shared_ptr io_executor, + std::shared_ptr cpu_executor, std::shared_ptr codec, std::shared_ptr conf, std::chrono::nanoseconds connect_timeout = std::chrono::nanoseconds(0)); @@ -81,10 +82,6 @@ class ConnectionPool { std::unordered_map, std::shared_ptr, ConnectionIdHash, ConnectionIdEquals> connections_; - std::unordered_map, - std::shared_ptr>, ConnectionIdHash, - ConnectionIdEquals> - clients_; folly::SharedMutexWritePriority map_mutex_; std::shared_ptr cf_; std::shared_ptr conf_; diff --git hbase-native-client/connection/rpc-client.cc hbase-native-client/connection/rpc-client.cc index 10faa7a..a2cd8a3 100644 --- hbase-native-client/connection/rpc-client.cc +++ hbase-native-client/connection/rpc-client.cc @@ -29,10 +29,11 @@ using std::chrono::nanoseconds; namespace hbase { RpcClient::RpcClient(std::shared_ptr io_executor, + std::shared_ptr cpu_executor, std::shared_ptr codec, std::shared_ptr conf, nanoseconds connect_timeout) : io_executor_(io_executor), conf_(conf) { - cp_ = std::make_shared(io_executor_, codec, conf, connect_timeout); + cp_ = std::make_shared(io_executor_, cpu_executor, codec, conf, connect_timeout); } void RpcClient::Close() { io_executor_->stop(); } diff --git hbase-native-client/connection/rpc-client.h hbase-native-client/connection/rpc-client.h index 0ecde5b..debf41c 100644 --- hbase-native-client/connection/rpc-client.h +++ hbase-native-client/connection/rpc-client.h @@ -35,8 +35,9 @@ namespace hbase { class RpcClient { public: - RpcClient(std::shared_ptr io_executor, std::shared_ptr codec, - std::shared_ptr conf, + RpcClient(std::shared_ptr io_executor, + std::shared_ptr cpu_executor, + std::shared_ptr codec, std::shared_ptr conf, std::chrono::nanoseconds connect_timeout = std::chrono::nanoseconds(0)); virtual ~RpcClient() { Close(); } diff --git hbase-native-client/connection/rpc-connection.h hbase-native-client/connection/rpc-connection.h index d9966a1..ba23df2 100644 --- hbase-native-client/connection/rpc-connection.h +++ hbase-native-client/connection/rpc-connection.h @@ -18,36 +18,60 @@ */ #pragma once +#include +#include + +#include "connection/connection-factory.h" #include "connection/connection-id.h" #include "connection/request.h" #include "connection/response.h" #include "connection/service.h" -#include -#include - namespace hbase { -class RpcConnection { +class RpcConnection : public std::enable_shared_from_this { public: - RpcConnection(std::shared_ptr connection_id, - std::shared_ptr hbase_service) - : connection_id_(connection_id), hbase_service_(hbase_service) {} + RpcConnection(std::shared_ptr connection_id, std::shared_ptr cf) + : connection_id_(connection_id), cf_(cf), hbase_service_(nullptr) {} virtual ~RpcConnection() { Close(); } virtual std::shared_ptr remote_id() const { return connection_id_; } - virtual std::shared_ptr get_service() const { return hbase_service_; } - virtual folly::Future> SendRequest(std::unique_ptr req) { + if (hbase_service_ == nullptr) { + Connect(); + } + VLOG(5) << "Calling RpcConnection::SendRequest()"; // TODO return (*hbase_service_)(std::move(req)); } - virtual void Close() { hbase_service_->close(); } + virtual void Close() { + if (hbase_service_) { + hbase_service_->close(); + hbase_service_ = nullptr; + } + if (client_bootstrap_) { + client_bootstrap_ = nullptr; + } + } + + private: + void Connect() { + // TODO: acquire mutex lock + client_bootstrap_ = cf_->MakeBootstrap(); + auto dispatcher = cf_->Connect(shared_from_this(), client_bootstrap_, remote_id()->host(), + remote_id()->port()); + hbase_service_ = std::move(dispatcher); + } private: + std::mutex mutex_; + std::shared_ptr io_executor_; + std::shared_ptr cpu_executor_; std::shared_ptr connection_id_; std::shared_ptr hbase_service_; + std::shared_ptr cf_; + std::shared_ptr> client_bootstrap_; }; } // namespace hbase diff --git hbase-native-client/connection/sasl-handler.cc hbase-native-client/connection/sasl-handler.cc index ea09595..9afe1e2 100644 --- hbase-native-client/connection/sasl-handler.cc +++ hbase-native-client/connection/sasl-handler.cc @@ -86,6 +86,7 @@ void SaslHandler::transportActive(Context *ctx) { VLOG(3) << "Writing RPC connection Preamble to server: " << host_name_; auto preamble = RpcSerde::Preamble(secure_); ctx->fireWrite(std::move(preamble)); + ctx->fireTransportActive(); } void SaslHandler::read(Context *ctx, folly::IOBufQueue &buf) { diff --git hbase-native-client/core/async-batch-rpc-retrying-test.cc hbase-native-client/core/async-batch-rpc-retrying-test.cc index c186276..a959011 100644 --- hbase-native-client/core/async-batch-rpc-retrying-test.cc +++ hbase-native-client/core/async-batch-rpc-retrying-test.cc @@ -318,7 +318,7 @@ void runMultiTest(std::shared_ptr region_locator, auto retry_executor_ = std::make_shared(1); auto codec = std::make_shared(); auto rpc_client = - std::make_shared(io_executor_, codec, AsyncBatchRpcRetryTest::test_util->conf()); + std::make_shared(io_executor_, cpu_executor_, codec, AsyncBatchRpcRetryTest::test_util->conf()); std::shared_ptr retry_timer = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); diff --git hbase-native-client/core/async-connection.cc hbase-native-client/core/async-connection.cc index ef945fb..850fb8f 100644 --- hbase-native-client/core/async-connection.cc +++ hbase-native-client/core/async-connection.cc @@ -44,10 +44,10 @@ void AsyncConnectionImpl::Init() { } else { LOG(WARNING) << "Not using RPC Cell Codec"; } - rpc_client_ = std::make_shared(io_executor_, codec, conf_, + rpc_client_ = std::make_shared(io_executor_, cpu_executor_, codec, conf_, connection_conf_->connect_timeout()); - location_cache_ = - std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); + location_cache_ = std::make_shared(conf_, io_executor_, cpu_executor_, + rpc_client_->connection_pool()); caller_factory_ = std::make_shared(shared_from_this(), retry_timer_); } diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc index f887815..cdd20f0 100644 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -317,8 +317,8 @@ void runTest(std::shared_ptr region_locator, std::string auto io_executor_ = client.async_connection()->io_executor(); auto retry_executor_ = std::make_shared(1); auto codec = std::make_shared(); - auto rpc_client = - std::make_shared(io_executor_, codec, AsyncRpcRetryTest::test_util->conf()); + auto rpc_client = std::make_shared(io_executor_, cpu_executor_, codec, + AsyncRpcRetryTest::test_util->conf()); // auto retry_event_base_ = std::make_shared(true); std::shared_ptr retry_timer = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); diff --git hbase-native-client/core/location-cache-retry-test.cc hbase-native-client/core/location-cache-retry-test.cc index 988f994..f154b69 100644 --- hbase-native-client/core/location-cache-retry-test.cc +++ hbase-native-client/core/location-cache-retry-test.cc @@ -45,7 +45,7 @@ using hbase::Put; using hbase::Table; using hbase::TestUtil; -using std::chrono_literals::operator""s; +using std::chrono_literals::operator"" s; class LocationCacheRetryTest : public ::testing::Test { public: diff --git hbase-native-client/core/location-cache-test.cc hbase-native-client/core/location-cache-test.cc index 3253c56..7456d3a 100644 --- hbase-native-client/core/location-cache-test.cc +++ hbase-native-client/core/location-cache-test.cc @@ -27,8 +27,15 @@ #include "if/HBase.pb.h" #include "serde/table-name.h" #include "test-util/test-util.h" -using namespace hbase; -using namespace std::chrono; + +using hbase::Cell; +using hbase::Configuration; +using hbase::ConnectionPool; +using hbase::MetaUtil; +using hbase::LocationCache; +using hbase::TestUtil; +using hbase::KeyValueCodec; +using std::chrono::milliseconds; class LocationCacheTest : public ::testing::Test { protected: @@ -52,8 +59,8 @@ TEST_F(LocationCacheTest, TestGetMetaNodeContents) { auto cpu = std::make_shared(4); auto io = std::make_shared(4); auto codec = std::make_shared(); - auto cp = std::make_shared(io, codec, LocationCacheTest::test_util_->conf()); - LocationCache cache{LocationCacheTest::test_util_->conf(), cpu, cp}; + auto cp = std::make_shared(io, cpu, codec, LocationCacheTest::test_util_->conf()); + LocationCache cache{LocationCacheTest::test_util_->conf(), io, cpu, cp}; auto f = cache.LocateMeta(); auto result = f.get(); ASSERT_FALSE(f.hasException()); @@ -68,8 +75,8 @@ TEST_F(LocationCacheTest, TestGetRegionLocation) { auto cpu = std::make_shared(4); auto io = std::make_shared(4); auto codec = std::make_shared(); - auto cp = std::make_shared(io, codec, LocationCacheTest::test_util_->conf()); - LocationCache cache{LocationCacheTest::test_util_->conf(), cpu, cp}; + auto cp = std::make_shared(io, cpu, codec, LocationCacheTest::test_util_->conf()); + LocationCache cache{LocationCacheTest::test_util_->conf(), io, cpu, cp}; // If there is no table this should throw an exception auto tn = folly::to("t"); @@ -87,8 +94,8 @@ TEST_F(LocationCacheTest, TestCaching) { auto cpu = std::make_shared(4); auto io = std::make_shared(4); auto codec = std::make_shared(); - auto cp = std::make_shared(io, codec, LocationCacheTest::test_util_->conf()); - LocationCache cache{LocationCacheTest::test_util_->conf(), cpu, cp}; + auto cp = std::make_shared(io, cpu, codec, LocationCacheTest::test_util_->conf()); + LocationCache cache{LocationCacheTest::test_util_->conf(), io, cpu, cp}; auto tn_1 = folly::to("t1"); auto tn_2 = folly::to("t2"); diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc index 5f68420..e377266 100644 --- hbase-native-client/core/location-cache.cc +++ hbase-native-client/core/location-cache.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include "connection/response.h" @@ -44,13 +45,15 @@ using hbase::pb::TableName; namespace hbase { LocationCache::LocationCache(std::shared_ptr conf, + std::shared_ptr io_executor, std::shared_ptr cpu_executor, std::shared_ptr cp) : conf_(conf), + io_executor_(io_executor), cpu_executor_(cpu_executor), + cp_(cp), meta_promise_(nullptr), meta_lock_(), - cp_(cp), meta_util_(), zk_(nullptr), cached_locations_(), @@ -141,11 +144,12 @@ folly::Future> LocationCache::LocateFromMeta( return this->LocateMeta() .via(cpu_executor_.get()) .then([this](ServerName sn) { + // TODO: use RpcClient? auto remote_id = std::make_shared(sn.host_name(), sn.port()); return this->cp_->GetConnection(remote_id); }) .then([tn, row, this](std::shared_ptr rpc_connection) { - return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row))); + return rpc_connection->SendRequest(std::move(meta_util_.MetaRequest(tn, row))); }) .onError([&](const folly::exception_wrapper &ew) { auto promise = InvalidateMeta(); diff --git hbase-native-client/core/location-cache.h hbase-native-client/core/location-cache.h index a374fb6..084c2d8 100644 --- hbase-native-client/core/location-cache.h +++ hbase-native-client/core/location-cache.h @@ -27,18 +27,19 @@ #include #include +#include #include #include -#include #include +#include #include "connection/connection-pool.h" #include "core/async-region-locator.h" #include "core/configuration.h" #include "core/meta-utils.h" #include "core/region-location.h" +#include "core/zk-util.h" #include "serde/table-name.h" -#include "zk-util.h" namespace hbase { // Forward @@ -87,6 +88,7 @@ class LocationCache : public AsyncRegionLocator { * @param io_executor executor used to talk to the network */ LocationCache(std::shared_ptr conf, + std::shared_ptr io_executor, std::shared_ptr cpu_executor, std::shared_ptr cp); /** @@ -129,7 +131,7 @@ class LocationCache : public AsyncRegionLocator { * @param row of the table to look up. This object must live until after the * future is returned */ - virtual folly::Future> LocateRegion( + folly::Future> LocateRegion( const hbase::pb::TableName &tn, const std::string &row, const RegionLocateType locate_type = RegionLocateType::kCurrent, const int64_t locate_ns = 0) override; @@ -180,7 +182,7 @@ class LocationCache : public AsyncRegionLocator { /** * Update cached region location, possibly using the information from exception. */ - virtual void UpdateCachedLocation(const RegionLocation &loc, + void UpdateCachedLocation(const RegionLocation &loc, const folly::exception_wrapper &error) override; const std::string &zk_quorum() { return zk_quorum_; } @@ -200,6 +202,7 @@ class LocationCache : public AsyncRegionLocator { /* data */ std::shared_ptr conf_; std::string zk_quorum_; + std::shared_ptr io_executor_; std::shared_ptr cpu_executor_; std::unique_ptr> meta_promise_; std::recursive_mutex meta_lock_; diff --git hbase-native-client/core/region-location.h hbase-native-client/core/region-location.h index 822180b..f73999f 100644 --- hbase-native-client/core/region-location.h +++ hbase-native-client/core/region-location.h @@ -21,7 +21,6 @@ #include #include -#include "connection/service.h" #include "if/HBase.pb.h" namespace hbase { @@ -32,7 +31,7 @@ enum class RegionLocateType { kBefore, kCurrent, kAfter }; * @brief class to hold where a region is located. * * This class holds where a region is located, the information about it, the - * region name, and a connection to the service used for connecting to it. + * region name. */ class RegionLocation { public: @@ -42,7 +41,6 @@ class RegionLocation { * @param ri The decoded RegionInfo of this region. * @param sn The server name of the HBase regionserver thought to be hosting * this region. - * @param service the connected service to the regionserver. */ RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName sn) : region_name_(region_name), ri_(ri), sn_(sn) {} diff --git hbase-native-client/utils/concurrent-map.h hbase-native-client/utils/concurrent-map.h index d9703e1..aebca0d 100644 --- hbase-native-client/utils/concurrent-map.h +++ hbase-native-client/utils/concurrent-map.h @@ -118,6 +118,11 @@ class concurrent_map { return map_.empty(); } + void clear() { + std::unique_lock lock(mutex_); + map_.clear(); + } + private: std::shared_timed_mutex mutex_; std::unordered_map map_;