diff --git hbase-native-client/connection/connection-pool.cc hbase-native-client/connection/connection-pool.cc
index 07518c5..15dd64e 100644
--- hbase-native-client/connection/connection-pool.cc
+++ hbase-native-client/connection/connection-pool.cc
@@ -41,15 +41,7 @@ ConnectionPool::ConnectionPool(std::shared_ptr io_
 ConnectionPool::ConnectionPool(std::shared_ptr cf)
     : cf_(cf), clients_(), connections_(), map_mutex_() {}
 
-ConnectionPool::~ConnectionPool() {
-  SharedMutexWritePriority::WriteHolder holder(map_mutex_);
-  for (auto &item : connections_) {
-    auto &con = item.second;
-    con->Close();
-  }
-  connections_.clear();
-  clients_.clear();
-}
+ConnectionPool::~ConnectionPool() { Close(); }
 
 std::shared_ptr ConnectionPool::GetConnection(
     std::shared_ptr remote_id) {
@@ -116,4 +108,13 @@ void ConnectionPool::Close(std::shared_ptr remote_id) {
   connections_.erase(found);
 }
 
-void ConnectionPool::Close() {}
+// Closes every pooled connection and empties both maps while holding the write lock.
+void ConnectionPool::Close() {
+  SharedMutexWritePriority::WriteHolder holder{map_mutex_};
+  for (auto &item : connections_) {
+    auto &con = item.second;
+    con->Close();
+  }
+  connections_.clear();
+  clients_.clear();
+}
diff --git hbase-native-client/connection/rpc-client.cc hbase-native-client/connection/rpc-client.cc
index 9cfefb8..7621193 100644
--- hbase-native-client/connection/rpc-client.cc
+++ hbase-native-client/connection/rpc-client.cc
@@ -39,9 +39,8 @@ class RpcChannelImplementation : public AbstractRpcChannel {
 };
 }  // namespace hbase
 
-RpcClient::RpcClient() {
-  io_executor_ = std::make_shared(sysconf(_SC_NPROCESSORS_ONLN));
-
+RpcClient::RpcClient(std::shared_ptr io_executor)
+    : io_executor_(io_executor) {
   cp_ = std::make_shared(io_executor_);
 }
 
diff --git hbase-native-client/connection/rpc-client.h hbase-native-client/connection/rpc-client.h
index 407d588..aeb9b56 100644
--- hbase-native-client/connection/rpc-client.h
+++ hbase-native-client/connection/rpc-client.h
@@ -26,6 +26,8 @@
 
 #include
 
+#include
+
 using hbase::security::User;
 using hbase::pb::ServerName;
 using hbase::Request;
@@ -49,7 +51,10 @@ class RpcClient : public std::enable_shared_from_this {
   friend class RpcChannelImplementation;
 
  public:
-  RpcClient();
+  /**
+   * @param io_executor executor stored by the client and used to build its connection pool
+   */
+  explicit RpcClient(std::shared_ptr io_executor);
 
   virtual ~RpcClient() { Close(); }
 
@@ -77,6 +82,9 @@ class RpcClient : public std::enable_shared_from_this {
                                   std::shared_ptr ticket,
                                   int rpc_timeout);
 
+  /** Returns the pool created in the constructor so other components can share it. */
+  std::shared_ptr connection_pool() const { return cp_; }
+
  private:
   void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg,
                   Message *resp_msg, Closure *done, const std::string &host, uint16_t port,
diff --git hbase-native-client/core/client.cc hbase-native-client/core/client.cc
index 6eb3d8f..dd568ce 100644
--- hbase-native-client/core/client.cc
+++ hbase-native-client/core/client.cc
@@ -33,15 +33,24 @@ Client::Client() {
         "hbase-site.xml is absent in the search path or problems in XML parsing";
     throw std::runtime_error("Configuration object not present.");
   }
-  conf_ = std::make_shared(conf.value());
-  auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
-  location_cache_ = std::make_shared(zk_quorum, cpu_executor_, io_executor_);
+  init(conf.value());
 }
 
-Client::Client(const hbase::Configuration &conf) {
+Client::Client(const hbase::Configuration &conf) { init(conf); }
+
+// Shared by both constructors: stores conf, then creates the executors, the RPC client and
+// the location cache (which reuses the RPC client's connection pool).
+void Client::init(const hbase::Configuration &conf) {
   conf_ = std::make_shared(conf);
   auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
-  location_cache_ = std::make_shared(zk_quorum, cpu_executor_, io_executor_);
+
+  cpu_executor_ =
+      std::make_shared(4);  // TODO: read num threads from conf
+  io_executor_ = std::make_shared(sysconf(_SC_NPROCESSORS_ONLN));
+
+  rpc_client_ = std::make_shared(io_executor_);
+  location_cache_ = std::make_shared(zk_quorum, cpu_executor_,
+                                     rpc_client_->connection_pool());
 }
 
 // We can't have the threads continue running after everything is done
diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h
index da71624..730981d 100644
--- hbase-native-client/core/client.h
+++ hbase-native-client/core/client.h
@@ -67,14 +67,14 @@ class Client {
   void Close();
 
  private:
+  /** Common constructor logic: builds the executors, RPC client and location cache from conf. */
+  void init(const hbase::Configuration &conf);
   const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
   const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
-  std::shared_ptr cpu_executor_ =
-      std::make_shared(4);
-  std::shared_ptr io_executor_ =
-      std::make_shared(sysconf(_SC_NPROCESSORS_ONLN));
+  std::shared_ptr cpu_executor_;
+  std::shared_ptr io_executor_;
   std::shared_ptr location_cache_;
-  std::shared_ptr rpc_client_ = std::make_shared();
+  std::shared_ptr rpc_client_;
   std::shared_ptr conf_;
   bool is_closed_ = false;
 };
diff --git hbase-native-client/core/location-cache-test.cc hbase-native-client/core/location-cache-test.cc
index 53db6fc..42e7bb3 100644
--- hbase-native-client/core/location-cache-test.cc
+++ hbase-native-client/core/location-cache-test.cc
@@ -34,19 +34,24 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) {
 
   auto cpu = std::make_shared(4);
   auto io = std::make_shared(4);
-  LocationCache cache{"localhost:2181", cpu, io};
+  auto cp = std::make_shared(io);
+  LocationCache cache{"localhost:2181", cpu, cp};
   auto f = cache.LocateMeta();
   auto result = f.get();
   ASSERT_FALSE(f.hasException());
   ASSERT_TRUE(result.has_port());
   ASSERT_TRUE(result.has_host_name());
+  cpu->stop();
+  io->stop();
+  cp->Close();
 }
 
 TEST(LocationCacheTest, TestGetRegionLocation) {
   TestUtil test_util{};
   auto cpu = std::make_shared(4);
   auto io = std::make_shared(4);
-  LocationCache cache{"localhost:2181", cpu, io};
+  auto cp = std::make_shared(io);
+  LocationCache cache{"localhost:2181", cpu, cp};
 
   // If there is no table this should throw an exception
   auto tn = folly::to("t");
@@ -57,4 +62,77 @@ TEST(LocationCacheTest, TestGetRegionLocation) {
   ASSERT_TRUE(loc != nullptr);
   cpu->stop();
   io->stop();
+  cp->Close();
+}
+
+TEST(LocationCacheTest, TestCaching) {
+  TestUtil test_util{};
+  auto cpu = std::make_shared(4);
+  auto io = std::make_shared(4);
+  auto cp = std::make_shared(io);
+  LocationCache cache{"localhost:2181", cpu, cp};
+
+  auto tn_1 = folly::to("t1");
+  auto tn_2 = folly::to("t2");
+  auto tn_3 = folly::to("t3");
+  auto row_a = "a";
+
+  // test location pulled from meta gets cached
+  ASSERT_ANY_THROW(cache.LocateRegion(tn_1, row_a).get(milliseconds(1000)));
+  ASSERT_ANY_THROW(cache.LocateFromMeta(tn_1, row_a).get(milliseconds(1000)));
+
+  test_util.RunShellCmd("create 't1', 'd'");
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_1, row_a));
+  auto loc = cache.LocateRegion(tn_1, row_a).get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_1, row_a));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_1, row_a));
+
+  // test with two regions
+  test_util.RunShellCmd("create 't2', 'd', SPLITS => ['b']");
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_2, "a"));
+  loc = cache.LocateRegion(tn_2, "a").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_2, "a"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "a"));
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_2, "b"));
+  loc = cache.LocateRegion(tn_2, "b").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_2, "b"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "b"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_2, "ba"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "ba"));
+
+  // test with three regions
+  test_util.RunShellCmd("create 't3', 'd', SPLITS => ['b', 'c']");
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "c"));
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "ca"));
+  loc = cache.LocateRegion(tn_3, "ca").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "c"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "c"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "ca"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ca"));
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "b"));
+  loc = cache.LocateRegion(tn_3, "b").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "b"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "b"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "ba"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ba"));
+
+  // clear second region
+  cache.ClearCachedLocation(tn_3, "b");
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "b"));
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "a"));
+  loc = cache.LocateRegion(tn_3, "a").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "a"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "a"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "abc"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "abc"));
+
+  cpu->stop();
+  io->stop();
+  cp->Close();
 }
diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc
index 6c2a790..66f3eb7 100644
--- hbase-native-client/core/location-cache.cc
+++ hbase-native-client/core/location-cache.cc
@@ -55,14 +55,16 @@ static const char META_ZNODE_NAME[] = "/hbase/meta-region-server";
 
 LocationCache::LocationCache(std::string quorum_spec,
                              std::shared_ptr cpu_executor,
-                             std::shared_ptr io_executor)
+                             std::shared_ptr cp)
     : quorum_spec_(quorum_spec),
       cpu_executor_(cpu_executor),
       meta_promise_(nullptr),
       meta_lock_(),
-      cp_(io_executor),
       meta_util_(),
+      cp_(cp),
+      cached_locations_(),
+      locations_lock_(),
       zk_(nullptr) {
   zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0);
 }
 
@@ -121,7 +123,7 @@ Future> LocationCache::LocateFromMeta(const Tabl
       .via(cpu_executor_.get())
       .then([this](ServerName sn) {
         auto remote_id = std::make_shared(sn.host_name(), sn.port());
-        return this->cp_.GetConnection(remote_id);
+        return this->cp_->GetConnection(remote_id);
       })
       .then([tn, row, this](std::shared_ptr rpc_connection) {
         return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row)));
@@ -143,11 +145,27 @@ Future> LocationCache::LocateFromMeta(const Tabl
         auto remote_id =
             std::make_shared(rl->server_name().host_name(), rl->server_name().port());
         // Now fill out the connection.
-        rl->set_service(cp_.GetConnection(remote_id)->get_service());
+        // rl->set_service(cp_->GetConnection(remote_id)->get_service()); TODO: this causes wangle
+        // assertion errors
+        return rl;
+      })
+      .then([tn, this](shared_ptr rl) {
+        // now add fetched location to the cache.
+        this->CacheLocation(tn, rl);
         return rl;
       });
 }
 
