diff --git hbase-native-client/connection/client-dispatcher.cc hbase-native-client/connection/client-dispatcher.cc index 35a1f7d..0a7b01b 100644 --- hbase-native-client/connection/client-dispatcher.cc +++ hbase-native-client/connection/client-dispatcher.cc @@ -21,11 +21,13 @@ #include +#include "exceptions/exception.h" + using std::unique_ptr; namespace hbase { -ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {} +ClientDispatcher::ClientDispatcher(const std::string &server) : requests_(5000), current_call_id_(9), server_(server) {} void ClientDispatcher::read(Context *ctx, unique_ptr in) { auto call_id = in->call_id(); @@ -46,6 +48,8 @@ void ClientDispatcher::read(Context *ctx, unique_ptr in) { folly::Future> ClientDispatcher::operator()(unique_ptr arg) { auto call_id = current_call_id_++; arg->set_call_id(call_id); + // TODO: if the map is full (or we have more than hbase.client.perserver.requests.threshold) + // then throw ServerTooBusyException so that upper layers will retry. requests_.insert(call_id, folly::Promise>{}); auto &p = requests_.find(call_id)->second; auto f = p.getFuture(); @@ -58,9 +62,19 @@ folly::Future> ClientDispatcher::operator()(unique_ptr ClientDispatcher::close() { return ClientDispatcherBase::close(); } +void ClientDispatcher::CleanUpCalls() { + for (auto &pair : requests_) { + pair.second.setException(IOException{"Connection closed to server:" + server_}); + } + requests_.clear(); +} + +folly::Future ClientDispatcher::close() { + CleanUpCalls(); + return ClientDispatcherBase::close(); } folly::Future ClientDispatcher::close(Context *ctx) { + CleanUpCalls(); return ClientDispatcherBase::close(ctx); } } // namespace hbase diff --git hbase-native-client/connection/client-dispatcher.h hbase-native-client/connection/client-dispatcher.h index 857042c..f8be169 100644 --- hbase-native-client/connection/client-dispatcher.h +++ hbase-native-client/connection/client-dispatcher.h @@ -40,7 +40,7 @@ class ClientDispatcher std::unique_ptr> { public: /** Create a new ClientDispatcher */ - ClientDispatcher(); + ClientDispatcher(const std::string &server); /** Read a response off the pipeline. */ void read(Context *ctx, std::unique_ptr in) override; /** Take a request as a call and send it down the pipeline. */ @@ -51,6 +51,9 @@ class ClientDispatcher folly::Future close() override; private: + void CleanUpCalls(); + + private: folly::AtomicHashMap>> requests_; // Start at some number way above what could // be there for un-initialized call id counters. @@ -61,5 +64,6 @@ class ClientDispatcher // uint32_t has a max of 4Billion so 10 more or less is // not a big deal. std::atomic current_call_id_; + std::string server_; }; } // namespace hbase diff --git hbase-native-client/connection/connection-factory.cc hbase-native-client/connection/connection-factory.cc index d1bfbce..8f149e5 100644 --- hbase-native-client/connection/connection-factory.cc +++ hbase-native-client/connection/connection-factory.cc @@ -18,12 +18,14 @@ */ #include "connection/connection-factory.h" -#include "connection/sasl-handler.h" - -#include +#include #include #include + +#include + +#include "connection/sasl-handler.h" #include "connection/client-dispatcher.h" #include "connection/pipeline.h" #include "connection/service.h" @@ -63,7 +65,8 @@ std::shared_ptr ConnectionFactory::Connect( ->connect(folly::SocketAddress(hostname, port, true), std::chrono::duration_cast(connect_timeout_)) .get(); - auto dispatcher = std::make_shared(); + std::string server = hostname + ":" + folly::to(port); + auto dispatcher = std::make_shared(server); dispatcher->setPipeline(pipeline); return dispatcher; }