diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 53db6fc..eb9e878 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -58,3 +58,27 @@ TEST(LocationCacheTest, TestGetRegionLocation) {
   cpu->stop();
   io->stop();
 }
+
+TEST(LocationCacheTest, TestCaching) {
+  TestUtil test_util{};
+  auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
+  LocationCache cache{"localhost:2181", cpu, io};
+
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto row = "test";
+
+  // Test that a location pulled from meta gets cached.
+  ASSERT_EQ(nullptr, cache.LocateRegion(tn, row).get(milliseconds(1000)));
+  ASSERT_ANY_THROW(cache.LocateFromMeta(tn, row).get(milliseconds(1000)));
+
+  test_util.RunShellCmd("create 't', 'd'");
+
+  ASSERT_FALSE(cache.IsLocationCached(tn, row));
+  auto loc = cache.LocateRegion(tn, row).get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn, row));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn, row));
+
+  cpu->stop();
+  io->stop();
+}
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index efd2210..82b1185 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -33,6 +33,10 @@
 using namespace std;
 using namespace folly;
 
+using std::make_shared;
+using std::unique_ptr;
+using std::shared_ptr;
+using folly::SharedMutexWritePriority;
 using wangle::ServiceFilter;
 using hbase::Request;
 using hbase::Response;
@@ -51,11 +55,11 @@ static const char META_ZNODE_NAME[] = "/hbase/meta-region-server";
 
 LocationCache::LocationCache(
     std::string quorum_spec,
-    std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
-    std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+    shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+    shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
     : quorum_spec_(quorum_spec), cpu_executor_(cpu_executor),
       meta_promise_(nullptr), meta_lock_(), cp_(io_executor), meta_util_(),
-      zk_(nullptr) {
+      zk_(nullptr), cached_locations_(), locations_lock_() {
   zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0);
 }
 
@@ -111,12 +115,12 @@ ServerName LocationCache::ReadMetaLocation() {
   return mrs.server();
 }
 
-Future<std::shared_ptr<RegionLocation>>
+Future<shared_ptr<RegionLocation>>
 LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
   return this->LocateMeta()
       .via(cpu_executor_.get())
       .then([this](ServerName sn) { return this->cp_.Get(sn); })
-      .then([tn, row, this](std::shared_ptr<HBaseService> service) {
+      .then([tn, row, this](shared_ptr<HBaseService> service) {
         return (*service)(std::move(meta_util_.MetaRequest(tn, row)));
       })
       .then([this](Response resp) {
@@ -124,7 +128,7 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
         // a region location.
         return this->CreateLocation(std::move(resp));
       })
-      .then([tn, this](std::shared_ptr<RegionLocation> rl) {
+      .then([tn, this](shared_ptr<RegionLocation> rl) {
         // Make sure that the correct location was found.
         if (rl->region_info().table_name().namespace_() != tn.namespace_() ||
             rl->region_info().table_name().qualifier() != tn.qualifier()) {
@@ -132,14 +136,29 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
         }
         return rl;
       })
-      .then([this](std::shared_ptr<RegionLocation> rl) {
+      .then([this](shared_ptr<RegionLocation> rl) {
         // Now fill out the connection.
         rl->set_service(cp_.Get(rl->server_name()));
         return rl;
+      })
+      .then([tn, this](shared_ptr<RegionLocation> rl) {
+        // Now add the fetched location to the cache.
+        this->CacheLocation(tn, rl);
+        return rl;
       });
 }
 
-std::shared_ptr<RegionLocation>
+Future<shared_ptr<RegionLocation>>
+LocationCache::LocateRegion(const hbase::pb::TableName &tn, const std::string &row) {
+  auto cached_loc = this->GetCachedLocation(tn, row);
+  if (cached_loc != nullptr) {
+    return cached_loc;
+  } else {
+    return this->LocateFromMeta(tn, row);
+  }
+}
+
+shared_ptr<RegionLocation>
 LocationCache::CreateLocation(const Response &resp) {
   auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg());
   auto &results = resp_msg->results().Get(0);
@@ -153,6 +172,110 @@ LocationCache::CreateLocation(const Response &resp) {
 
   auto region_info = folly::to<RegionInfo>(cell_zero);
   auto server_name = folly::to<ServerName>(cell_one);
-  return std::make_shared<RegionLocation>(row, std::move(region_info),
+  return make_shared<RegionLocation>(row, std::move(region_info),
                                           server_name, nullptr);
 }
+
+// takes a shared lock on locations_lock_ internally
+shared_ptr<RegionLocation>
+LocationCache::GetCachedLocation(const hbase::pb::TableName &tn,
+                                 const std::string &row) {
+  auto t_locs = this->GetTableLocations(tn);
+  std::shared_lock<SharedMutexWritePriority> lock(locations_lock_);
+
+  // looking for the "floor" key as a start key
+  auto possible_region = t_locs->lower_bound(row);
+  if (possible_region == t_locs->end() || possible_region->first != row) {
+    // no exact start key match; fall back to the preceding region, if any
+    if (possible_region == t_locs->begin()) {
+      return nullptr;
+    }
+    --possible_region;
+  }
+
+  // found a possible start key, now need to check the end key
+  if (!possible_region->second->region_info().has_end_key() ||
+      possible_region->second->region_info().end_key() > row) {
+    return possible_region->second;
+  } else {
+    return nullptr;
+  }
+}
+
+// takes a unique lock on locations_lock_ internally
+void LocationCache::CacheLocation(const hbase::pb::TableName &tn,
+                                  const shared_ptr<RegionLocation> loc) {
+  auto t_locs = this->GetTableLocations(tn);
+  std::unique_lock<SharedMutexWritePriority> lock(locations_lock_);
+
+  // TODO(mikhail): we may need to handle RegionMovedException
+ (*t_locs)[loc->region_info().start_key()] = loc; +} + +// must hold shared lock on locations_lock_ +bool LocationCache::IsLocationCached(const hbase::pb::TableName &tn, + const std::string &row) { + return (this->GetCachedLocation(tn, row) != nullptr); +} + +// shared lock needed for cases when this table has been requested before; +// in the rare case it hasn't, unique lock will be grabbed to add it to cache +shared_ptr LocationCache::GetTableLocations( + const hbase::pb::TableName &tn) { + auto found_locs = this->GetCachedTableLocations(tn); + if (found_locs == nullptr) { + found_locs = this->GetNewTableLocations(tn); + } + return found_locs; +} + +shared_ptr LocationCache::GetCachedTableLocations( + const hbase::pb::TableName &tn) { + SharedMutexWritePriority::ReadHolder r_holder{locations_lock_}; + + auto table_locs = cached_locations_.find(tn); + if (table_locs != cached_locations_.end()) { + return table_locs->second; + } else { + return nullptr; + } +} + +shared_ptr LocationCache::GetNewTableLocations( + const hbase::pb::TableName &tn) { + // double-check locking under upgradable lock + SharedMutexWritePriority::UpgradeHolder u_holder{locations_lock_}; + + auto table_locs = cached_locations_.find(tn); + if (table_locs != cached_locations_.end()) { + return table_locs->second; + } + SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; + + auto t_locs_p = make_shared< + map> + >(); + cached_locations_.insert(std::make_pair(tn, t_locs_p)); + return t_locs_p; +} + +// must hold unique lock on locations_lock_ +void LocationCache::ClearCache() { + unique_lock lock(locations_lock_); + cached_locations_.clear(); +} + +// must hold unique lock on locations_lock_ +void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) { + unique_lock lock(locations_lock_); + cached_locations_.erase(tn); +} + +// must hold unique lock on locations_lock_ +void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, + const std::string &row) { + auto table_locs = this->GetTableLocations(tn); + unique_lock lock(locations_lock_); + table_locs->erase(row); +} diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index 830cd96..25438a7 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -21,12 +21,14 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include "connection/connection-pool.h" @@ -40,8 +42,42 @@ class Request; class Response; namespace pb { class ServerName; +class TableName; } +/** Equals function for TableName (uses namespace and table name) */ +struct TableNameEquals { + + /** equals */ + bool operator()(const hbase::pb::TableName &lht, + const hbase::pb::TableName &rht) const { + return lht.namespace_() == rht.namespace_() && + lht.qualifier() == rht.qualifier(); + } +}; + +/** Hash for TableName. */ +struct TableNameHash { + /** hash */ + std::size_t operator()(hbase::pb::TableName const &t) const { + std::size_t h = 0; + boost::hash_combine(h, t.namespace_()); + boost::hash_combine(h, t.qualifier()); + return h; + } +}; + +// typedefs for location cache +typedef std::map> + PerTableLocationMap; +typedef std::unordered_map< + hbase::pb::TableName, + std::shared_ptr, + TableNameHash, + TableNameEquals +> RegionLocationMap; + + /** * Class that can look up and cache locations. */ @@ -71,7 +107,8 @@ public: folly::Future LocateMeta(); /** - * Go read meta and find out where a region is located. 
+   * Go read meta and find out where a region is located. Most users should
+   * never call this method directly and should use LocateRegion() instead.
    *
    * @param tn Table name of the table to look up. This object must live until
    *           after the future is returned
@@ -83,14 +120,77 @@
   LocateFromMeta(const hbase::pb::TableName &tn, const std::string &row);
 
   /**
+   * The only method clients should use for meta lookups. If the corresponding
+   * location is cached, it's returned from the cache; otherwise a lookup in
+   * the meta table is done, the location is cached, and it is then returned.
+   * Only a tiny fraction of invocations is expected to incur a meta scan.
+   * This method looks up non-meta regions; use LocateMeta() to get the
+   * location of the hbase:meta region.
+   *
+   * @param tn Table name of the table to look up. This object must live until
+   *           after the future is returned
+   *
+   * @param row of the table to look up. This object must live until after the
+   *            future is returned
+   */
+  folly::Future<std::shared_ptr<RegionLocation>>
+  LocateRegion(const hbase::pb::TableName &tn, const std::string &row);
+
+  /**
    * Remove the cached location of meta.
    */
   void InvalidateMeta();
 
+  /**
+   * Return the cached region location corresponding to this row,
+   * or nullptr if this location isn't cached.
+   */
+  std::shared_ptr<RegionLocation>
+  GetCachedLocation(const hbase::pb::TableName &tn, const std::string &row);
+
+  /**
+   * Add a non-meta region location to the cache (the location of meta
+   * itself is cached separately).
+   */
+  void CacheLocation(const hbase::pb::TableName &tn,
+                     const std::shared_ptr<RegionLocation> loc);
+
+  /**
+   * Check if the location corresponding to this row key is cached.
+   */
+  bool
+  IsLocationCached(const hbase::pb::TableName &tn, const std::string &row);
+
+  /**
+   * Return the cached locations for all regions of this table.
+   */
+  std::shared_ptr<PerTableLocationMap>
+  GetTableLocations(const hbase::pb::TableName &tn);
+
+  /**
+   * Completely clear the location cache.
+   */
+  void ClearCache();
+
+  /**
+   * Clear all cached locations for one table.
+   */
+  void ClearCachedLocations(const hbase::pb::TableName &tn);
+
+  /**
+   * Clear the cached region location for this row.
+   */
+  void
+  ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row);
+
 private:
   void RefreshMetaLocation();
   hbase::pb::ServerName ReadMetaLocation();
   std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
+  std::shared_ptr<PerTableLocationMap> GetCachedTableLocations(
+      const hbase::pb::TableName &tn);
+  std::shared_ptr<PerTableLocationMap> GetNewTableLocations(
+      const hbase::pb::TableName &tn);
 
   /* data */
   std::string quorum_spec_;
@@ -100,6 +200,10 @@ private:
   MetaUtil meta_util_;
   ConnectionPool cp_;
 
+  // cached region locations
+  RegionLocationMap cached_locations_;
+  folly::SharedMutexWritePriority locations_lock_;
+
   // TODO: migrate this to a smart pointer with a deleter.
   zhandle_t *zk_;
 };
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index f3f6c42..678cb5f 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -97,7 +97,7 @@ int main(int argc, char *argv[]) {
   auto row = FLAGS_row;
   auto tn = folly::to<hbase::pb::TableName>(FLAGS_table);
 
-  auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(5000));
+  auto loc = cache.LocateRegion(tn, row).get(milliseconds(5000));
   auto connection = loc->service();
 
   auto num_puts = FLAGS_columns;
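Usage note (not part of the patch): LocateRegion() is the entry point callers are expected to use once this lands; LocateFromMeta() stays public mainly for tests. Below is a minimal sketch of driving the new cached lookup path. It assumes the wangle thread-pool executors used elsewhere in the native client, a ZooKeeper quorum at localhost:2181, and an existing table "t"; the include paths and names are illustrative only.

#include <cassert>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>

#include <folly/Conv.h>
#include <wangle/concurrent/CPUThreadPoolExecutor.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>

#include "core/location-cache.h"

using std::chrono::milliseconds;

int main(int argc, char *argv[]) {
  auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
  auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
  hbase::LocationCache cache{"localhost:2181", cpu, io};

  auto tn = folly::to<hbase::pb::TableName>("t");
  std::string row = "test";

  // First lookup goes to hbase:meta and populates the per-table map.
  auto loc = cache.LocateRegion(tn, row).get(milliseconds(5000));
  std::cout << "located region starting at: "
            << loc->region_info().start_key() << std::endl;

  // Subsequent lookups for the same row are served from the cache.
  assert(cache.IsLocationCached(tn, row));
  auto cached = cache.GetCachedLocation(tn, row);
  assert(cached == loc);

  // Drop the entry if the region is known to have moved or split.
  cache.ClearCachedLocation(tn, row);
  assert(!cache.IsLocationCached(tn, row));

  cpu->stop();
  io->stop();
  return 0;
}

The first LocateRegion() call pays for a meta scan and fills the per-table map; later calls for rows in the same region are pure in-memory lookups until the entry is cleared or the whole table is invalidated.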
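On the locking side, GetCachedTableLocations() and GetNewTableLocations() implement a read-mostly, double-checked pattern on folly::SharedMutexWritePriority: look up under a ReadHolder, and only on a miss take an UpgradeHolder, re-check, and promote it to a WriteHolder for the insert. The stripped-down sketch below (a hypothetical TableMapCache, not part of the patch) isolates that pattern with plain std::string values so it can be read on its own.

#include <memory>
#include <string>
#include <unordered_map>

#include <folly/SharedMutex.h>

// Hypothetical, simplified analogue of the patch's per-table map handling,
// with GetCachedTableLocations() and GetNewTableLocations() collapsed into
// a single call.
class TableMapCache {
 public:
  std::shared_ptr<std::string> GetOrCreate(const std::string &key) {
    {
      // Fast path: most lookups hit the cache and only need a shared lock.
      folly::SharedMutexWritePriority::ReadHolder r_holder{lock_};
      auto found = map_.find(key);
      if (found != map_.end()) {
        return found->second;
      }
    }
    // Miss: take the upgrade lock and re-check, since another thread may
    // have inserted the entry after the read lock was released.
    folly::SharedMutexWritePriority::UpgradeHolder u_holder{lock_};
    auto found = map_.find(key);
    if (found != map_.end()) {
      return found->second;
    }
    // Promote to an exclusive lock only for the actual insert.
    folly::SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)};
    auto value = std::make_shared<std::string>(key);
    map_.emplace(key, value);
    return value;
  }

 private:
  std::unordered_map<std::string, std::shared_ptr<std::string>> map_;
  folly::SharedMutexWritePriority lock_;
};

Because the mutex is write-priority, a thread waiting to promote its upgrade lock blocks new readers, so the one-time insert for a newly seen table is not starved by a steady stream of cache hits.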