From b2f70ee9a0bc2e7f204c51d6ec5c364391d7378c Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 28 Apr 2016 19:53:42 -0700 Subject: [PATCH] HBASE-15731 Add on a connection pool Summary: Add on a connection pool protected by read write mutex. Add on a service filter that will remove a connection from a connection pool when closed Test Plan: Need to add on tests. Differential Revision: https://reviews.facebook.net/D57411 --- hbase-native-client/connection/BUCK | 2 + .../connection/connection-factory.cc | 2 +- .../connection/connection-factory.h | 4 +- hbase-native-client/connection/connection-pool.cc | 87 +++++++++++++++++ hbase-native-client/connection/connection-pool.h | 57 +++++++++++ hbase-native-client/connection/service.h | 4 +- hbase-native-client/core/BUCK | 4 - hbase-native-client/core/client.h | 3 - hbase-native-client/core/get-request.cc | 19 ---- hbase-native-client/core/get-request.h | 35 ------- hbase-native-client/core/get-result.cc | 19 ---- hbase-native-client/core/get-result.h | 32 ------- hbase-native-client/core/location-cache.cc | 1 - hbase-native-client/core/simple-client.cc | 105 ++++++++++++--------- 14 files changed, 211 insertions(+), 163 deletions(-) create mode 100644 hbase-native-client/connection/connection-pool.cc create mode 100644 hbase-native-client/connection/connection-pool.h delete mode 100644 hbase-native-client/core/get-request.cc delete mode 100644 hbase-native-client/core/get-request.h delete mode 100644 hbase-native-client/core/get-result.cc delete mode 100644 hbase-native-client/core/get-result.h diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index d393885..cbda3a9 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -22,6 +22,7 @@ cxx_library(name="connection", "client-dispatcher.h", "client-handler.h", "connection-factory.h", + "connection-pool.h", "pipeline.h", "request.h", "response.h", @@ -31,6 +32,7 @@ cxx_library(name="connection", "client-dispatcher.cc", "client-handler.cc", "connection-factory.cc", + "connection-pool.cc", "pipeline.cc", "request.cc", ], diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 7073f9d..c92651e 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -45,7 +45,7 @@ ConnectionFactory::ConnectionFactory() { bootstrap_.pipelineFactory(std::make_shared()); } -std::shared_ptr, Response>> +std::shared_ptr ConnectionFactory::make_connection(std::string host, int port) { // Connect to a given server // Then when connected create a ClientDispactcher. diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index 8d1d2f0..621a238 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -31,8 +31,8 @@ namespace hbase { class ConnectionFactory { public: ConnectionFactory(); - std::shared_ptr, Response>> - make_connection(std::string host, int port); + + std::shared_ptr make_connection(std::string host, int port); private: wangle::ClientBootstrap bootstrap_; diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc new file mode 100644 index 0000000..c9e2307 --- /dev/null +++ b/hbase-native-client/connection/connection-pool.cc @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "connection/connection-pool.h" + +#include + +using std::mutex; +using std::unique_ptr; +using std::shared_ptr; +using hbase::pb::ServerName; +using wangle::ServiceFilter; +using folly::SharedMutexWritePriority; + +namespace hbase { + +class RemoveServiceFilter + : public ServiceFilter, Response> { + +public: + RemoveServiceFilter(std::shared_ptr service, ServerName sn, + ConnectionPool *cp) + : ServiceFilter, Response>(service), sn_(sn), + cp_(cp) {} + + folly::Future close() override { + if (!released.exchange(true)) { + return this->service_->close().then( + [this]() { this->cp_->close(this->sn_); }); + } else { + return folly::makeFuture(); + } + } + + virtual bool isAvailable() override { return service_->isAvailable(); } + + folly::Future operator()(unique_ptr req) override { + return (*this->service_)(std::move(req)); + } + +private: + std::atomic released{false}; + hbase::pb::ServerName sn_; + ConnectionPool *cp_; +}; + +std::shared_ptr ConnectionPool::get(const ServerName &sn) { + SharedMutexWritePriority::UpgradeHolder holder(map_mutex_); + + auto found = connections_.find(sn); + if (found == connections_.end() || found->second == nullptr) { + SharedMutexWritePriority::WriteHolder holder(std::move(holder)); + + auto new_con = cf_.make_connection(sn.host_name(), sn.port()); + auto wrapped = std::make_shared(new_con, sn, this); + connections_[sn] = wrapped; + return new_con; + } + return found->second; +} +void ConnectionPool::close(ServerName sn) { + SharedMutexWritePriority::WriteHolder holder(map_mutex_); + + auto found = connections_.find(sn); + if (found == connections_.end() || found->second == nullptr) { + return; + } + auto service = found->second; + connections_.erase(found); +} +} diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h new file mode 100644 index 0000000..2fc0759 --- /dev/null +++ b/hbase-native-client/connection/connection-pool.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include + +#include "connection/connection-factory.h" +#include "connection/service.h" +#include "if/HBase.pb.h" + +namespace hbase { +struct MyServerNameEquals { + bool operator()(const hbase::pb::ServerName &lhs, + const hbase::pb::ServerName &rhs) const { + return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port(); + } +}; +struct MyServerNameHash { + std::size_t operator()(hbase::pb::ServerName const &s) const { + std::size_t h1 = std::hash()(s.host_name()); + std::size_t h2 = std::hash()(s.port()); + return h1 ^ (h2 << 1); // or use boost::hash_combine + } +}; + +class ConnectionPool { +public: + std::shared_ptr get(const hbase::pb::ServerName &sn); + void close(hbase::pb::ServerName sn); + +private: + ConnectionFactory cf_; + std::unordered_map, + MyServerNameHash, MyServerNameEquals> + connections_; + folly::SharedMutexWritePriority map_mutex_; +}; + +} // namespace hbase diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h index feb14ec..79f087d 100644 --- a/hbase-native-client/connection/service.h +++ b/hbase-native-client/connection/service.h @@ -18,9 +18,11 @@ */ #pragma once +#include + #include "connection/request.h" #include "connection/response.h" namespace hbase { -using HBaseService = wangle::Service; +using HBaseService = wangle::Service, Response>; } // namespace hbase diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 9db6fda..e555ba4 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -21,8 +21,6 @@ cxx_library( exported_headers=[ "client.h", "connection.h", - "get-request.h", - "get-result.h", "hbase_macros.h", "location-cache.h", "table-name.h", @@ -32,8 +30,6 @@ cxx_library( ], srcs=[ "client.cc", - "get-request.cc", - "get-result.cc", "location-cache.cc", "meta-utils.cc", "table-name.cc", diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index b583285..4bed751 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -24,8 +24,6 @@ #include -#include "core/get-request.h" -#include "core/get-result.h" #include "core/location-cache.h" #include "if/Cell.pb.h" @@ -33,7 +31,6 @@ namespace hbase { class Client { public: explicit Client(std::string quorum_spec); - folly::Future get(const GetRequest &get_request); private: LocationCache location_cache_; diff --git a/hbase-native-client/core/get-request.cc b/hbase-native-client/core/get-request.cc deleted file mode 100644 index e927ccc..0000000 --- a/hbase-native-client/core/get-request.cc +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "core/get-request.h" diff --git a/hbase-native-client/core/get-request.h b/hbase-native-client/core/get-request.h deleted file mode 100644 index bb755c5..0000000 --- a/hbase-native-client/core/get-request.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include - -#include "core/table-name.h" - -namespace hbase { - -class GetRequest { -public: - GetRequest(TableName table_name, std::string key); - -private: - TableName table_name_; - std::string key_; -}; -} // namespace hbase diff --git a/hbase-native-client/core/get-result.cc b/hbase-native-client/core/get-result.cc deleted file mode 100644 index 7eea483..0000000 --- a/hbase-native-client/core/get-result.cc +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "core/get-result.h" diff --git a/hbase-native-client/core/get-result.h b/hbase-native-client/core/get-result.h deleted file mode 100644 index a49ad98..0000000 --- a/hbase-native-client/core/get-result.h +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include - -namespace hbase { - -class GetResult { -public: - explicit GetResult(std::string key); - -private: - std::string key_; -}; -} // namespace hbase diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 5925f4a..c81deba 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -77,7 +77,6 @@ ServerName LocationCache::ReadMetaLocation() { int zk_result = zoo_get(this->zk_, META_ZNODE_NAME, 0, reinterpret_cast(buf->writableData()), &len, nullptr); - LOG(ERROR) << "len = " << len; if (zk_result != ZOK || len < 9) { LOG(ERROR) << "Error getting meta location."; throw runtime_error("Error getting meta location"); diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 2cb6200..11dcd68 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -25,7 +25,7 @@ #include #include -#include "connection/connection-factory.h" +#include "connection/connection-pool.h" #include "core/client.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" @@ -33,13 +33,20 @@ using namespace folly; using namespace std; using namespace std::chrono; -using namespace hbase; -using namespace hbase::pb; -using namespace google::protobuf; +using hbase::Response; +using hbase::Request; +using hbase::HBaseService; +using hbase::LocationCache; +using hbase::ConnectionPool; +using hbase::pb::ServerName; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::pb::GetRequest; +using hbase::pb::GetResponse; // TODO(eclark): remove the need for this. DEFINE_string(region, "1588230740", "What region to send a get to"); DEFINE_string(row, "test", "What row to get"); +DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); int main(int argc, char *argv[]) { google::SetUsageMessage( @@ -48,46 +55,52 @@ int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); // Create a connection factory - ConnectionFactory cf; - - LocationCache cache{"localhost:2181", wangle::getCPUExecutor()}; - - auto result = cache.LocateMeta().get(); - - // Create a connection to the local host - auto conn = cf.make_connection(result.host_name(), result.port()); - - // Send the request - auto r = Request::get(); - - // This is a get request so make that - auto req_msg = static_pointer_cast(r->req_msg()); - - // Set what region - req_msg->mutable_region()->set_value(FLAGS_region); - // It's always this. - req_msg->mutable_region()->set_type( - RegionSpecifier_RegionSpecifierType:: - RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); - - // What row. - req_msg->mutable_get()->set_row(FLAGS_row); - - // Send it. - auto resp = (*conn)(std::move(r)).get(milliseconds(5000)); - - auto get_resp = std::static_pointer_cast(resp.response()); - cout << "GetResponse has_result = " << get_resp->has_result() << '\n'; - if (get_resp->has_result()) { - auto &r = get_resp->result(); - cout << "Result cell_size = " << r.cell_size() << endl; - for (auto &cell : r.cell()) { - cout << "\trow = " << cell.row() << " family = " << cell.family() - << " qualifier = " << cell.qualifier() - << " timestamp = " << cell.timestamp() << " value = " << cell.value() - << endl; - } - } - - return 0; + ConnectionPool cp; + auto cpu_ex = wangle::getCPUExecutor(); + LocationCache cache{FLAGS_zookeeper, cpu_ex}; + auto result = + cache.LocateMeta() + .then([&cp = cp](ServerName sn) { return cp.get(sn); }) + .then([](shared_ptr con) { + // Send the request + auto r = Request::get(); + // This is a get request so make that + auto req_msg = static_pointer_cast(r->req_msg()); + // Set what region + req_msg->mutable_region()->set_value(FLAGS_region); + // It's always this. + req_msg->mutable_region()->set_type( + RegionSpecifier_RegionSpecifierType:: + RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); + + // What row. + req_msg->mutable_get()->set_row(FLAGS_row); + + return (*con)(std::move(r)); + }) + .then([](Response resp) { + return static_pointer_cast(resp.response()); + }) + .via(cpu_ex.get()) + .then([](shared_ptr get_resp) { + cout << "GetResponse has_result = " << get_resp->has_result() + << '\n'; + if (get_resp->has_result()) { + auto &r = get_resp->result(); + cout << "Result cell_size = " << r.cell_size() << endl; + for (auto &cell : r.cell()) { + cout << "\trow = " << cell.row() + << " family = " << cell.family() + << " qualifier = " << cell.qualifier() + << " timestamp = " << cell.timestamp() + << " value = " << cell.value() << endl; + } + return 0; + } + + return 1; + }) + .get(milliseconds(5000)); + + return result; } -- 2.8.0-rc2