+Future> LocationCache::LocateRegion(const hbase::pb::TableName &tn,
+                                    const std::string &row) {
+  auto cached_loc = this->GetCachedLocation(tn, row);
+  if (cached_loc != nullptr) {
+    return cached_loc;
+  } else {
+    return this->LocateFromMeta(tn, row);
+  }
+}
+
 std::shared_ptr LocationCache::CreateLocation(const Response &resp) {
   auto resp_msg = static_pointer_cast(resp.resp_msg());
   auto &results = resp_msg->results().Get(0);
@@ -163,3 +181,134 @@ std::shared_ptr LocationCache::CreateLocation(const Response &re
   auto server_name = folly::to(cell_one);
   return std::make_shared(row, std::move(region_info), server_name, nullptr);
 }
+
+// acquires a shared lock on locations_lock_ internally
+shared_ptr LocationCache::GetCachedLocation(const hbase::pb::TableName &tn,
+                                            const std::string &row) {
+  auto t_locs = this->GetTableLocations(tn);
+  std::shared_lock lock(locations_lock_);
+
+  if (VLOG_IS_ON(2)) {
+    for (const auto &p : *t_locs) {
+      VLOG(2) << "t_locs[" << p.first << "] = " << p.second->DebugString();
+    }
+  }
+
+  // looking for the "floor" key as a start key
+  auto possible_region = t_locs->upper_bound(row);
+
+  if (t_locs->empty()) {
+    VLOG(2) << "Could not find region in cache, table map is empty";
+    return nullptr;
+  }
+
+  if (possible_region == t_locs->begin()) {
+    VLOG(2) << "Could not find region in cache, all keys are greater, row:" << row
+            << " ,possible_region:" << possible_region->second->DebugString();
+    return nullptr;
+  }
+  --possible_region;
+
+  VLOG(2) << "Found possible region in cache for row:" << row
+          << " ,possible_region:" << possible_region->second->DebugString();
+
+  // found possible start key, now need to check end key
+  if (possible_region->second->region_info().end_key() == "" ||
+      possible_region->second->region_info().end_key() > row) {
+    VLOG(1) << "Found region in cache for row:" << row
+            << " ,region:" << possible_region->second->DebugString();
+    return possible_region->second;
+  } else {
+    return nullptr;
+  }
+}
+
+// acquires a unique lock on locations_lock_ internally
+void LocationCache::CacheLocation(const hbase::pb::TableName &tn,
+                                  const shared_ptr loc) {
+  auto t_locs = this->GetTableLocations(tn);
+  std::unique_lock lock(locations_lock_);
+
+  (*t_locs)[loc->region_info().start_key()] = loc;
+  VLOG(1) << "Cached location for region:" << loc->DebugString();
+}
+
+// delegates to GetCachedLocation(), which does its own locking
+bool LocationCache::IsLocationCached(const hbase::pb::TableName &tn, const std::string &row) {
+  return (this->GetCachedLocation(tn, row) != nullptr);
+}
+
+// shared lock needed for cases when this table has been requested before;
+// in the rare case it hasn't, unique lock will be grabbed to add it to cache
+shared_ptr LocationCache::GetTableLocations(
+    const hbase::pb::TableName &tn) {
+  auto found_locs = this->GetCachedTableLocations(tn);
+  if (found_locs == nullptr) {
+    found_locs = this->GetNewTableLocations(tn);
+  }
+  return found_locs;
+}
+
+shared_ptr LocationCache::GetCachedTableLocations(
+    const hbase::pb::TableName &tn) {
+  SharedMutexWritePriority::ReadHolder r_holder{locations_lock_};
+
+  auto table_locs = cached_locations_.find(tn);
+  if (table_locs != cached_locations_.end()) {
+    return table_locs->second;
+  } else {
+    return nullptr;
+  }
+}
+
+shared_ptr LocationCache::GetNewTableLocations(
+    const hbase::pb::TableName &tn) {
+  // double-check locking under upgradable lock
+  SharedMutexWritePriority::UpgradeHolder u_holder{locations_lock_};
+
+  auto table_locs = cached_locations_.find(tn);
+  if (table_locs != cached_locations_.end()) {
+    return table_locs->second;
+  }
+  SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)};
+
+  auto t_locs_p = make_shared>>();
+  cached_locations_.insert(std::make_pair(tn, t_locs_p));
+  return t_locs_p;
+}
+
+// acquires a unique lock on locations_lock_ internally
+void LocationCache::ClearCache() {
+  unique_lock lock(locations_lock_);
+  cached_locations_.clear();
+}
+
+// acquires a unique lock on locations_lock_ internally
+void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) {
+  unique_lock lock(locations_lock_);
+  cached_locations_.erase(tn);
+}
+
+// Evicts the cached region whose key range contains row; a no-op when nothing matches.
+// Acquires a unique lock on locations_lock_ internally.
+void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row) {
+  // Read-only lookup: do not register an empty table entry just to clear from it.
+  auto table_locs = this->GetCachedTableLocations(tn);
+  if (table_locs == nullptr) {
+    return;
+  }
+  unique_lock lock(locations_lock_);
+
+  // Entries are keyed by region start key, so find the floor entry for row (same lookup
+  // as GetCachedLocation) instead of erasing by exact key.
+  auto region = table_locs->upper_bound(row);
+  if (region == table_locs->begin()) {
+    return;
+  }
+  --region;
+
+  const auto &end_key = region->second->region_info().end_key();
+  if (end_key.empty() || end_key > row) {
+    table_locs->erase(region);
+  }
+}
diff --git hbase-native-client/core/location-cache.h hbase-native-client/core/location-cache.h
index b290a1f..22a8ad5 100644
--- hbase-native-client/core/location-cache.h
+++ hbase-native-client/core/location-cache.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include
+#include
 #include
 #include
 #include
