From 5dd0020fe65f86b293bbf8f84e50b8b7fc0438ee Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Thu, 6 Apr 2017 15:33:42 -0700 Subject: [PATCH] HBASE-17800. [C++] handle exceptions in client RPC --- hbase-native-client/connection/BUCK | 1 + .../connection/client-dispatcher.cc | 8 +- hbase-native-client/connection/client-handler.cc | 14 +++- hbase-native-client/connection/client-handler.h | 1 + hbase-native-client/connection/response.h | 14 +++- .../core/async-rpc-retrying-caller.h | 11 ++- hbase-native-client/exceptions/BUCK | 2 +- hbase-native-client/exceptions/exception.h | 89 +++++++++++++++++++++- 8 files changed, 130 insertions(+), 10 deletions(-) diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index 19536d5..36111f8 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -48,6 +48,7 @@ cxx_library( "//security:security", "//third-party:folly", "//third-party:wangle", + "//exceptions:exceptions", ], compiler_flags=['-Weffc++'], visibility=['//core/...',],) diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 626fc76..d522933 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -35,9 +35,11 @@ void ClientDispatcher::read(Context *ctx, std::unique_ptr in) { requests_.erase(call_id); - // TODO(eclark): check if the response - // is an exception. If it is then set that. - p.setValue(std::move(in)); + if (in->exception()) { + p.setException(*(in->exception())); + } else { + p.setValue(std::move(in)); + } } Future> ClientDispatcher::operator()(std::unique_ptr arg) { diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index af84572..97ccddc 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -92,8 +92,20 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { } received->set_resp_msg(resp_msg); + } else { + auto remote_exception = std::make_shared(); + hbase::pb::ExceptionResponse exceptionResponse = header.exception(); + + remote_exception->set_exception_class_name(exceptionResponse.has_exception_class_name() ? exceptionResponse.exception_class_name() : "") + ->set_stack_trace(exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "") + ->set_hostname(exceptionResponse.has_hostname() ? exceptionResponse.hostname() : "") + ->set_port(exceptionResponse.has_port() ? exceptionResponse.port() : 0); + if (exceptionResponse.has_do_not_retry()) { + remote_exception->set_do_not_retry(exceptionResponse.do_not_retry()); + } + + received->set_exception(remote_exception); } - // TODO: set exception in Response here ctx->fireRead(std::move(received)); } diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index afb8e62..c4bd064 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -29,6 +29,7 @@ #include "serde/codec.h" #include "serde/rpc.h" +#include "exceptions/exception.h" // Forward decs. namespace hbase { diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h index 1d60fed..6e2e2d1 100644 --- a/hbase-native-client/connection/response.h +++ b/hbase-native-client/connection/response.h @@ -44,7 +44,10 @@ class Response { * Constructor. * Initinalizes the call id to 0. 0 should never be a valid call id. */ - Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {} + Response() : + call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr), exception_( + nullptr) { + } /** Get the call_id */ uint32_t call_id() { return call_id_; } @@ -70,9 +73,18 @@ class Response { const std::unique_ptr& cell_scanner() const { return cell_scanner_; } + std::shared_ptr exception() { + return exception_; + } + + void set_exception(std::shared_ptr value) { + exception_ = value; + } + private: uint32_t call_id_; std::shared_ptr resp_msg_; std::unique_ptr cell_scanner_; + std::shared_ptr exception_; }; } // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h index 6503301..b2de692 100644 --- a/hbase-native-client/core/async-rpc-retrying-caller.h +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -138,13 +138,20 @@ class AsyncSingleRequestRpcRetryingCaller { }); } + bool shouldRetry(const std::exception& error) { + bool result = SysUtil::InstanceOf(error); + if (result) { + result &= static_cast(&error)->do_not_retry(); + } + return result; + } + void OnError(const std::exception& error, Supplier err_msg, Consumer update_cached_location) { ThrowableWithExtraContext twec(std::make_shared(error), TimeUtil::GetNowNanos()); exceptions_->push_back(twec); - if (SysUtil::InstanceOf(error) || - tries_ >= max_retries_) { + if (!shouldRetry(error) || tries_ >= max_retries_) { CompleteExceptionally(); return; } diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK index a23654c..eef4437 100644 --- a/hbase-native-client/exceptions/BUCK +++ b/hbase-native-client/exceptions/BUCK @@ -21,4 +21,4 @@ cxx_library( srcs=[], deps=["//third-party:folly",], compiler_flags=['-Weffc++'], - visibility=['//core/...'],) \ No newline at end of file + visibility=['//core/...','//connection//...'],) \ No newline at end of file diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index c0c4142..b68022e 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -54,9 +54,11 @@ private: class IOException: public std::logic_error { public: + IOException() : logic_error("") {} + IOException( const std::string& what) : - logic_error(what), cause_(nullptr) {} + logic_error(what) {} IOException( const std::string& what, std::shared_ptr cause) : @@ -99,6 +101,89 @@ private: class HBaseIOException : public IOException { }; -class DoNotRetryIOException : public HBaseIOException { +class RemoteException : public IOException { +public: + + RemoteException() : port_(0), do_not_retry_(false) {} + + RemoteException(const std::string& what) : + IOException(what), port_(0), do_not_retry_(false) { + } + + RemoteException( + const std::string& what, + std::shared_ptr cause) : + IOException(what, cause), port_(0), do_not_retry_(false) { + } + + virtual ~RemoteException() = default; + + std::string exception_class_name() const { + return exception_class_name_; + } + + RemoteException* set_exception_class_name(const std::string& value) { + exception_class_name_ = value; + return this; + } + + std::string stack_trace() const { + return stack_trace_; + } + + RemoteException* set_stack_trace(const std::string& value) { + stack_trace_ = value; + return this; + } + + std::string hostname() const { + return hostname_; + } + + RemoteException* set_hostname(const std::string& value) { + hostname_ = value; + return this; + } + + int port() const { + return port_; + } + + RemoteException* set_port(int value) { + port_ = value; + return this; + } + + bool do_not_retry() const { + return do_not_retry_; + } + + RemoteException* set_do_not_retry(bool value) { + do_not_retry_ = value; + return this; + } + + const char* what() const noexcept override { + std::string buf; + + if (!exception_class_name_.empty()) { + buf.append(exception_class_name_); + buf.append("\n"); + } + + if (!stack_trace_.empty()) { + buf.append(stack_trace_); + buf.append("\n"); + } + + return buf.c_str(); + } + +private: + std::string exception_class_name_; + std::string stack_trace_; + std::string hostname_; + int port_; + bool do_not_retry_; }; } // namespace hbase -- 2.10.1 (Apple Git-78)