From eab6c1498faf64bd5eb2d5fed4eff0e83d0f0fa5 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Fri, 18 Nov 2016 11:00:30 -0800 Subject: [PATCH] HBASE-17051. libhbase++: implement RPC connection management --- hbase-native-client/Makefile | 2 +- hbase-native-client/ipc/connection-id.h | 74 +++++++++++++++++++++ hbase-native-client/ipc/rpc-client.cc | 110 +++++++++++++++++++++++++++++++ hbase-native-client/ipc/rpc-client.h | 70 ++++++++++++++++++++ hbase-native-client/ipc/rpc-connection.h | 53 +++++++++++++++ hbase-native-client/security/user.h | 33 ++++++++++ 6 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 hbase-native-client/ipc/connection-id.h create mode 100644 hbase-native-client/ipc/rpc-client.cc create mode 100644 hbase-native-client/ipc/rpc-client.h create mode 100644 hbase-native-client/ipc/rpc-connection.h create mode 100644 hbase-native-client/security/user.h diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile index 64cef06..b2ec7fe 100644 --- a/hbase-native-client/Makefile +++ b/hbase-native-client/Makefile @@ -22,7 +22,7 @@ LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils +MODULES = connection core serde test-util utils ipc security SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) diff --git a/hbase-native-client/ipc/connection-id.h b/hbase-native-client/ipc/connection-id.h new file mode 100644 index 0000000..7f56565 --- /dev/null +++ b/hbase-native-client/ipc/connection-id.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "if/HBase.pb.h" +#include "security/user.h" + +using hbase::pb::ServerName; +using hbase::security::User; + +namespace hbase { +namespace ipc { + +class ConnectionId { +public: + explicit ConnectionId( + std::shared_ptr user, + const ServerName& server_name) + : user_(user), server_name_(server_name) {} + + virtual ~ConnectionId() = default; + + std::shared_ptr user() const {return user_;} + ServerName server_name() const {return server_name_;} + +private: + std::shared_ptr user_; + ServerName server_name_; +}; + + +/* Equals function for ConnectionId */ +struct ConnectionIdEquals { + /** equals */ + bool operator() ( + const std::shared_ptr &lhs, + const std::shared_ptr &rhs) const { + return lhs->user()->user_name() == rhs->user()->user_name() + && lhs->server_name().host_name() == rhs->server_name().host_name() + && lhs->server_name().port() == rhs->server_name().port(); + } +}; + + +/** Hash for ConnectionId. */ +struct ConnectionIdHash { + /** hash */ + std::size_t operator()(const std::shared_ptr &ci) const { + std::size_t h = 0; + boost::hash_combine(h, ci->user()->user_name()); + boost::hash_combine(h, ci->server_name().host_name()); + boost::hash_combine(h, ci->server_name().port()); + return h; + } +}; +} +} // namespace hbase::ipc diff --git a/hbase-native-client/ipc/rpc-client.cc b/hbase-native-client/ipc/rpc-client.cc new file mode 100644 index 0000000..68d47c7 --- /dev/null +++ b/hbase-native-client/ipc/rpc-client.cc @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "rpc-client.h" +#include + +using hbase::ipc::RpcClient; +using folly::SharedMutexWritePriority; + +RpcClient::RpcClient( + std::shared_ptr user, + const ServerName& server_name) : + user_(user), + server_name_(server_name), + map_mutex_(), + connections_() { + + auto io_executor = + std::make_shared( + sysconf(_SC_NPROCESSORS_ONLN)); + + cf_ = std::make_shared(io_executor); +} + +void RpcClient::close() { +} + +std::shared_ptr RpcClient::sync_call(std::unique_ptr req) { + return std::make_shared( + get_connection()->sendRequest(std::move(req)).get()); +} + +folly::Future RpcClient::async_call(std::unique_ptr req) { + return get_connection()->sendRequest(std::move(req)); +} + +std::shared_ptr RpcClient::get_connection() { + + auto remote_id = std::make_shared(user_, server_name_); + + /* get from cache */ + auto found_ptr = get_cached_connection(remote_id); + + /* create new connection if no */ + if (found_ptr == nullptr) { + found_ptr = get_new_connection(remote_id); + } + + return found_ptr; +} + +std::shared_ptr RpcClient::get_cached_connection( + std::shared_ptr remote_id) { + + SharedMutexWritePriority::ReadHolder holder(map_mutex_); + auto found = connections_.find(remote_id); + if (found == connections_.end()) { + return nullptr; + } + return found->second; +} + +std::shared_ptr RpcClient::get_new_connection( + std::shared_ptr remote_id) { + /* allow readers going on */ + SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_}; + + /* double check */ + auto found = connections_.find(remote_id); + if (found != connections_.end() && found->second != nullptr) { + return found->second; + } else { + /* no connection */ + SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; + + /* erase stale connection, if any */ + connections_.erase(remote_id); + + /* create new connection */ + auto clientBootstrap = cf_->MakeBootstrap(); + auto dispatcher = cf_->Connect( + clientBootstrap, + server_name_.host_name(), + server_name_.port()); + + auto conneciton = std::make_shared( + remote_id, + dispatcher); + + connections_.insert(std::make_pair(remote_id, conneciton)); + + return conneciton; + } +} diff --git a/hbase-native-client/ipc/rpc-client.h b/hbase-native-client/ipc/rpc-client.h new file mode 100644 index 0000000..d654363 --- /dev/null +++ b/hbase-native-client/ipc/rpc-client.h @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + + +#include "rpc-connection.h" +#include "connection-id.h" +#include +#include +#include +#include "connection/connection-factory.h" +#include "security/user.h" +#include "connection/request.h" +#include "connection/response.h" + +using hbase::security::User; +using hbase::ConnectionFactory; +using hbase::pb::ServerName; +using hbase::Request; +using hbase::Response; +using hbase::ipc::RpcConnection; +using hbase::ipc::ConnectionId; + +namespace hbase { +namespace ipc { +class RpcClient { +public: + RpcClient( + std::shared_ptr user, + const ServerName& server_name); + + virtual ~RpcClient() {close();} + + virtual void close(); + + virtual std::shared_ptr sync_call(std::unique_ptr req); + + virtual folly::Future async_call(std::unique_ptr req); + +private: + std::shared_ptr get_connection(); + std::shared_ptr get_cached_connection(std::shared_ptr remote_id); + std::shared_ptr get_new_connection(std::shared_ptr remote_id); + +private: + folly::SharedMutexWritePriority map_mutex_; + std::shared_ptr user_; + ServerName server_name_; + std::shared_ptr cf_; + std::unordered_map, + std::shared_ptr, ConnectionIdHash, ConnectionIdEquals> connections_; +}; +} +} // namespace hbase::ipc diff --git a/hbase-native-client/ipc/rpc-connection.h b/hbase-native-client/ipc/rpc-connection.h new file mode 100644 index 0000000..fd448a1 --- /dev/null +++ b/hbase-native-client/ipc/rpc-connection.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "connection-id.h" +#include "connection/request.h" +#include "connection/response.h" +#include "connection/service.h" + +using hbase::HBaseService; + +namespace hbase { +namespace ipc { + +class RpcConnection { +public: + RpcConnection( + std::shared_ptr connection_id, + std::shared_ptr hbase_service) + : connection_id_(connection_id), hbase_service_(hbase_service) {} + + virtual ~RpcConnection() {close();} + + virtual std::shared_ptr remote_id() {return connection_id_;} + + virtual folly::Future sendRequest(std::unique_ptr req) { + return (*hbase_service_)(std::move(req)); + } + + virtual void close() {hbase_service_->close();} + +private: + std::shared_ptr connection_id_; + std::shared_ptr hbase_service_; +}; +} +} // namespace hbase::ipc diff --git a/hbase-native-client/security/user.h b/hbase-native-client/security/user.h new file mode 100644 index 0000000..0a096fd --- /dev/null +++ b/hbase-native-client/security/user.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +namespace hbase { +namespace security { +class User { +public: + explicit User(const std::string& user_name) : user_name_(user_name) {} + virtual ~User() = default; + + std::string user_name() {return user_name_;} +private: + std::string user_name_; +}; +} +} -- 2.7.4 (Apple Git-66)