From ce313e2147216b895393273b7257fb9e2301a713 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Mon, 20 Mar 2017 15:04:53 -0700 Subject: [PATCH] HBASE-17800. [C++] handle exceptions in client RPC --- hbase-native-client/connection/BUCK | 1 + .../connection/client-dispatcher.cc | 8 +- hbase-native-client/connection/client-handler.cc | 14 +++- hbase-native-client/connection/client-handler.h | 1 + hbase-native-client/connection/response.h | 14 +++- .../core/async-rpc-retrying-caller.h | 14 +++- hbase-native-client/exceptions/exception.h | 96 +++++++++++++++++++++- 7 files changed, 139 insertions(+), 9 deletions(-) diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index 19536d5..36111f8 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -48,6 +48,7 @@ cxx_library( "//security:security", "//third-party:folly", "//third-party:wangle", + "//exceptions:exceptions", ], compiler_flags=['-Weffc++'], visibility=['//core/...',],) diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 626fc76..afecfc0 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -35,9 +35,11 @@ void ClientDispatcher::read(Context *ctx, std::unique_ptr in) { requests_.erase(call_id); - // TODO(eclark): check if the response - // is an exception. If it is then set that. - p.setValue(std::move(in)); + if (in->exception()) { + p->set_exception(*(in->exception())); + } else { + p.setValue(std::move(in)); + } } Future> ClientDispatcher::operator()(std::unique_ptr arg) { diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index af84572..e51af8f 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -92,8 +92,20 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { } received->set_resp_msg(resp_msg); + } else { + auto remote_exception = std::make_shared(); + hbase::pb::ExceptionResponse exceptionResponse = header.exception(); + + remote_exception->set_exception_class_name(exceptionResponse.has_exception_class_name() ? exceptionResponse.exception_class_name() : "") + ->set_stack_trace(exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "") + ->set_hostname(exceptionResponse.has_hostname() ? exceptionResponse.hostname() : "") + ->set_port(exceptionResponse.has_port() ? exceptionResponse.port() : 0); + if (exceptionResponse.has_do_not_retry()) { + remote_exception->set_do_not_retry(exceptionResponse.do_not_retry()); + } + + received->set_exception(dynamic_pointer_cast(remote_exception)); } - // TODO: set exception in Response here ctx->fireRead(std::move(received)); } diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index afb8e62..c4bd064 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -29,6 +29,7 @@ #include "serde/codec.h" #include "serde/rpc.h" +#include "exceptions/exception.h" // Forward decs. namespace hbase { diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h index 1d60fed..6e2e2d1 100644 --- a/hbase-native-client/connection/response.h +++ b/hbase-native-client/connection/response.h @@ -44,7 +44,10 @@ class Response { * Constructor. * Initinalizes the call id to 0. 0 should never be a valid call id. */ - Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {} + Response() : + call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr), exception_( + nullptr) { + } /** Get the call_id */ uint32_t call_id() { return call_id_; } @@ -70,9 +73,18 @@ class Response { const std::unique_ptr& cell_scanner() const { return cell_scanner_; } + std::shared_ptr exception() { + return exception_; + } + + void set_exception(std::shared_ptr value) { + exception_ = value; + } + private: uint32_t call_id_; std::shared_ptr resp_msg_; std::unique_ptr cell_scanner_; + std::shared_ptr exception_; }; } // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h index 6503301..e854bee 100644 --- a/hbase-native-client/core/async-rpc-retrying-caller.h +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -138,13 +138,23 @@ class AsyncSingleRequestRpcRetryingCaller { }); } + bool shouldRetry(const std::exception& error) { + bool result = SysUtil::InstanceOf(error); + if (result) { + auto e = std::make_shared(error); + auto r = dynamic_pointer_cast(e); + result &= r->do_not_retry(); + } + + return result; + } + void OnError(const std::exception& error, Supplier err_msg, Consumer update_cached_location) { ThrowableWithExtraContext twec(std::make_shared(error), TimeUtil::GetNowNanos()); exceptions_->push_back(twec); - if (SysUtil::InstanceOf(error) || - tries_ >= max_retries_) { + if (!shouldRetry(error) || tries_ >= max_retries_) { CompleteExceptionally(); return; } diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index c0c4142..8e28147 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -54,9 +54,11 @@ private: class IOException: public std::logic_error { public: + IOException() : logic_error("") {} + IOException( const std::string& what) : - logic_error(what), cause_(nullptr) {} + logic_error(what) {} IOException( const std::string& what, std::shared_ptr cause) : @@ -99,6 +101,96 @@ private: class HBaseIOException : public IOException { }; -class DoNotRetryIOException : public HBaseIOException { +class RemoteException : public IOException, std::enable_shared_from_this< + RemoteException> { +public: + typedef std::shared_ptr SharedThisPtr; + + RemoteException() : port_(0), do_not_retry_(false) {} + + RemoteException(const std::string& what) : + IOException(what), port_(0), do_not_retry_(false) { + } + + RemoteException( + const std::string& what, + std::shared_ptr cause) : + IOException(what, cause), port_(0), do_not_retry_(false) { + } + + virtual ~RemoteException() = default; + + std::string exception_class_name() { + return exception_class_name_; + } + + SharedThisPtr set_exception_class_name(const std::string& value) { + exception_class_name_ = value; + return shared_this(); + } + + std::string stack_trace() { + return stack_trace_; + } + + SharedThisPtr set_stack_trace(const std::string& value) { + stack_trace_ = value; + return shared_this(); + } + + std::string hostname() { + return hostname_; + } + + SharedThisPtr set_hostname(const std::string& value) { + hostname_ = value; + return shared_this(); + } + + int port() { + return port_; + } + + SharedThisPtr set_port(int value) { + port_ = value; + return shared_this(); + } + + bool do_not_retry() { + return do_not_retry_; + } + + SharedThisPtr set_do_not_retry(bool value) { + do_not_retry_ = value; + return shared_this(); + } + + const char* what() const override { + std::string buf; + + if (!exception_class_name_.empty()) { + buf.append(exception_class_name_); + buf.append("\n"); + } + + if (!stack_trace.empty()) { + buf.append(stack_trace); + buf.append("\n"); + } + + return buf.c_str(); + } + +private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + +private: + std::string exception_class_name_; + std::string stack_trace_; + std::string hostname_; + int port_; + bool do_not_retry_; }; } // namespace hbase -- 2.10.1 (Apple Git-78)