From 9ad7124fdad48f3d35ee4041d3cfdeeadd5d1fa8 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Wed, 4 May 2016 01:54:21 -0700 Subject: [PATCH] HBASE-15766 Show working puts --- .../connection/client-dispatcher.cc | 7 +-- hbase-native-client/connection/client-dispatcher.h | 5 +- hbase-native-client/connection/client-handler.cc | 13 +++-- hbase-native-client/connection/client-handler.h | 4 +- .../connection/connection-factory.cc | 1 + .../connection/connection-pool-test.cc | 26 +++++++++ hbase-native-client/connection/connection-pool.cc | 2 +- hbase-native-client/connection/connection-pool.h | 2 +- hbase-native-client/core/client.cc | 6 +-- hbase-native-client/core/location-cache.cc | 12 ++--- hbase-native-client/core/location-cache.h | 6 +-- hbase-native-client/core/meta-utils.cc | 6 +-- hbase-native-client/core/meta-utils.h | 4 +- hbase-native-client/core/region-location.h | 7 ++- hbase-native-client/core/simple-client.cc | 61 ++++++++++++++++++---- .../serde/region-info-deserializer-test.cc | 1 - hbase-native-client/serde/region-info.h | 6 +-- 17 files changed, 121 insertions(+), 48 deletions(-) diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 6e2dc54..04a2ade 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -22,10 +22,11 @@ using namespace folly; using namespace hbase; using namespace wangle; -ClientDispatcher::ClientDispatcher() : requests_(), current_call_id_(9) {} +ClientDispatcher::ClientDispatcher() : requests_(1000), current_call_id_(9) {} void ClientDispatcher::read(Context *ctx, Response in) { auto call_id = in.call_id(); + auto search = requests_.find(call_id); CHECK(search != requests_.end()); auto p = std::move(search->second); @@ -39,9 +40,9 @@ void ClientDispatcher::read(Context *ctx, Response in) { Future ClientDispatcher::operator()(std::unique_ptr arg) { auto call_id = ++current_call_id_; - arg->set_call_id(call_id); - auto &p = requests_[call_id]; + requests_.insert(call_id, Promise{}); + auto &p = requests_.find(call_id)->second; auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { LOG(ERROR) << "e = " << call_id; diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 826fc6a..d175d39 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -19,6 +19,8 @@ #pragma once +#include +#include #include #include "connection/pipeline.h" @@ -31,13 +33,14 @@ class ClientDispatcher std::unique_ptr, Response> { public: ClientDispatcher(); + ~ClientDispatcher() { LOG(ERROR) << "Killing ClientDispatcher call_id = " << current_call_id_; } void read(Context *ctx, Response in) override; folly::Future operator()(std::unique_ptr arg) override; folly::Future close(Context *ctx) override; folly::Future close() override; private: - std::unordered_map> requests_; + folly::AtomicHashMap> requests_; // Start at some number way above what could // be there for un-initialized call id counters. // diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 496e4f2..c4bfe5d 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -37,7 +37,10 @@ using hbase::pb::GetResponse; using google::protobuf::Message; ClientHandler::ClientHandler(std::string user_name) - : user_name_(user_name), need_send_header_(true), serde_(), resp_msgs_() {} + : user_name_(user_name), need_send_header_(true), serde_(), + resp_msgs_( + make_unique>>(100)) {} void ClientHandler::read(Context *ctx, std::unique_ptr buf) { if (LIKELY(buf != nullptr)) { @@ -51,14 +54,14 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { << " has_exception=" << header.has_exception(); // Get the response protobuf from the map - auto search = resp_msgs_.find(header.call_id()); + auto search = resp_msgs_->find(header.call_id()); // It's an error if it's not there. - CHECK(search != resp_msgs_.end()); + CHECK(search != resp_msgs_->end()); auto resp_msg = search->second; CHECK(resp_msg != nullptr); // Make sure we don't leak the protobuf - resp_msgs_.erase(search); + resp_msgs_->erase(header.call_id()); // set the call_id. // This will be used to by the dispatcher to match up @@ -96,7 +99,7 @@ Future ClientHandler::write(Context *ctx, std::unique_ptr r) { ctx->fireWrite(std::move(pre)); } - resp_msgs_[r->call_id()] = r->resp_msg(); + resp_msgs_->insert(r->call_id(), r->resp_msg()); return ctx->fireWrite( serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index ce99c9e..be5143c 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -18,6 +18,7 @@ */ #pragma once +#include #include #include @@ -51,7 +52,8 @@ private: RpcSerde serde_; // in flight requests - std::unordered_map> + std::unique_ptr>> resp_msgs_; }; } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 9102d60..4ad54a8 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -41,6 +41,7 @@ using namespace hbase; using namespace wangle; ConnectionFactory::ConnectionFactory() : bootstrap_() { + LOG(ERROR) << "Creating a ConnectionFactory"; bootstrap_.group(std::make_shared(1)); bootstrap_.pipelineFactory(std::make_shared()); } diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index 975bc5e..46a7521 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -75,3 +75,29 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) { ASSERT_TRUE(result != nullptr); result = cp.get(sn); } +TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { + std::string hostname_one{"hostname"}; + std::string hostname_two{"hostname_two"}; + uint32_t port{999}; + + auto mock_cf = std::make_shared(); + EXPECT_CALL((*mock_cf), make_connection(_, _)) + .Times(2) + .WillRepeatedly(Return(std::make_shared())); + + LOG(ERROR) << "Created ConnectionPool"; + + ServerName sn_one; + sn_one.set_host_name(hostname_one); + sn_one.set_port(port); + + ServerName sn_two; + sn_two.set_host_name(hostname_two); + sn_two.set_port(port); + + { + ConnectionPool cp{mock_cf}; + auto result_one = cp.get(sn_one); + auto result_two = cp.get(sn_two); + } +} diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index eafe60a..16703ba 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -41,7 +41,7 @@ std::shared_ptr ConnectionPool::get(const ServerName &sn) { if (found == connections_.end() || found->second == nullptr) { SharedMutexWritePriority::WriteHolder holder(std::move(holder)); auto new_con = cf_->make_connection(sn.host_name(), sn.port()); - connections_[sn] = new_con; + connections_.insert(std::make_pair(sn, new_con)); return new_con; } return found->second; diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index b8330e3..914007e 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -37,7 +37,7 @@ struct ServerNameHash { std::size_t operator()(hbase::pb::ServerName const &s) const { std::size_t h1 = std::hash()(s.host_name()); std::size_t h2 = std::hash()(s.port()); - return h1 ^ (h2 << 1); + return h1 ^ (h2 ); } }; diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 266c239..4b9f844 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -33,8 +33,4 @@ using namespace folly; using namespace std; using namespace hbase::pb; -namespace hbase { - -Client::Client(string quorum_spec) - : location_cache_(quorum_spec, wangle::getCPUExecutor()) {} -} +namespace hbase {} diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 2667f11..4f08ef5 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -25,8 +25,8 @@ #include "connection/response.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" -#include "serde/server-name.h" #include "serde/region-info.h" +#include "serde/server-name.h" #include "serde/zk.h" using namespace std; @@ -162,16 +162,12 @@ private: }; std::shared_ptr -LocationCache::CreateLocation(const Response &resp){ +LocationCache::CreateLocation(const Response &resp) { auto resp_msg = static_pointer_cast(resp.response()); auto &results = resp_msg->results().Get(0); auto &cells = results.cell(); - LOG(ERROR) << "resp_msg = " << resp_msg->DebugString(); auto ri = folly::to(cells.Get(0).value()); auto sn = folly::to(cells.Get(1).value()); - - LOG(ERROR) << "RegionInfo = " << ri.DebugString(); - LOG(ERROR) << "ServerName = " << sn.DebugString(); - auto wrapped = make_shared(cp_.get(sn), sn, this->cp_); - return std::make_shared(std::move(ri), std::move(sn), wrapped); + return std::make_shared(cells.Get(0).row(), std::move(ri), + sn, cp_.get(sn)); } diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index 99b5e5e..7f76428 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -48,9 +48,10 @@ public: // Meta Related Methods. // These are only public until testing is complete folly::Future LocateMeta(); - folly::Future> LocateFromMeta(const hbase::pb::TableName &tn, - const std::string &row); + folly::Future> + LocateFromMeta(const hbase::pb::TableName &tn, const std::string &row); void InvalidateMeta(); + ConnectionPool cp_; private: void RefreshMetaLocation(); @@ -61,7 +62,6 @@ private: std::shared_ptr executor_; std::unique_ptr> meta_promise_; std::mutex meta_lock_; - ConnectionPool cp_; MetaUtil meta_util_; // TODO: migrate this to a smart pointer with a deleter. diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc index 1325d83..23d2041 100644 --- a/hbase-native-client/core/meta-utils.cc +++ b/hbase-native-client/core/meta-utils.cc @@ -37,12 +37,12 @@ using hbase::pb::RegionSpecifier_RegionSpecifierType; static const std::string META_REGION = "1588230740"; std::string MetaUtil::RegionLookupRowkey(const TableName &tn, - const std::string &row) const { + const std::string &row) const { return folly::to(tn, ",", row, ",", "999999999999999999"); } -std::unique_ptr -MetaUtil::MetaRequest(const TableName tn, const std::string &row) const { +std::unique_ptr MetaUtil::MetaRequest(const TableName tn, + const std::string &row) const { auto request = Request::scan(); auto msg = std::static_pointer_cast(request->req_msg()); diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h index 5a659f3..dfef065 100644 --- a/hbase-native-client/core/meta-utils.h +++ b/hbase-native-client/core/meta-utils.h @@ -29,8 +29,8 @@ namespace hbase { class MetaUtil { public: std::string RegionLookupRowkey(const hbase::pb::TableName &tn, - const std::string &row) const; + const std::string &row) const; std::unique_ptr MetaRequest(const hbase::pb::TableName tn, - const std::string &row) const; + const std::string &row) const; }; } // namespace hbase diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index 7922c95..553029c 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -27,15 +27,18 @@ namespace hbase { class RegionLocation { public: - RegionLocation(hbase::pb::RegionInfo ri, hbase::pb::ServerName sn, + RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, + hbase::pb::ServerName sn, std::shared_ptr service) - : ri_(ri), sn_(sn), service_(service) {} + : region_name_(region_name), ri_(ri), sn_(sn), service_(service) {} const hbase::pb::RegionInfo ®ion_info() { return ri_; } const hbase::pb::ServerName &server_name() { return sn_; } + const std::string ®ion_name() { return region_name_; } std::shared_ptr service() { return service_; } private: + std::string region_name_; hbase::pb::RegionInfo ri_; hbase::pb::ServerName sn_; std::shared_ptr service_; diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 00e3369..a41f229 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include @@ -42,13 +44,35 @@ using hbase::ConnectionPool; using hbase::pb::TableName; using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; -using hbase::pb::GetRequest; -using hbase::pb::GetResponse; +using hbase::pb::MutateRequest; +using hbase::pb::MutationProto_MutationType; // TODO(eclark): remove the need for this. DEFINE_string(table, "t", "What region to send a get"); DEFINE_string(row, "test", "What row to get"); DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); +DEFINE_uint64(columns, 10000, "How many columns to write"); +DEFINE_int32(threads, 12, "How many cpu threads"); + +std::unique_ptr MakeRequest(uint64_t col, std::string region_name) { + auto req = Request::mutate(); + auto msg = std::static_pointer_cast(req->req_msg()); + auto region = msg->mutable_region(); + region->set_value(region_name); + region->set_type(RegionSpecifier_RegionSpecifierType:: + RegionSpecifier_RegionSpecifierType_REGION_NAME); + auto mutation = msg->mutable_mutation(); + mutation->set_row(FLAGS_row); + mutation->set_mutate_type( + MutationProto_MutationType::MutationProto_MutationType_PUT); + auto column = mutation->add_column_value(); + column->set_family("d"); + auto qual = column->add_qualifier_value(); + qual->set_qualifier(folly::to(col)); + qual->set_value("test"); + + return std::move(req); +} int main(int argc, char *argv[]) { google::SetUsageMessage( @@ -56,13 +80,32 @@ int main(int argc, char *argv[]) { google::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); - // Create a connection factory - ConnectionPool cp; - auto cpu_ex = wangle::getCPUExecutor(); - LocationCache cache{FLAGS_zookeeper, cpu_ex}; - auto result = - cache.LocateFromMeta(folly::to(FLAGS_table), FLAGS_row) - .get(milliseconds(5000)); + // Set up thread pools. + auto cpu_pool = + std::make_shared(FLAGS_threads); + wangle::setCPUExecutor(cpu_pool); + + // Create the cache. + LocationCache cache{FLAGS_zookeeper, cpu_pool}; + + auto row = FLAGS_row; + auto tn = folly::to(FLAGS_table); + auto region_location = cache.LocateFromMeta(tn, row).get(milliseconds(5000)); + auto connection = cache.cp_.get(region_location->server_name()); + + auto num_puts = FLAGS_columns; + + std::vector> results; + for (uint64_t col = 0; col < num_puts; col++) { + results.push_back(wangle::async([col=col, region_location=region_location]() { + return MakeRequest(col, region_location->region_name()); + }).then([connection=connection](std::unique_ptr req) { + return (*connection)(std::move(req)); + })); + } + auto allf = folly::collectAll(results).get(); + + LOG(ERROR) << "Done with everything"; return 0; } diff --git a/hbase-native-client/serde/region-info-deserializer-test.cc b/hbase-native-client/serde/region-info-deserializer-test.cc index ce8dedf..5cb8482 100644 --- a/hbase-native-client/serde/region-info-deserializer-test.cc +++ b/hbase-native-client/serde/region-info-deserializer-test.cc @@ -44,7 +44,6 @@ TEST(TestRegionInfoDesializer, TestDeserialize) { ri_out.set_start_key(start_row); ri_out.set_end_key(stop_row); - string header{"PBUF"}; string ser = header + ri_out.SerializeAsString(); diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h index 6af351c..e2ecfc9 100644 --- a/hbase-native-client/serde/region-info.h +++ b/hbase-native-client/serde/region-info.h @@ -21,16 +21,16 @@ #include "if/HBase.pb.h" -#include #include +#include namespace hbase { namespace pb { -template void parseTo(String in, RegionInfo& out) { +template void parseTo(String in, RegionInfo &out) { // TODO(eclark): there has to be something better. std::string s = folly::to(in); - if (!boost::starts_with(s, "PBUF") ) { + if (!boost::starts_with(s, "PBUF")) { throw std::runtime_error("Region Info field doesn't contain preamble"); } if (!out.ParseFromArray(s.data() + 4, s.size() - 4)) { -- 2.8.0-rc2