diff --git hbase-native-client/connection/connection-pool.cc hbase-native-client/connection/connection-pool.cc index e022f9e..ee14c9d 100644 --- hbase-native-client/connection/connection-pool.cc +++ hbase-native-client/connection/connection-pool.cc @@ -117,3 +117,6 @@ void ConnectionPool::Close(std::shared_ptr remote_id) { found->second->Close(); connections_.erase(found); } + +void ConnectionPool::Close() { +} diff --git hbase-native-client/connection/connection-pool.h hbase-native-client/connection/connection-pool.h index 96d89ac..5101c68 100644 --- hbase-native-client/connection/connection-pool.h +++ hbase-native-client/connection/connection-pool.h @@ -75,6 +75,11 @@ class ConnectionPool { */ void Close(std::shared_ptr remote_id); + /** + * Close the Connection Pool + */ + void Close(); + private: std::shared_ptr GetCachedConnection( std::shared_ptr remote_id); diff --git hbase-native-client/connection/rpc-client.cc hbase-native-client/connection/rpc-client.cc index 66ec231..3f0cfaf 100644 --- hbase-native-client/connection/rpc-client.cc +++ hbase-native-client/connection/rpc-client.cc @@ -43,13 +43,15 @@ class RpcChannelImplementation : public AbstractRpcChannel { } // namespace hbase RpcClient::RpcClient() { - auto io_executor = std::make_shared( + io_executor_ = std::make_shared( sysconf(_SC_NPROCESSORS_ONLN)); - cp_ = std::make_shared(io_executor); + cp_ = std::make_shared(io_executor_); } -void RpcClient::Close() {} +void RpcClient::Close() { + io_executor_->stop(); +} std::shared_ptr RpcClient::SyncCall(const std::string& host, uint16_t port, @@ -114,6 +116,8 @@ void RpcClient::CallMethod(const MethodDescriptor* method, std::unique_ptr req = std::make_unique(shared_req, shared_resp, method->name()); - AsyncCall(host, port, std::move(req), ticket) - .then([done, this](Response resp) { done->Run(); }); + AsyncCall(host, port, std::move(req), ticket, method->service()->name()) + .then([done, this](Response resp) { + done->Run(); + }); } diff --git hbase-native-client/connection/rpc-client.h hbase-native-client/connection/rpc-client.h index c24db9d..dbf857d 100644 --- hbase-native-client/connection/rpc-client.h +++ hbase-native-client/connection/rpc-client.h @@ -91,6 +91,7 @@ class RpcClient : public std::enable_shared_from_this { private: std::shared_ptr cp_; + std::shared_ptr io_executor_; }; class AbstractRpcChannel : public RpcChannel { diff --git hbase-native-client/serde/rpc.cc hbase-native-client/serde/rpc.cc index d863d50..e4ade22 100644 --- hbase-native-client/serde/rpc.cc +++ hbase-native-client/serde/rpc.cc @@ -110,6 +110,7 @@ unique_ptr RpcSerde::Header(const string &user) { // That may or may not be the correct thing to do. // It worked for a while with the java client; until it // didn't. + // TODO: send the service name and user from the RpcClient h.set_service_name(INTERFACE); return PrependLength(SerializeMessage(h)); }