From 3b71d001903d38bc6e54bd16e5638849a1b5b9d9 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 5 May 2016 13:51:18 -0700 Subject: [PATCH] HBASE-15777 Fix needs header in client handler --- hbase-native-client/connection/client-handler.cc | 31 ++++++++++++++---------- hbase-native-client/connection/client-handler.h | 19 ++++++++++++++- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index b92ad89..4fdb7ae 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -37,7 +37,7 @@ using hbase::pb::GetResponse; using google::protobuf::Message; ClientHandler::ClientHandler(std::string user_name) - : user_name_(user_name), need_send_header_(true), serde_(), + : user_name_(user_name), serde_(), header_info_(), resp_msgs_( make_unique>>(5000)) {} @@ -81,22 +81,27 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { } } -// TODO(eclark): Figure out how to handle the -// network errors that are going to come. Future ClientHandler::write(Context *ctx, std::unique_ptr r) { // Keep track of if we have sent the header. - if (UNLIKELY(need_send_header_)) { - need_send_header_ = false; + // + // even though the bool is atomic we can load it lazily here. + if (UNLIKELY(header_info_->need_.load(std::memory_order_relaxed))) { - // Should we be sending just one fireWrite? - // Right now we're sending one for the header - // and one for the request. + // Grab the lock. + // We need to make sure that no one gets past here without there being a + // hearder sent. + std::lock_guard lock(header_info_->mutex_); + + // Now see if we are the first thread to get through. // - // That doesn't seem like too bad, but who knows. - auto pre = serde_.Preamble(); - auto header = serde_.Header(user_name_); - pre->appendChain(std::move(header)); - ctx->fireWrite(std::move(pre)); + // If this is the first thread to get through then the + // need_send_header will have been true before this. + if (header_info_->need_.exchange(false)) { + auto pre = serde_.Preamble(); + auto header = serde_.Header(user_name_); + pre->appendChain(std::move(header)); + ctx->fireWrite(std::move(pre)); + } } resp_msgs_->insert(r->call_id(), r->resp_msg()); diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index be5143c..1a4275f 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -21,6 +21,8 @@ #include #include +#include +#include #include #include "serde/rpc.h" @@ -29,6 +31,7 @@ namespace hbase { class Request; class Response; +class HeaderInfo; } namespace google { namespace protobuf { @@ -37,6 +40,7 @@ class Message; } namespace hbase { + class ClientHandler : public wangle::Handler, Response, std::unique_ptr, std::unique_ptr> { @@ -47,7 +51,7 @@ public: std::unique_ptr r) override; private: - bool need_send_header_; + std::unique_ptr header_info_; std::string user_name_; RpcSerde serde_; @@ -56,4 +60,17 @@ private: uint32_t, std::shared_ptr>> resp_msgs_; }; + +/** + * Class to contain the info about if the connection header and preamble has + * been sent. + * + * We use a serperate class here so that ClientHandler is relocatable. + */ +class HeaderInfo { +public: + HeaderInfo() : need_(true), mutex_() {} + std::atomic need_; + std::mutex mutex_; +}; } // namespace hbase -- 2.8.0-rc2