From c09f396129d439d4603a581eb807b1093fe505b2 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Fri, 18 Nov 2016 11:00:30 -0800 Subject: [PATCH] HBASE-17051. libhbase++: implement RPC client and connection management --- hbase-native-client/Makefile | 2 +- .../connection/connection-pool-test.cc | 33 ++++++--- hbase-native-client/connection/connection-pool.cc | 49 ++++++++----- hbase-native-client/connection/connection-pool.h | 52 +++++-------- hbase-native-client/core/location-cache.cc | 4 +- hbase-native-client/ipc/connection-id.h | 85 ++++++++++++++++++++++ hbase-native-client/ipc/rpc-client.cc | 49 +++++++++++++ hbase-native-client/ipc/rpc-client.h | 56 ++++++++++++++ hbase-native-client/ipc/rpc-connection.h | 57 +++++++++++++++ hbase-native-client/security/user.h | 33 +++++++++ 10 files changed, 355 insertions(+), 65 deletions(-) create mode 100644 hbase-native-client/ipc/connection-id.h create mode 100644 hbase-native-client/ipc/rpc-client.cc create mode 100644 hbase-native-client/ipc/rpc-client.h create mode 100644 hbase-native-client/ipc/rpc-connection.h create mode 100644 hbase-native-client/security/user.h diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile index 64cef06..b2ec7fe 100644 --- a/hbase-native-client/Makefile +++ b/hbase-native-client/Makefile @@ -22,7 +22,7 @@ LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils +MODULES = connection core serde test-util utils ipc security SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index bd2d585..fcbcb9a 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -25,12 +25,14 @@ #include "connection/connection-factory.h" #include "if/HBase.pb.h" #include "serde/server-name.h" +#include "ipc/connection-id.h" using namespace hbase; using hbase::pb::ServerName; using ::testing::Return; using ::testing::_; +using hbase::ipc::ConnectionId; class MockConnectionFactory : public ConnectionFactory { public: @@ -79,9 +81,10 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) { sn.set_host_name(hostname); sn.set_port(port); - auto result = cp.Get(sn); + auto remoted_id = std::make_shared(nullptr, sn); + auto result = cp.get_connection(remoted_id); ASSERT_TRUE(result != nullptr); - result = cp.Get(sn); + result = cp.get_connection(remoted_id); } TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { @@ -102,13 +105,23 @@ TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { ConnectionPool cp{mock_cf}; { - auto result_one = cp.Get(folly::to( - hostname_one + ":" + folly::to(port))); - auto result_two = cp.Get(folly::to( - hostname_two + ":" + folly::to(port))); + auto remoted_id = std::make_shared( + nullptr, + folly::to( + hostname_one + ":" + folly::to(port))); + auto result_one = cp.get_connection(remoted_id); + auto remoted_id2 = std::make_shared( + nullptr, + folly::to( + hostname_two + ":" + folly::to(port))); + auto result_two = cp.get_connection(remoted_id2); } - auto result_one = cp.Get( - folly::to(hostname_one + ":" + folly::to(port))); - auto result_two = cp.Get( - folly::to(hostname_two + ":" + folly::to(port))); + auto remoted_id = std::make_shared( + nullptr, + folly::to(hostname_one + ":" + folly::to(port))); + auto result_one = cp.get_connection(remoted_id); + auto remoted_id2 = std::make_shared( + nullptr, + folly::to(hostname_two + ":" + folly::to(port))); + auto result_two = cp.get_connection(remoted_id2); } diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index aa3d094..3943b8f 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -25,7 +25,6 @@ using std::mutex; using std::unique_ptr; using std::shared_ptr; -using hbase::pb::ServerName; using hbase::ConnectionPool; using hbase::HBaseService; using folly::SharedMutexWritePriority; @@ -48,27 +47,30 @@ ConnectionPool::~ConnectionPool() { clients_.clear(); } -std::shared_ptr ConnectionPool::Get(const ServerName &sn) { +std::shared_ptr ConnectionPool::get_connection( + std::shared_ptr remote_id) { // Try and get th cached connection. - auto found_ptr = GetCached(sn); + auto found_ptr = get_cached_connection(remote_id); // If there's no connection then create it. if (found_ptr == nullptr) { - found_ptr = GetNew(sn); + found_ptr = get_new_connection(remote_id); } return found_ptr; } -std::shared_ptr ConnectionPool::GetCached(const ServerName &sn) { +std::shared_ptr ConnectionPool::get_cached_connection( + std::shared_ptr remote_id) { SharedMutexWritePriority::ReadHolder holder(map_mutex_); - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found == connections_.end()) { return nullptr; } return found->second; } -std::shared_ptr ConnectionPool::GetNew(const ServerName &sn) { +std::shared_ptr ConnectionPool::get_new_connection( + std::shared_ptr remote_id) { // Grab the upgrade lock. While we are double checking other readers can // continue on SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_}; @@ -76,7 +78,7 @@ std::shared_ptr ConnectionPool::GetNew(const ServerName &sn) { // Now check if someone else created the connection before we got the lock // This is safe since we hold the upgrade lock. // upgrade lock is more power than the reader lock. - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found != connections_.end() && found->second != nullptr) { return found->second; } else { @@ -84,24 +86,33 @@ std::shared_ptr ConnectionPool::GetNew(const ServerName &sn) { SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; // Make double sure there are not stale connections hanging around. - connections_.erase(sn); - - // Nope we are the ones who should create the new connection. - auto client = cf_->MakeBootstrap(); - auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port()); - clients_.insert(std::make_pair(sn, client)); - connections_.insert(std::make_pair(sn, dispatcher)); - return dispatcher; + connections_.erase(remote_id); + + /* create new connection */ + auto clientBootstrap = cf_->MakeBootstrap(); + auto dispatcher = cf_->Connect( + clientBootstrap, + remote_id->server_name().host_name(), + remote_id->server_name().port()); + + auto conneciton = std::make_shared( + remote_id, + dispatcher); + + connections_.insert(std::make_pair(remote_id, conneciton)); + clients_.insert(std::make_pair(remote_id, clientBootstrap)); + + return conneciton; } } -void ConnectionPool::Close(const ServerName &sn) { +void ConnectionPool::close(std::shared_ptr remote_id) { SharedMutexWritePriority::WriteHolder holder{map_mutex_}; - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found == connections_.end() || found->second == nullptr) { return; } - auto service = found->second; + found->second->close(); connections_.erase(found); } diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index b8c950b..f08b979 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -27,36 +27,21 @@ #include "connection/service.h" #include "if/HBase.pb.h" -namespace hbase { - -/** Equals function for server name that ignores start time */ -struct ServerNameEquals { +#include "ipc/connection-id.h" +#include "ipc/rpc-connection.h" - /** equals */ - bool operator()(const hbase::pb::ServerName &lhs, - const hbase::pb::ServerName &rhs) const { - return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port(); - } -}; +using hbase::ipc::ConnectionId; +using hbase::ipc::ConnectionIdEquals; +using hbase::ipc::ConnectionIdHash; +using hbase::ipc::RpcConnection; -/** Hash for ServerName that ignores the start time. */ -struct ServerNameHash { - /** hash */ - std::size_t operator()(hbase::pb::ServerName const &s) const { - std::size_t h = 0; - boost::hash_combine(h, s.host_name()); - boost::hash_combine(h, s.port()); - return h; - } -}; +namespace hbase { /** * @brief Connection pooling for HBase rpc connection. * * This is a thread safe connection pool. It allows getting - * a shared connection to HBase by server name. This is - * useful for keeping a single connection no matter how many regions a - * regionserver has on it. + * a shared rpc connection to HBase servers by connection id. */ class ConnectionPool { public: @@ -80,24 +65,25 @@ public: * Get a connection to the server name. Start time is ignored. * This can be a blocking operation for a short time. */ - std::shared_ptr Get(const hbase::pb::ServerName &sn); + std::shared_ptr get_connection(std::shared_ptr remote_id); /** * Close/remove a connection. */ - void Close(const hbase::pb::ServerName &sn); + void close(std::shared_ptr remote_id); private: - std::shared_ptr GetCached(const hbase::pb::ServerName &sn); - std::shared_ptr GetNew(const hbase::pb::ServerName &sn); - std::unordered_map, - ServerNameHash, ServerNameEquals> - connections_; + std::shared_ptr get_cached_connection(std::shared_ptr remote_id); + std::shared_ptr get_new_connection(std::shared_ptr remote_id); + std::unordered_map< + std::shared_ptr, + std::shared_ptr, + ConnectionIdHash, + ConnectionIdEquals> connections_; std::unordered_map< - hbase::pb::ServerName, + std::shared_ptr, std::shared_ptr>, - ServerNameHash, ServerNameEquals> - clients_; + ConnectionIdHash, ConnectionIdEquals> clients_; folly::SharedMutexWritePriority map_mutex_; std::shared_ptr cf_; }; diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index efd2210..4b90221 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -115,7 +115,7 @@ Future> LocationCache::LocateFromMeta(const TableName &tn, const string &row) { return this->LocateMeta() .via(cpu_executor_.get()) - .then([this](ServerName sn) { return this->cp_.Get(sn); }) + .then([this](ServerName sn) { return this->cp_.get_connection(sn); }) .then([tn, row, this](std::shared_ptr service) { return (*service)(std::move(meta_util_.MetaRequest(tn, row))); }) @@ -134,7 +134,7 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) { }) .then([this](std::shared_ptr rl) { // Now fill out the connection. - rl->set_service(cp_.Get(rl->server_name())); + rl->set_service(cp_.get_connection(rl->server_name())); return rl; }); } diff --git a/hbase-native-client/ipc/connection-id.h b/hbase-native-client/ipc/connection-id.h new file mode 100644 index 0000000..9288c68 --- /dev/null +++ b/hbase-native-client/ipc/connection-id.h @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "if/HBase.pb.h" +#include "security/user.h" + +using hbase::pb::ServerName; +using hbase::security::User; + +namespace hbase { +namespace ipc { + +class ConnectionId { +public: + explicit ConnectionId( + std::shared_ptr user, + const ServerName& server_name) + : user_(user), server_name_(server_name) {} + + virtual ~ConnectionId() = default; + + std::shared_ptr user() const {return user_;} + ServerName server_name() const {return server_name_;} + +private: + std::shared_ptr user_; + ServerName server_name_; +}; + + +/* Equals function for ConnectionId */ +struct ConnectionIdEquals { + /** equals */ + bool operator() ( + const std::shared_ptr &lhs, + const std::shared_ptr &rhs) const { + return + userEquals(lhs->user(), rhs->user()) + && lhs->server_name().host_name() == rhs->server_name().host_name() + && lhs->server_name().port() == rhs->server_name().port(); + } + +private: + bool userEquals( + const std::shared_ptr &lhs, + const std::shared_ptr &rhs) const { + return + lhs == nullptr ? + rhs == nullptr : + (rhs == nullptr ? false : lhs->user_name() == rhs->user_name()); + } +}; + + +/** Hash for ConnectionId. */ +struct ConnectionIdHash { + /** hash */ + std::size_t operator()(const std::shared_ptr &ci) const { + std::size_t h = 0; + boost::hash_combine(h, ci->user() == nullptr ? 0 : ci->user()->user_name()); + boost::hash_combine(h, ci->server_name().host_name()); + boost::hash_combine(h, ci->server_name().port()); + return h; + } +}; +} +} // namespace hbase::ipc diff --git a/hbase-native-client/ipc/rpc-client.cc b/hbase-native-client/ipc/rpc-client.cc new file mode 100644 index 0000000..63b5902 --- /dev/null +++ b/hbase-native-client/ipc/rpc-client.cc @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "rpc-client.h" +#include +#include + +using hbase::ipc::RpcClient; + +RpcClient::RpcClient( + std::shared_ptr user, + const ServerName& server_name) : + remote_id_(std::make_shared(user, server_name)) { + + auto io_executor = + std::make_shared( + sysconf(_SC_NPROCESSORS_ONLN)); + + cp_ = std::make_shared(io_executor); +} + +void RpcClient::close() { +} + +std::shared_ptr RpcClient::sync_call(std::unique_ptr req) { + return std::make_shared( + cp_->get_connection(remote_id_)->sendRequest(std::move(req)).get()); +} + +folly::Future RpcClient::async_call(std::unique_ptr req) { + return cp_->get_connection(remote_id_)->sendRequest(std::move(req)); +} + diff --git a/hbase-native-client/ipc/rpc-client.h b/hbase-native-client/ipc/rpc-client.h new file mode 100644 index 0000000..649b2ae --- /dev/null +++ b/hbase-native-client/ipc/rpc-client.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + + +#include "security/user.h" +#include "connection/request.h" +#include "connection/response.h" +#include "connection/connection-pool.h" +#include "connection-id.h" + +using hbase::security::User; +using hbase::pb::ServerName; +using hbase::Request; +using hbase::Response; +using hbase::ipc::ConnectionId; +using hbase::ConnectionPool; + +namespace hbase { +namespace ipc { +class RpcClient { +public: + RpcClient( + std::shared_ptr user, + const ServerName& server_name); + + virtual ~RpcClient() {close();} + + virtual std::shared_ptr sync_call(std::unique_ptr req); + + virtual folly::Future async_call(std::unique_ptr req); + + virtual void close(); + +private: + std::shared_ptr cp_; + std::shared_ptr remote_id_; +}; +} +} // namespace hbase::ipc diff --git a/hbase-native-client/ipc/rpc-connection.h b/hbase-native-client/ipc/rpc-connection.h new file mode 100644 index 0000000..af6edba --- /dev/null +++ b/hbase-native-client/ipc/rpc-connection.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "connection-id.h" +#include "connection/request.h" +#include "connection/response.h" +#include "connection/service.h" + +using hbase::HBaseService; + +namespace hbase { +namespace ipc { + +class RpcConnection { +public: + RpcConnection( + std::shared_ptr connection_id, + std::shared_ptr hbase_service) + : connection_id_(connection_id), hbase_service_(hbase_service) {} + + virtual ~RpcConnection() {close();}; + + virtual std::shared_ptr remote_id() const {return connection_id_;} + + virtual std::shared_ptr get_service() const {return hbase_service_;} + + virtual folly::Future sendRequest(std::unique_ptr req) { + return (*hbase_service_)(std::move(req)); + } + + virtual void close() { + hbase_service_->close(); + } + +private: + std::shared_ptr connection_id_; + std::shared_ptr hbase_service_; +}; +} +} // namespace hbase::ipc diff --git a/hbase-native-client/security/user.h b/hbase-native-client/security/user.h new file mode 100644 index 0000000..0a096fd --- /dev/null +++ b/hbase-native-client/security/user.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +namespace hbase { +namespace security { +class User { +public: + explicit User(const std::string& user_name) : user_name_(user_name) {} + virtual ~User() = default; + + std::string user_name() {return user_name_;} +private: + std::string user_name_; +}; +} +} -- 2.7.4 (Apple Git-66)