From 5d4372c52718606a379202f69f72bb4a10b1549a Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Thu, 9 Feb 2017 15:12:16 -0800 Subject: [PATCH] HBASE-17465. [C++] implement request retry mechanism over RPC --- hbase-native-client/Makefile | 2 +- hbase-native-client/bin/start-docker.sh | 5 +- hbase-native-client/connection/connection-pool.cc | 7 + hbase-native-client/connection/rpc-client.cc | 22 +- hbase-native-client/connection/rpc-client.h | 2 + hbase-native-client/core/BUCK | 17 + hbase-native-client/core/async-connection.cc | 25 ++ hbase-native-client/core/async-connection.h | 35 ++ .../core/async-rpc-retrying-caller-factory.cc | 24 ++ .../core/async-rpc-retrying-caller-factory.h | 143 ++++++++ .../core/async-rpc-retrying-caller.cc | 24 ++ .../core/async-rpc-retrying-caller.h | 280 ++++++++++++++++ .../core/async-rpc-retrying-test.cc | 359 +++++++++++++++++++++ hbase-native-client/core/client-test.cc | 67 +--- hbase-native-client/core/client-test.h | 88 +++++ hbase-native-client/core/hbase-rpc-controller.cc | 24 ++ hbase-native-client/core/hbase-rpc-controller.h | 58 ++++ hbase-native-client/core/hconstants.cc | 27 ++ hbase-native-client/core/hconstants.h | 39 +++ hbase-native-client/core/location-cache.cc | 2 + hbase-native-client/core/region-location.h | 6 + hbase-native-client/core/response_converter.cc | 4 + hbase-native-client/core/table.cc | 5 + hbase-native-client/core/table.h | 5 + hbase-native-client/exceptions/BUCK | 24 ++ hbase-native-client/exceptions/exception.h | 104 ++++++ hbase-native-client/if/Client.proto | 1 + hbase-native-client/utils/BUCK | 5 +- hbase-native-client/utils/connection-util.h | 71 ++++ hbase-native-client/utils/sys-util.h | 39 +++ hbase-native-client/utils/time-util.h | 57 ++++ 31 files changed, 1497 insertions(+), 74 deletions(-) create mode 100644 hbase-native-client/core/async-connection.cc create mode 100644 hbase-native-client/core/async-connection.h create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.h create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.h create mode 100644 hbase-native-client/core/async-rpc-retrying-test.cc create mode 100644 hbase-native-client/core/client-test.h create mode 100644 hbase-native-client/core/hbase-rpc-controller.cc create mode 100644 hbase-native-client/core/hbase-rpc-controller.h create mode 100644 hbase-native-client/core/hconstants.cc create mode 100644 hbase-native-client/core/hconstants.h create mode 100644 hbase-native-client/exceptions/BUCK create mode 100644 hbase-native-client/exceptions/exception.h create mode 100644 hbase-native-client/utils/connection-util.h create mode 100644 hbase-native-client/utils/sys-util.h create mode 100644 hbase-native-client/utils/time-util.h diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile index 99e38ef..133bba7 100644 --- a/hbase-native-client/Makefile +++ b/hbase-native-client/Makefile @@ -22,7 +22,7 @@ LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils security +MODULES = connection core serde test-util utils security exceptions SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) diff --git a/hbase-native-client/bin/start-docker.sh b/hbase-native-client/bin/start-docker.sh index 38affa0..1f78201 100755 --- a/hbase-native-client/bin/start-docker.sh +++ b/hbase-native-client/bin/start-docker.sh @@ -52,8 +52,9 @@ fi; docker build -t hbase_native . # After the image is built run the thing -docker run -p 16050:16050/tcp \ - -v ${BASE_DIR}/..:/usr/src/hbase \ +# docker run -p 16050:16050/tcp \ + docker run \ + -v ${BASE_DIR}/..:/usr/src/hbase \ -v ~/.m2:/root/.m2 \ -it hbase_native /bin/bash popd diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 15dd64e..0895756 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -24,6 +24,7 @@ #include #include +#include using std::mutex; using std::unique_ptr; @@ -86,13 +87,19 @@ std::shared_ptr ConnectionPool::GetNewConnection( /* create new connection */ auto clientBootstrap = cf_->MakeBootstrap(); + LOG(INFO) << "remote_id: " + remote_id->host() + ":" + folly::to + < std::string > (remote_id->port()); auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); + LOG(INFO) << "created new dispatcher"; auto conneciton = std::make_shared(remote_id, dispatcher); + LOG(INFO) << "created RpcConnection"; connections_.insert(std::make_pair(remote_id, conneciton)); clients_.insert(std::make_pair(remote_id, clientBootstrap)); + LOG(INFO) << "inserted connection to cache"; + LOG(INFO) << "got new connection"; return conneciton; } } diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 7621193..b1abfec 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -20,6 +20,7 @@ #include "connection/rpc-client.h" #include #include +#include using hbase::RpcClient; using hbase::AbstractRpcChannel; @@ -30,7 +31,9 @@ class RpcChannelImplementation : public AbstractRpcChannel { public: RpcChannelImplementation(std::shared_ptr rpc_client, const std::string& host, uint16_t port, std::shared_ptr ticket, int rpc_timeout) - : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {} + : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) { + LOG(INFO) << "in RpcChannelImplementation constructor"; + } void CallMethod(const MethodDescriptor* method, RpcController* controller, const Message* request, Message* response, Closure* done) override { @@ -39,6 +42,11 @@ class RpcChannelImplementation : public AbstractRpcChannel { }; } // namespace hbase +RpcClient::RpcClient() { + io_executor_ = std::make_shared(sysconf(_SC_NPROCESSORS_ONLN)); + cp_ = std::make_shared(io_executor_); +} + RpcClient::RpcClient(std::shared_ptr io_executor) : io_executor_(io_executor) { cp_ = std::make_shared(io_executor_); @@ -82,9 +90,11 @@ std::shared_ptr RpcClient::GetConnection(std::shared_ptr RpcClient::CreateRpcChannel(const std::string& host, uint16_t port, std::shared_ptr ticket, int rpc_timeout) { + LOG(INFO) << "/* init rpc channel */"; std::shared_ptr channel = std::make_shared( shared_from_this(), host, port, ticket, rpc_timeout); + LOG(INFO) << "/* return rpc channel */"; /* static_pointer_cast is safe since RpcChannelImplementation derives * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */ return std::static_pointer_cast(channel); @@ -93,11 +103,15 @@ std::shared_ptr RpcClient::CreateRpcChannel(const std::string& host, void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* controller, const Message* req_msg, Message* resp_msg, Closure* done, const std::string& host, uint16_t port, std::shared_ptr ticket) { + LOG(INFO) << "start RpcClient::CallMethod"; std::shared_ptr shared_req(const_cast(req_msg)); - std::shared_ptr shared_resp(resp_msg); - + std::shared_ptr shared_resp(const_cast(resp_msg)); std::unique_ptr req = std::make_unique(shared_req, shared_resp, method->name()); + LOG(INFO) << "before AsyncCall"; + LOG(INFO) << "method name: " + method->name() << ", service name: " << method->service()->name(); + LOG(INFO) << "host: " + host << ", port: " << folly::to(port); AsyncCall(host, port, std::move(req), ticket, method->service()->name()) - .then([done, this](Response resp) { done->Run(); }); + .then([done, this](Response& resp) {if (done) {done->Run();}}); + LOG(INFO) << "after AsyncCall"; } diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index aeb9b56..a8e189e 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -51,6 +51,8 @@ class RpcClient : public std::enable_shared_from_this { friend class RpcChannelImplementation; public: + RpcClient(); + RpcClient(std::shared_ptr io_executor); virtual ~RpcClient() { Close(); } diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index d8d15a9..bb84f67 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -37,6 +37,12 @@ cxx_library( "request_converter.h", "response_converter.h", "table.h", + "async-connection.h", + "async-rpc-retrying-caller-factory.h", + "async-rpc-retrying-caller.h", + "client-test.h", + "hbase-rpc-controller.h", + "hconstants.h", ], srcs=[ "cell.cc", @@ -55,6 +61,8 @@ cxx_library( "table.cc", ], deps=[ + "//exceptions:exceptions", + "//utils:utils", "//connection:connection", "//if:if", "//serde:serde", @@ -83,6 +91,15 @@ cxx_test( deps=[":core",], run_test_separately=True,) cxx_test( + name="retry-test", + srcs=["async-rpc-retrying-test.cc",], + deps=[ + ":core", + "//test-util:test-util", + "//exceptions:exceptions", + ], + run_test_separately=True,) +cxx_test( name="time_range-test", srcs=["time_range-test.cc",], deps=[":core",], diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc new file mode 100644 index 0000000..085e81f --- /dev/null +++ b/hbase-native-client/core/async-connection.cc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-connection.h" + +namespace hbase { + +} // namespace hbase + diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h new file mode 100644 index 0000000..1f7d3c6 --- /dev/null +++ b/hbase-native-client/core/async-connection.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +namespace hbase { + +class AsyncConnection { +public: + AsyncConnection() {} + virtual ~AsyncConnection() = default; +}; + +class AsyncConnectionImpl { +public: + AsyncConnectionImpl() {} + virtual ~AsyncConnectionImpl() = default; +}; +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc new file mode 100644 index 0000000..248f26b --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller-factory.h" + +namespace hbase { + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..bdf4801 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "if/HBase.pb.h" +#include "if/Client.pb.h" +#include "async-connection.h" +#include "async-rpc-retrying-caller.h" +#include +#include +#include +#include "connection/rpc-client.h" + + +using namespace folly; +using hbase::pb::TableName; + +namespace hbase { + +template +class SingleRequestCallerBuilder : public std::enable_shared_from_this< + SingleRequestCallerBuilder> { +public: + SingleRequestCallerBuilder(std::shared_ptr conn) : + conn_(conn), + table_name_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0), + locate_type_(RegionLocateType::kCurrent) {} + + virtual ~SingleRequestCallerBuilder() = default; + + typedef SingleRequestCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr table( + std::shared_ptr table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr rpc_timeout( + long rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr operation_timeout( + long operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr row(const std::string& row) { + row_ = row; + return shared_this(); + } + + SharedThisPtr locate_type(RegionLocateType locate_type) { + locate_type_ = locate_type; + return shared_this(); + } + + SharedThisPtr action(Callable callable) { + LOG(INFO) << "set action as callable"; + callable_ = callable; + return shared_this(); + } + + folly::Future Call() + { + return Build()->Call(); + } + + std::shared_ptr> Build() { + return std::make_shared>( + conn_, + table_name_, + row_, + locate_type_, + callable_, + conn_->get_conn_conf()->GetPauseNs(), + conn_->get_conn_conf()->GetMaxRetries(), + operation_timeout_nanos_, + rpc_timeout_nanos_, + conn_->get_conn_conf()->GetStartLogErrorsCount()); + } + +private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + +private: + std::shared_ptr conn_; + std::shared_ptr table_name_; + long rpc_timeout_nanos_; + long operation_timeout_nanos_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; +}; // end of SingleRequestCallerBuilder + + +template +class AsyncRpcRetryingCallerFactory { +private: + std::shared_ptr conn_; + +public: + AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : + conn_(conn) { + } + + virtual ~AsyncRpcRetryingCallerFactory() = default; + + template + std::shared_ptr> Single() { + return std::make_shared>(conn_); + } +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc new file mode 100644 index 0000000..80a8f34 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller.h" + +namespace hbase { + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h new file mode 100644 index 0000000..f02f6b8 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "hbase-rpc-controller.h" +#include "region-location.h" +#include +#include +#include +#include "if/HBase.pb.h" +#include +#include +#include "exceptions/exception.h" +#include +#include +#include "utils/sys-util.h" +#include "utils/connection-util.h" +#include "utils/time-util.h" +#include +#include + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + + +template +using Supplier = std::function; + +template +using Consumer = std::function; + +template +using Converter = std::function; + +template +using RpcCallback = std::function; + +template +using RpcCall = std::function, + std::shared_ptr, + std::shared_ptr, + std::unique_ptr, + RpcCallback&)>; + +template +using Callable = std::function(std::shared_ptr, + std::shared_ptr, + std::shared_ptr)>; + + +template +class AsyncSingleRequestRpcRetryingCaller { +public: + AsyncSingleRequestRpcRetryingCaller( + std::shared_ptr conn, + std::shared_ptr table_name, + const std::string& row, + RegionLocateType locate_type, + Callable callable, + long pause_ns, + int max_retries, + long operation_timeout_nanos, + long rpc_timeout_nanos, + int start_log_errors_count) : + conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_ns_(pause_ns), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + + controller_ = conn_->get_rpc_controller_factory()->NewController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared< + std::vector>(); + retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); + } + + virtual ~AsyncSingleRequestRpcRetryingCaller() { + } + + folly::Future Call() { + auto f = promise_->getFuture(); + LOG(INFO) << "start LocateThenCall"; + LocateThenCall(); + return f; + } + +private: + void LocateThenCall() { + long locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + LOG(INFO) << "timeout in LocateThenCall"; + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->get_locator()->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](RegionLocation &loc) { + LOG(INFO) << "before LocateThenCall::Call(loc)"; + Call(loc); + LOG(INFO) << "after LocateThenCall::Call(loc)"; + }) + .onError( + [this] (const std::exception& e) { + LOG(INFO) << "LocateThenCall exception 1: " + folly::to(e.what()); + OnError(e, + [this]() -> std::string { + return "Locate '" + + row_ + "' in " + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(operation_timeout_nanos_) + + " ms, time elapsed = " + + TimeUtil::ElapsedMsStr(this->start_ns_) + " ms";}, + [](const std::exception& error) {}); + }); + } + + void OnError(const std::exception& error, Supplier err_msg, + Consumer update_cached_location) { + LOG(INFO) << "OnError called"; + ThrowableWithExtraContext twec(std::make_shared < std::exception > (error), + TimeUtil::GetNowNanos()); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf(error) + || tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + long delay_ns; + if (operation_timeout_nanos_ > 0) { + long max_delay_ns = RemainingTimeNs() - ConnectionUtils::SLEEP_DELTA_NS; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = std::min(max_delay_ns, + ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1); + } + update_cached_location(error); + tries_++; + /*retry_timer_->scheduleTimeoutFn([this]() {LocateThenCall();}, milliseconds( + TimeUtil::ToMillis(delay_ns)));*/ + } + + void Call(RegionLocation& loc) { + long call_timeout_ns; + if (operation_timeout_nanos_ > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + LOG(INFO) << "timeout in Call with loc"; + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_); + } else { + call_timeout_ns = rpc_timeout_nanos_; + } + + std::shared_ptr rpc_client; + try { + rpc_client = conn_->GetRpcClient(); + LOG(INFO) << "after conn_->GetRpcClient()"; + } catch (const IOException& e) { + LOG(INFO) << "Call exception 1: " + folly::to(e.what()); + OnError(e, + [&, this]() -> std::string { + return "Get async rpc_client to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), loc.server_name().port()) + + " for '" + row_ + + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMsStr(this->start_ns_) + " ms";}, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error);}); + return; + } + + ConnectionUtils::ResetController(controller_, call_timeout_ns); + + LOG(INFO) << "start invoke callable"; + callable_(controller_, std::make_shared(loc), rpc_client).then( + [this](const RESP& resp) { + this->promise_->setValue(std::move(resp)); + }).onError( + [&, this] (const std::exception& e) { + LOG(INFO) << "Call exception 2: " + folly::to(e.what()); + OnError(e, + [&, this]() -> std::string { + return "Call to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), loc.server_name().port()) + + " for '" + row_ + "' in " + + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMsStr(this->start_ns_) + " ms";}, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error);}); + return; + }); + } + + void CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); + } + + long RemainingTimeNs() { + return operation_timeout_nanos_ - (TimeUtil::GetNowNanos() - start_ns_); + } + +private: + folly::HHWheelTimer::UniquePtr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; + long pause_ns_; + int max_retries_; + long operation_timeout_nanos_; + long rpc_timeout_nanos_; + int start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_; + long start_ns_; + int tries_; + std::shared_ptr> exceptions_; + int max_attempts_; + folly::EventBase event_base_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc new file mode 100644 index 0000000..714e503 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-test.cc @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include "hbase-rpc-controller.h" +#include "if/HBase.pb.h" + +#include "region-location.h" +#include "if/Client.pb.h" +#include "connection/request.h" +#include "test-util/test-util.h" +#include "client-test.h" +#include "core/result.h" +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "core/response_converter.h" +#include "connection/request.h" +#include "connection/response.h" +#include "connection/rpc-client.h" +#include "async-rpc-retrying-caller.h" +#include "async-rpc-retrying-caller-factory.h" +#include "exceptions/exception.h" +#include +#include +#include +#include "connection/rpc-client.h" +#include + +using namespace google::protobuf; +using namespace hbase; +using namespace hbase::pb; +using namespace std::placeholders; +using namespace testing; +using ::testing::Return; +using ::testing::_; + +class MockRpcControllerFactory { +public: + MOCK_METHOD0(NewController, std::shared_ptr()); +}; + +class MockAsyncConnectionConfiguration { +public: + MOCK_METHOD0(GetPauseNs, long()); + MOCK_METHOD0(GetMaxRetries, int()); + MOCK_METHOD0(GetStartLogErrorsCount, int()); + MOCK_METHOD0(GetReadRpcTimeoutNs, long()); + MOCK_METHOD0(GetOperationTimeoutNs, long()); +}; + +class AsyncRegionLocator { +public: + AsyncRegionLocator(std::shared_ptr region_location) : + region_location_(region_location) { + } + ~AsyncRegionLocator() = default; + + folly::Future GetRegionLocation( + std::shared_ptr, const std::string&, + RegionLocateType, long) { + folly::Promise promise; + promise.setValue(*region_location_); + return promise.getFuture(); + } + + void UpdateCachedLocation(RegionLocation&, const std::exception&) { + } + +private: + std::shared_ptr region_location_; +}; + +class MockAsyncConnection { +public: + MOCK_METHOD0(get_conn_conf, std::shared_ptr()); + MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr()); + MOCK_METHOD0(get_locator, std::shared_ptr()); + MOCK_METHOD0(GetRpcClient, std::shared_ptr()); +}; + + +template +class MockRawAsyncTableImpl { +public: + MockRawAsyncTableImpl(std::shared_ptr conn) : conn_(conn) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + // folly::Future Get(const hbase::Get&); + + /* in real RawAsyncTableImpl, this should be private. */ + folly::Future GetCall( + std::shared_ptr rpc_client, + std::shared_ptr controller, + std::shared_ptr loc, + const hbase::Get& get) { + + hbase::RpcCall rpc_call = [this] ( + std::shared_ptr rpc_client, + std::shared_ptr loc, + std::shared_ptr controller, + std::unique_ptr preq, + RpcCallback& rpc_callback) { + LOG(INFO) << "call through rpc client"; + rpc_client->AsyncCall( + loc->server_name().host_name(), + loc->server_name().port(), + std::move(preq), User::defaultUser(), "ClientService") + .then([&](const hbase::Response& resp) {rpc_callback(std::move(resp));}); + }; + + return Call( + rpc_client, + controller, + loc, + get, + &hbase::RequestConverter::ToGetRequest, + rpc_call, + &hbase::ResponseConverter::FromGetResponse); + } + + /* in real RawAsyncTableImpl, this should be private. */ + template + folly::Future Call( + std::shared_ptr rpc_client, + std::shared_ptr controller, + std::shared_ptr loc, + const REQ& req, + const Converter, REQ, std::string>& req_converter, + const hbase::RpcCall& rpc_call, + const Converter, PRESP>& resp_converter) { + + + auto promise = std::make_shared>(); + + RpcCallback rpc_callback = [&](const PRESP& presp) { + if (controller->Failed()) { + LOG(ERROR) << "rpc callback error"; + promise->setException(hbase::IOException(controller->ErrorText())); + } else { + try { + LOG(INFO) << "rpc callback good"; + std::unique_ptr result = resp_converter(presp); + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + LOG(INFO) << "passed 1"; + EXPECT_EQ("test2", result->Row()); + LOG(INFO) << "passed 2"; + EXPECT_EQ("value2", *(result->Value("d", "2"))); + LOG(INFO) << "passed 3"; + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + LOG(INFO) << "passed 4"; + promise->setValue(std::move(*result)); + } catch (const std::exception& e) { + promise->setException(e); + } + } + }; + + LOG(INFO) << "before rpc_call"; + rpc_call( + rpc_client, + loc, + controller, + std::move(req_converter(req, loc->region_name())), + rpc_callback); + + LOG(INFO) << "done with mock call"; + return promise->getFuture(); + } + +private: + std::shared_ptr conn_; +}; + + +void foo() { + + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to < hbase::pb::TableName > ("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf.value()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + // Perform the Get + auto result = table->Get(get); + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + + table->Close(); + client.Close(); +} + +TEST(AsyncRpcRetryTest, TestGetBasic) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to < hbase::pb::TableName > ("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf.value()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + /* init region location and rpc channel */ + auto region_location = table->GetRegionLocation(row); + LOG(INFO) + << "host:port is " + region_location->server_name().host_name() + ":" + + folly::to < std::string > (region_location->server_name().port()); + + auto rpc_client = std::make_shared(); + + LOG(INFO) << "/* init rpc controller */"; + /* init rpc controller */ + auto controller = std::make_shared(); + + LOG(INFO) << "/* init rpc controller factory */"; + /* init rpc controller factory */ + auto controller_factory = std::make_shared(); + EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller)); + + LOG(INFO) << "/* init connection configuration */"; + /* init connection configuration */ + auto connection_conf = std::make_shared(); + EXPECT_CALL((*connection_conf), GetPauseNs()).Times(1).WillRepeatedly(Return(100000000)); + EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31)); + EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9)); + EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs()).Times(1).WillRepeatedly(Return(60000000000)); + EXPECT_CALL((*connection_conf), GetOperationTimeoutNs()).Times(1).WillRepeatedly(Return(1200000000000)); + + LOG(INFO) << "/* init region locator */"; + /* init region locator */ + auto region_locator = std::make_shared(region_location); + + LOG(INFO) << "/* init hbase client connection */"; + /* init hbase client connection */ + auto conn = std::make_shared(); + EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf)); + EXPECT_CALL((*conn), get_rpc_controller_factory()).Times(AtLeast(1)).WillRepeatedly(Return(controller_factory)); + EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator)); + EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly( + Return(rpc_client)); + + LOG(INFO) << "/* init retry caller factory */"; + /* init retry caller factory */ + auto tableImpl = std::make_shared>(conn); + AsyncRpcRetryingCallerFactory caller_factory(conn); + + LOG(INFO) << "/* init request caller builder */"; + /* init request caller builder */ + auto builder = caller_factory.Single(); + + LOG(INFO) << "/* call with retry to get result */"; + /* call with retry to get result */ + try { + auto async_caller = builder + ->table(std::make_shared(tn)) + ->row(row) + ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs()) + ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs()) + ->action( + [=, &get]( + std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) -> folly::Future { + return tableImpl->GetCall(rpc_client, controller, loc, get);}) + ->Build(); + + hbase::Result result = async_caller->Call().get(); + + /*Stopping the connection as we are getting segfault due to some folly issue + The connection stays open and we don't want that. + So we are stopping the connection. + We can remove this once we have fixed the folly part */ + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result.Row()); + EXPECT_EQ("value2", *(result.Value("d", "2"))); + EXPECT_EQ("value for extra", *(result.Value("d", "extra"))); + } catch(std::exception &e) { + LOG(INFO) << e.what(); + } + + table->Close(); + client.Close(); +} + diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index 28eec6f..ae73c58 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -17,72 +17,7 @@ * */ -#include "core/client.h" -#include -#include "core/configuration.h" -#include "core/get.h" -#include "core/hbase_configuration_loader.h" -#include "core/result.h" -#include "core/table.h" -#include "serde/table-name.h" -#include "test-util/test-util.h" - -class ClientTest { - public: - const static std::string kDefHBaseConfPath; - - const static std::string kHBaseDefaultXml; - const static std::string kHBaseSiteXml; - - const static std::string kHBaseXmlData; - - static void WriteDataToFile(const std::string &file, const std::string &xml_data) { - std::ofstream hbase_conf; - hbase_conf.open(file.c_str()); - hbase_conf << xml_data; - hbase_conf.close(); - } - - static void CreateHBaseConf(const std::string &dir, const std::string &file, - const std::string xml_data) { - // Directory will be created if not present - if (!boost::filesystem::exists(dir)) { - boost::filesystem::create_directories(dir); - } - // Remove temp file always - boost::filesystem::remove((dir + file).c_str()); - WriteDataToFile((dir + file), xml_data); - } - - static void CreateHBaseConfWithEnv() { - // Creating Empty Config Files so that we dont get a Configuration exception @Client - CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData); - CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData); - setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1); - } -}; - -const std::string ClientTest::kDefHBaseConfPath("./build/test-data/client-test/conf/"); - -const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml"); -const std::string ClientTest::kHBaseSiteXml("hbase-site.xml"); - -const std::string ClientTest::kHBaseXmlData( - "\n\n\n\n\n"); +#include "client-test.h" TEST(Client, EmptyConfigurationPassedToClient) { ASSERT_ANY_THROW(hbase::Client client); } diff --git a/hbase-native-client/core/client-test.h b/hbase-native-client/core/client-test.h new file mode 100644 index 0000000..b8ec691 --- /dev/null +++ b/hbase-native-client/core/client-test.h @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "core/client.h" +#include +#include "core/configuration.h" +#include "core/get.h" +#include "core/hbase_configuration_loader.h" +#include "core/result.h" +#include "core/table.h" +#include "serde/table-name.h" +#include "test-util/test-util.h" + +class ClientTest { + public: + const static std::string kDefHBaseConfPath; + + const static std::string kHBaseDefaultXml; + const static std::string kHBaseSiteXml; + + const static std::string kHBaseXmlData; + + static void WriteDataToFile(const std::string &file, const std::string &xml_data) { + std::ofstream hbase_conf; + hbase_conf.open(file.c_str()); + hbase_conf << xml_data; + hbase_conf.close(); + } + + static void CreateHBaseConf(const std::string &dir, const std::string &file, + const std::string xml_data) { + // Directory will be created if not present + if (!boost::filesystem::exists(dir)) { + boost::filesystem::create_directories(dir); + } + // Remove temp file always + boost::filesystem::remove((dir + file).c_str()); + WriteDataToFile((dir + file), xml_data); + } + + static void CreateHBaseConfWithEnv() { + // Creating Empty Config Files so that we dont get a Configuration exception @Client + CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData); + CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData); + setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1); + } +}; + +const std::string ClientTest::kDefHBaseConfPath("./build/test-data/client-test/conf/"); + +const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml"); +const std::string ClientTest::kHBaseSiteXml("hbase-site.xml"); + +const std::string ClientTest::kHBaseXmlData( + "\n\n\n\n\n"); + diff --git a/hbase-native-client/core/hbase-rpc-controller.cc b/hbase-native-client/core/hbase-rpc-controller.cc new file mode 100644 index 0000000..f5b5963 --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase-rpc-controller.h" + +namespace hbase { + +} /* namespace hbase */ diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h new file mode 100644 index 0000000..613112f --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include + +using google::protobuf::RpcController; +using google::protobuf::Closure; +using namespace std; + +namespace hbase { + +class HBaseRpcController: public RpcController { +public: + HBaseRpcController() {} + virtual ~HBaseRpcController() = default; + + void set_call_timeout(const long& call_timeout) { + // TODO: + } + + void Reset() override {} + + bool Failed() const override { + return false; + } + + string ErrorText() const override{ return "";} + + void StartCancel() override {} + + void SetFailed(const string& reason) override {} + + bool IsCanceled() const override { return false; } + + void NotifyOnCancel(Closure* callback) override {} +}; + +} /* namespace hbase */ + + diff --git a/hbase-native-client/core/hconstants.cc b/hbase-native-client/core/hconstants.cc new file mode 100644 index 0000000..d7cc788 --- /dev/null +++ b/hbase-native-client/core/hconstants.cc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hconstants.h" + +namespace hbase { + +const std::vector HConstants::RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, + 100, 100, 100, 100, 200, 200 }; +} /* namespace hbase */ + diff --git a/hbase-native-client/core/hconstants.h b/hbase-native-client/core/hconstants.h new file mode 100644 index 0000000..0091ee0 --- /dev/null +++ b/hbase-native-client/core/hconstants.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +namespace hbase { + +class HConstants { +public: + /** + * Retrying we multiply hbase.client.pause setting by what we have in this array until we + * run out of array items. Retries beyond this use the last number in the array. So, for + * example, if hbase.client.pause is 1 second, and maximum retries count + * hbase.client.retries.number is 10, we will retry at the following intervals: + * 1, 2, 3, 5, 10, 20, 40, 100, 100, 100. + * With 100ms, a back-off of 200 means 20s + */ + static const std::vector RETRY_BACKOFF; +}; + +} /* namespace hbase */ + diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index dab5deb..d3ede57 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -32,6 +32,7 @@ #include "serde/region-info.h" #include "serde/server-name.h" #include "serde/zk.h" +#include using namespace std; using namespace folly; @@ -122,6 +123,7 @@ Future> LocationCache::LocateFromMeta(const Tabl return this->LocateMeta() .via(cpu_executor_.get()) .then([this](ServerName sn) { + LOG(INFO) << "host=" + sn.host_name() + ", port=" + folly::to(sn.port()); auto remote_id = std::make_shared(sn.host_name(), sn.port()); return this->cp_->GetConnection(remote_id); }) diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index b0411cb..5840e1e 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -26,6 +26,12 @@ namespace hbase { +enum RegionLocateType { + kBefore, + kCurrent, + kAfter +}; + /** * @brief class to hold where a region is located. * diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc index 3fe2ba9..019f9ee 100644 --- a/hbase-native-client/core/response_converter.cc +++ b/hbase-native-client/core/response_converter.cc @@ -23,6 +23,7 @@ #include "core/cell.h" #include "if/Client.pb.h" +#include using hbase::pb::GetResponse; @@ -33,8 +34,10 @@ ResponseConverter::ResponseConverter() {} ResponseConverter::~ResponseConverter() {} std::unique_ptr ResponseConverter::FromGetResponse(const Response& resp) { + LOG(INFO) << "start converting response"; auto get_resp = std::static_pointer_cast(resp.resp_msg()); + LOG(INFO) << "go through cells"; std::vector> vcells; for (auto cell : get_resp->result().cell()) { std::shared_ptr pcell = @@ -43,6 +46,7 @@ std::unique_ptr ResponseConverter::FromGetResponse(const Response vcells.push_back(pcell); } + LOG(INFO) << "done with converting response"; return std::make_unique(vcells, get_resp->result().exists(), get_resp->result().stale(), get_resp->result().partial()); } diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index 58125f9..1eb2c55 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -71,4 +71,9 @@ void Table::Close() { is_closed_ = true; } +std::shared_ptr Table::GetRegionLocation(const std::string& row) { + // return location_cache_->LocateRegion(*table_name_, row).get(milliseconds(1000)); + return location_cache_->LocateRegion(*table_name_, row).get(); +} + } /* namespace hbase */ diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 0e98cd2..0ed32da 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -57,6 +57,11 @@ class Table { */ void Close(); + /** + * @brief - Get region location for a row in current table. + */ + std::shared_ptr GetRegionLocation(const std::string& row); + private: std::shared_ptr table_name_; std::shared_ptr location_cache_; diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK new file mode 100644 index 0000000..4a5b983 --- /dev/null +++ b/hbase-native-client/exceptions/BUCK @@ -0,0 +1,24 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cxx_library( + name="exceptions", + exported_headers=["exception.h",], + srcs=[], + deps=["//third-party:folly",], + compiler_flags=['-Weffc++'], + visibility=['//core/...'],) \ No newline at end of file diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h new file mode 100644 index 0000000..c0c4142 --- /dev/null +++ b/hbase-native-client/exceptions/exception.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +namespace hbase { + +class ThrowableWithExtraContext { +public: + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when) : + cause_(cause), when_(when), extras_("") { + } + + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when, const std::string& extras) : + cause_(cause), when_(when), extras_(extras) { + } + + std::string ToString() { + // TODO: + // return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); + return extras_ + ", " + cause_->what(); + } + + std::shared_ptr cause() { + return cause_; + } +private: + std::shared_ptr cause_; + long when_; + std::string extras_; +}; + +class IOException: public std::logic_error { +public: + IOException( + const std::string& what) : + logic_error(what), cause_(nullptr) {} + IOException( + const std::string& what, + std::shared_ptr cause) : + logic_error(what), cause_(cause) {} + virtual ~IOException() = default; + + std::shared_ptr cause() { + return cause_; + } +private: + const std::shared_ptr cause_; +}; + +class RetriesExhaustedException: public IOException { +public: + RetriesExhaustedException( + const int& num_retries, + std::shared_ptr> exceptions) : + IOException( + GetMessage(num_retries, exceptions), + exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){ + } + virtual ~RetriesExhaustedException() = default; + +private: + std::string GetMessage( + const int& num_retries, + std::shared_ptr> exceptions) { + std::string buffer("Failed after attempts="); + buffer.append(std::to_string(num_retries + 1)); + buffer.append(", exceptions:\n"); + for (auto it = exceptions->begin(); it != exceptions->end(); it++) { + buffer.append(it->ToString()); + buffer.append("\n"); + } + return buffer; + } +}; + +class HBaseIOException : public IOException { +}; + +class DoNotRetryIOException : public HBaseIOException { +}; +} // namespace hbase diff --git a/hbase-native-client/if/Client.proto b/hbase-native-client/if/Client.proto index 8a4d459..301d52a 100644 --- a/hbase-native-client/if/Client.proto +++ b/hbase-native-client/if/Client.proto @@ -22,6 +22,7 @@ package hbase.pb; option java_package = "org.apache.hadoop.hbase.protobuf.generated"; option java_outer_classname = "ClientProtos"; option java_generic_services = true; +option cc_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK index 120331a..dd286d9 100644 --- a/hbase-native-client/utils/BUCK +++ b/hbase-native-client/utils/BUCK @@ -17,7 +17,10 @@ cxx_library( name="utils", - exported_headers=["user-util.h",], + exported_headers=["user-util.h", + "connection-util.h", + "sys-util.h", + "time-util.h"], srcs=["user-util.cc",], deps=['//third-party:folly',], tests=[":user-util-test"], diff --git a/hbase-native-client/utils/connection-util.h b/hbase-native-client/utils/connection-util.h new file mode 100644 index 0000000..4e1cf4e --- /dev/null +++ b/hbase-native-client/utils/connection-util.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include ; +#include "core/hconstants.h" +#include +#include "core/hbase-rpc-controller.h" +#include "utils/time-util.h" + +namespace hbase { +class ConnectionUtils { +public: + static int Retries2Attempts(const int& retries) { + return std::max(1, retries == INT_MAX ? INT_MAX : retries + 1); + } + + /* Add a delta to avoid timeout immediately after a retry sleeping. */ + static const long SLEEP_DELTA_NS = 1000000; + + /** + * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}. + * @param pause time to pause + * @param tries amount of tries + * @return How long to wait after tries retries + */ + static long GetPauseTime(const long& pause, const int& tries) { + int ntries = tries; + if (ntries >= HConstants::RETRY_BACKOFF.size()) { + ntries = HConstants::RETRY_BACKOFF.size() - 1; + } + if (ntries < 0) { + ntries = 0; + } + + long normal_pause = pause * HConstants::RETRY_BACKOFF[ntries]; + // 1% possible jitter + float r = static_cast(std::rand()) / static_cast(RAND_MAX); + long jitter = (long) (normal_pause * r * 0.01f); + return normal_pause + jitter; + } + + static void ResetController( + std::shared_ptr controller, + const long& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->set_call_timeout( + std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_ns))); + } + } +}; +} /* namespace hbase */ diff --git a/hbase-native-client/utils/sys-util.h b/hbase-native-client/utils/sys-util.h new file mode 100644 index 0000000..e9e5d9f --- /dev/null +++ b/hbase-native-client/utils/sys-util.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include + +namespace hbase { + +class SysUtil { +public: + template + static constexpr bool InstanceOf(const DERIVED& object) { + return !dynamic_cast(&object); + } + + template + static constexpr bool InstanceOf() { + return std::is_base_of(); + } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h new file mode 100644 index 0000000..08784b2 --- /dev/null +++ b/hbase-native-client/utils/time-util.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +class TimeUtil { +public: + static long ToMillis(const long& nanos) { + std::chrono::duration_cast(nanoseconds(nanos)).count(); + } + + static std::string ToMillisStr(const long& nanos) { + return std::to_string( + std::chrono::duration_cast (nanoseconds(nanos)).count()); + } + + static long GetNowNanos() { + auto duration = + std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + static long ElapsedMs(const long& start_ns) { + std::chrono::duration_cast( + nanoseconds(GetNowNanos() - start_ns)).count(); + } + + static std::string ElapsedMsStr(const long& start_ns) { + return std::to_string( + std::chrono::duration_cast( + nanoseconds(GetNowNanos() - start_ns)).count()); + } +}; +} /* namespace hbase */ + -- 2.10.1 (Apple Git-78)