@@ -27,6 +28,7 @@
 
 #include
 #include
+#include
 #include
 
 #include "connection/connection-pool.h"
@@ -40,8 +42,34 @@ class Request;
 class Response;
 namespace pb {
 class ServerName;
+class TableName;
 }
 
+/** Equals function for TableName (uses namespace and table name) */
+struct TableNameEquals {
+  /** equals */
+  bool operator()(const hbase::pb::TableName &lht, const hbase::pb::TableName &rht) const {
+    return lht.namespace_() == rht.namespace_() && lht.qualifier() == rht.qualifier();
+  }
+};
+
+/** Hash for TableName. */
+struct TableNameHash {
+  /** hash */
+  std::size_t operator()(hbase::pb::TableName const &t) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, t.namespace_());
+    boost::hash_combine(h, t.qualifier());
+    return h;
+  }
+};
+
+// typedefs for location cache
+typedef std::map> PerTableLocationMap;
+typedef std::unordered_map,
+                           TableNameHash, TableNameEquals>
+    RegionLocationMap;
+
 /**
  * Class that can look up and cache locations.
  */
@@ -56,7 +84,7 @@ class LocationCache {
    */
   LocationCache(std::string quorum_spec,
                 std::shared_ptr cpu_executor,
-                std::shared_ptr io_executor);
+                std::shared_ptr cp);
   /**
    * Destructor.
    * This will clean up the zookeeper connections.
@@ -71,7 +99,8 @@ class LocationCache {
   folly::Future LocateMeta();
 
   /**
-   * Go read meta and find out where a region is located.
+   * Go read meta and find out where a region is located. Most users should
+   * never call this method directly and should use LocateRegion() instead.
    *
    * @param tn Table name of the table to look up. This object must live until
    * after the future is returned
@@ -83,14 +112,72 @@ class LocationCache {
                  const std::string &row);
 
   /**
+   * The only method clients should use for meta lookups. If corresponding
+   * location is cached, it's returned from the cache, otherwise lookup
+   * in meta table is done, location is cached and then returned.
+   * It's expected that tiny fraction of invocations incurs meta scan.
+   * This method is to look up non-meta regions; use LocateMeta() to get the
+   * location of hbase:meta region.
+   *
+   * @param tn Table name of the table to look up. This object must live until
+   * after the future is returned
+   *
+   * @param row of the table to look up. This object must live until after the
+   * future is returned
+   */
+  folly::Future> LocateRegion(const hbase::pb::TableName &tn,
+                              const std::string &row);
+
+  /**
    * Remove the cached location of meta.
    */
   void InvalidateMeta();
 
