diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 775df68..31fc095 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -40,7 +40,8 @@ ClientHandler::ClientHandler(std::string user_name, std::shared_ptr codec serde_(codec), server_(server), once_flag_(std::make_unique()), - resp_msgs_(std::make_unique>>(5000)) { + resp_msgs_(std::make_unique>>(5000)), + mutex_(std::make_unique()) { } void ClientHandler::read(Context *ctx, std::unique_ptr buf) { @@ -54,14 +55,14 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { << " has_exception=" << header.has_exception() << ", server: " << server_; // Get the response protobuf from the map - auto search = resp_msgs_->find(header.call_id()); + auto search = find(header.call_id()); // It's an error if it's not there. CHECK(search != resp_msgs_->end()); auto resp_msg = search->second; CHECK(resp_msg != nullptr); // Make sure we don't leak the protobuf - resp_msgs_->erase(header.call_id()); + erase(header.call_id()); // set the call_id. // This will be used to by the dispatcher to match up @@ -133,9 +134,28 @@ folly::Future ClientHandler::write(Context *ctx, std::unique_ptrDebugString() << ", server: " << server_; // Now store the call id to response. - resp_msgs_->insert(r->call_id(), r->resp_msg()); + insert(r->call_id(), r->resp_msg()); + + VLOG(3) << "INSERTED RPC Request with call_id:" + << r->call_id(); // TODO: more logging for RPC Header // Send the data down the pipeline. return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } + +void ClientHandler::insert(uint32_t callid, std::shared_ptr resp_msg) { + std::lock_guard lock(*mutex_); + resp_msgs_->insert(std::make_pair(callid, resp_msg)); +} + +void ClientHandler::erase(uint32_t callid) { + std::lock_guard lock(*mutex_); + resp_msgs_->erase(callid); +} + +std::unordered_map>::const_iterator ClientHandler::find( + uint32_t callid) { + std::shared_lock lock(*mutex_); + return resp_msgs_->find(callid); +} } // namespace hbase diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 4c106e0..9c2a81a 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -81,7 +82,11 @@ class ClientHandler std::string server_; // for logging // in flight requests - std::unique_ptr>> + std::unique_ptr>> resp_msgs_; + std::unique_ptr mutex_; + void insert(uint32_t callid, std::shared_ptr resp_msg); + void erase(uint32_t callid); + std::unordered_map>::const_iterator find(uint32_t callid); }; } // namespace hbase