+  /**
+   * Return cached region location corresponding to this row,
+   * nullptr if this location isn't cached.
+   */
+  std::shared_ptr GetCachedLocation(const hbase::pb::TableName &tn,
+                                    const std::string &row);
+
+  /**
+   * Add non-meta region location in the cache (location of meta itself
+   * is cached separately).
+   */
+  void CacheLocation(const hbase::pb::TableName &tn, const std::shared_ptr loc);
+
+  /**
+   * Check if location corresponding to this row key is cached.
+   */
+  bool IsLocationCached(const hbase::pb::TableName &tn, const std::string &row);
+
+  /**
+   * Return the cached locations of this table's regions, creating an empty map on first use.
+   */
+  std::shared_ptr GetTableLocations(const hbase::pb::TableName &tn);
+
+  /**
+   * Completely clear location cache.
+   */
+  void ClearCache();
+
+  /**
+   * Clear all cached locations for one table.
+   */
+  void ClearCachedLocations(const hbase::pb::TableName &tn);
+
+  /**
+   * Clear the cached location of the region containing this row, if any.
+   */
+  void ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row);
+
  private:
   void RefreshMetaLocation();
   hbase::pb::ServerName ReadMetaLocation();
   std::shared_ptr CreateLocation(const Response &resp);
+  std::shared_ptr GetCachedTableLocations(
+      const hbase::pb::TableName &tn);
+  std::shared_ptr GetNewTableLocations(const hbase::pb::TableName &tn);
 
   /* data */
   std::string quorum_spec_;
@@ -98,7 +185,11 @@ class LocationCache {
   std::unique_ptr> meta_promise_;
   std::mutex meta_lock_;
   MetaUtil meta_util_;
-  ConnectionPool cp_;
+  std::shared_ptr cp_;
+
+  // cached region locations
+  RegionLocationMap cached_locations_;
+  folly::SharedMutexWritePriority locations_lock_;
 
   // TODO: migrate this to a smart pointer with a deleter.
   zhandle_t *zk_;
diff --git hbase-native-client/core/region-location.h hbase-native-client/core/region-location.h
index e7b76d3..b0411cb 100644
--- hbase-native-client/core/region-location.h
+++ hbase-native-client/core/region-location.h
@@ -79,6 +79,11 @@ class RegionLocation {
    */
   void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; }
 
+  /** Returns a description string built from ri_ and sn_ via ShortDebugString(). */
+  std::string DebugString() const {
+    return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString();
+  }
+
  private:
   std::string region_name_;
   hbase::pb::RegionInfo ri_;
diff --git hbase-native-client/core/simple-client.cc hbase-native-client/core/simple-client.cc
index 90e7cd4..dac0d10 100644
--- hbase-native-client/core/simple-client.cc
+++ hbase-native-client/core/simple-client.cc
@@ -94,7 +94,7 @@ int main(int argc, char *argv[]) {
 
   auto row = FLAGS_row;
   auto tn = folly::to(FLAGS_table);
-  auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(5000));
+  auto loc = cache.LocateRegion(tn, row).get(milliseconds(5000));
   auto connection = loc->service();
 
   auto num_puts = FLAGS_columns;