From 5be68db1877b4e5720bd4f3ea76b9d58ca1f68b7 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Thu, 2 Feb 2017 00:08:53 +1100 Subject: [PATCH] Request retry mechanism over RPC for Multi Calls. This is based on HBASE-17465 diff --git a/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h new file mode 100644 index 0000000..07f329f --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#pragma once + +#include +#include +#include + +#include "core/async-connection.h" +#include "core/get.h" +#include "core/hregion-location.h" +#include "if/HBase.pb.h" + +using hbase::Get; +using hbase::pb::TableName; + +namespace hbase { + +template +class BatchCallerBuilder; + +template +class AsyncRpcRetryingBatchCallerFactory { /* Holds the connection and retry timer that BatchCallerBuilder forwards to every AsyncRpcRetryingBatchCaller it creates. */ + public: + template + friend class BatchCallerBuilder; + AsyncRpcRetryingBatchCallerFactory(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} + + virtual ~AsyncRpcRetryingBatchCallerFactory() = default; + + template + std::shared_ptr> Batch() { /* Returns a new builder for one batch call. */ + return std::make_shared>(); /* NOTE(review): the builder constructor shown below takes the owning factory, but nothing is passed here -- confirm. */ + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + + // BatchCallerBuilder + public: + template + class BatchCallerBuilder + : public std::enable_shared_from_this> { + public: + BatchCallerBuilder(std::shared_ptr> async_rpc_batch) + : async_rpc_batch_(async_rpc_batch), + table_name_(nullptr), + actions_(nullptr), + operation_timeout_nanos_(0), + rpc_timeout_nanos_(0) { /* Initializers are listed in member declaration order. */ + } + + virtual ~BatchCallerBuilder() = default; + + typedef BatchCallerBuilder GenenericThisType; /* Every setter below returns *this so that calls can be chained. */ + typedef std::shared_ptr SharedThisPtr; + + GenenericThisType& SetTable(std::shared_ptr table_name) { + table_name_ = table_name; + return *this; + } + + GenenericThisType& SetActions(std::shared_ptr>> gets) { + actions_ = gets; + return *this; + } + + GenenericThisType& SetOperationTimeout(long operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return *this; + } + + GenenericThisType& SetRpcTimeout(long rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return *this; + } + + GenenericThisType& SetPause(int64_t pause_ns) { + pause_ns_ = pause_ns; + return *this; + } + + GenenericThisType& SetMaxAttempts(int32_t max_attempts) { + max_attempts_ = max_attempts; + return *this; + } + + GenenericThisType& StartLogErrorsCnt(int32_t start_log_errors_cnt) { + start_log_errors_cnt_ =
start_log_errors_cnt; + return *this; + } + + GenenericThisType& SetLocation(const std::shared_ptr& location_cache) { + location_cache_ = location_cache; + return *this; + } + std::shared_ptr> Build() { /* Creates a heap-allocated caller. Pause, max attempts and the log-error threshold come from the setters; a value that is still <= 0 falls back to the former fixed arguments (1000, 1, 1). */ + return std::make_shared>( + async_rpc_batch_->retry_timer_, async_rpc_batch_->conn_, table_name_, actions_, callable_, + pause_ns_ > 0 ? pause_ns_ : 1000, max_attempts_ > 0 ? max_attempts_ : 1, operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_cnt_ > 0 ? start_log_errors_cnt_ : 1, location_cache_); + } + + folly::Future Call() { /* Runs the batch through a caller that is local to this function and returns the future from its Call(). */ + AsyncRpcRetryingBatchCaller async_rpc_retrying_batch_caller( + async_rpc_batch_->retry_timer_, async_rpc_batch_->conn_, table_name_, actions_, callable_, + pause_ns_, max_attempts_, operation_timeout_nanos_, rpc_timeout_nanos_, + start_log_errors_cnt_, location_cache_); + return async_rpc_retrying_batch_caller.Call(); /* NOTE(review): the caller above is destroyed when this function returns -- confirm the returned future stays usable. */ + } + + public: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr> async_rpc_batch_; + std::shared_ptr table_name_; + std::shared_ptr location_cache_; + std::shared_ptr>> actions_; + int64_t operation_timeout_nanos_; + int64_t rpc_timeout_nanos_; + int64_t pause_ns_ = 0; /* <= 0 means "not configured" for Build() */ + int32_t max_attempts_ = 0; /* <= 0 means "not configured" for Build() */ + int32_t start_log_errors_cnt_ = 0; /* <= 0 means "not configured" for Build() */ + Callable callable_; + }; +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-batch-caller.h b/hbase-native-client/core/async-rpc-retrying-batch-caller.h new file mode 100644 index 0000000..ab40ab9 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-batch-caller.h @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "core/async-rpc-retrying-caller.h" +#include "core/get.h" +#include "core/hbase-rpc-controller.h" +#include "core/hregion-location.h" +#include "core/location-cache.h" +#include "core/multi_action.h" +#include "core/request_converter.h" +#include "core/result.h" +#include "connection/rpc-client.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +#include + +using hbase::Get; +using hbase::LocationCache; +using hbase::MultiAction; +using hbase::RequestConverter; +using hbase::Result; +using hbase::RpcClient; +using hbase::Get; +using hbase::pb::ServerName; +using hbase::pb::TableName; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +using ActionsByServerName = std::map, std::shared_ptr>; /* NOTE(review): the key looks like a shared pointer, so the map would order entries by pointer value -- confirm. */ + +template +class AsyncRpcRetryingBatchCaller { /* Groups the queued actions by the server that hosts their region and builds one multi-get request per server (GroupAndSend / Send). */ + public: + AsyncRpcRetryingBatchCaller(std::shared_ptr retry_timer, + std::shared_ptr conn, + std::shared_ptr table_name, + std::shared_ptr>> actions, + Callable callable, int64_t pause_ns, + int32_t max_retries, int64_t operation_timeout_nanos, + int64_t rpc_timeout_nanos, int32_t start_log_errors_count, + std::shared_ptr location_cache) + : retry_timer_(retry_timer), + conn_(conn), + table_name_(table_name), + actions_(actions), + callable_(callable), /* callable_ is declared before location_cache_, so it is initialized first */ + location_cache_(location_cache), + pause_ns_(pause_ns), + 
max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()) { + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared>(); + } + + virtual ~AsyncRpcRetryingBatchCaller() = default; + + folly::Future Call() { /* Starts grouping and sending, then returns the future of promise_. */ + GroupAndSend(max_retries_); + return promise_->getFuture(); + } + + private: + long ElapsedMs() { /* Milliseconds elapsed since start_ns_ was taken in the constructor. */ + return std::chrono::duration_cast<milliseconds>(nanoseconds(GetNowNanos() - start_ns_)).count(); + } + + void CompleteExceptionally() { /* Fails promise_ with RetriesExhaustedException carrying the collected exceptions. */ + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); + } + + int64_t GetNowNanos() { + auto duration = std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + int64_t RemainingTimeNs() { return operation_timeout_nanos_ - (GetNowNanos() - start_ns_); } + + void GroupAndSend(int32_t retries) { /* Looks up the region location of every action, groups the actions per server and hands the groups to Send(). */ + int64_t locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + const auto locate_timeout = std::chrono::duration_cast<milliseconds>(nanoseconds(locate_timeout_ns)); /* the budget is tracked in nanoseconds; the wait below is expressed in milliseconds */ + ActionsByServerName actions_by_server_name; + for (const auto& action : *actions_) { + auto loc = locate_timeout_ns > 0 ? location_cache_->LocateFromMeta(*table_name_, action->Row()).get(locate_timeout) + : location_cache_->LocateFromMeta(*table_name_, action->Row()).get(); /* -1 means no operation timeout: wait without a deadline */ + if (loc == nullptr) { + continue; + } + + auto region_name = loc->region_name(); + auto server_name = loc->server_name(); + bool found = false; + for (auto itr = actions_by_server_name.begin(); itr != actions_by_server_name.end(); ++itr) { + if (server_name.host_name() == itr->first->host_name()) { /* NOTE(review): grouping compares host names only -- confirm that is enough to identify a server. */ + found = true; + // check if a row is already inserted do not push it again in the vector + bool row_present = false; + for (const auto& server_actions : itr->second->Actions()) { + for (const auto& server_action : 
server_actions.second) { + if (action->Row() == server_action->Row()) { + row_present = true; + break; + } + } + } + if (!row_present) itr->second->Add(region_name, action); + break; + } + } + + if (!found) { + auto multi_action = std::make_shared(); + multi_action->Add(region_name, action); + actions_by_server_name[std::make_shared(server_name)] = multi_action; + } + } + + if (!actions_by_server_name.empty()) { + Send(actions_by_server_name); + } + return; + } + + void Send(const ActionsByServerName& actions_by_server_name) { /* Builds one multi-get request per server group, failing the promise first if the operation timeout has already elapsed. */ + int64_t remaining_ns; + if (operation_timeout_nanos_ > 0) { + remaining_ns = RemainingTimeNs(); + if (remaining_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + remaining_ns = std::numeric_limits::max(); + } + + for (const auto& action : actions_by_server_name) { /* bind by reference: no per-entry copy of the key and value pointers */ + auto multi_request = RequestConverter::ToMultiGetRequest(*action.second); /* NOTE(review): the request is built but not used further in this function. */ + } + return; + } + + private: + std::shared_ptr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::shared_ptr>> actions_; + Callable callable_; + std::shared_ptr location_cache_; + int64_t pause_ns_; + int32_t max_retries_; + int64_t operation_timeout_nanos_; + int64_t rpc_timeout_nanos_; + int32_t start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_ = std::shared_ptr(); + int64_t start_ns_; + int32_t tries_ = 1; + std::shared_ptr> exceptions_; + int32_t max_attempts_; +}; +} +/* namespace hbase */ + diff --git a/hbase-native-client/core/multi_action.cc b/hbase-native-client/core/multi_action.cc new file mode 100644 index 0000000..ace3e10 --- /dev/null +++ b/hbase-native-client/core/multi_action.cc @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +namespace hbase { + +MultiAction::MultiAction() { + // TODO Auto-generated constructor stub +} + +MultiAction::~MultiAction() { + // TODO Auto-generated destructor stub +} + +void MultiAction::Add(const std::string& region_name, std::shared_ptr get) { /* Appends one Get to the list kept for region_name. */ + actions_[region_name].push_back(get); +} + +void MultiAction::Add(const std::string& region_name, + const std::vector >& gets) { /* Replaces the list kept for region_name with gets. */ + actions_[region_name] = gets; +} + +} /* namespace hbase */ + +int hbase::MultiAction::Size() const { + /* Total number of Gets over all regions. */ + int size = 0; + for (const auto& action : actions_) { + size += action.second.size(); + } + return size; +} + +std::vector hbase::MultiAction::Regions() const { + std::vector regions; + /* + * One entry per key of actions_, i.e. per region name passed to Add(), + * in the iteration order of actions_. + */ + regions.reserve(actions_.size()); + for (const auto& action : actions_) regions.push_back(action.first); + return regions; +} diff --git a/hbase-native-client/core/multi_action.h b/hbase-native-client/core/multi_action.h new file mode 100644 index 0000000..2d211f0 --- /dev/null +++ b/hbase-native-client/core/multi_action.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include "core/get.h" + +using hbase::Get; + +namespace hbase { +using MultiActions = std::map>>; /* region name -> Gets queued for that region */ +class MultiAction { /* Collects Gets per region so that they can be converted into a single multi request. */ + private: + MultiActions actions_; + + public: + MultiAction(); + ~MultiAction(); + + void Add(const std::string &region_name, std::shared_ptr get); /* appends one Get to the region's list */ + void Add(const std::string &region_name, const std::vector> &gets); /* replaces the region's list */ + + bool IsEmpty() const { return actions_.empty(); } + int Size() const; /* number of Gets summed over all regions */ + std::vector Regions() const; + const MultiActions &Actions() const { return actions_; } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/request_converter.cc b/hbase-native-client/core/request_converter.cc index eba07df..2417989 100644 --- a/hbase-native-client/core/request_converter.cc +++ b/hbase-native-client/core/request_converter.cc @@ -20,9 +20,10 @@ #include "core/request_converter.h" #include #include "if/Client.pb.h" +#include -using hbase::Request; using hbase::pb::GetRequest; +using hbase::pb::MultiRequest; using hbase::pb::RegionSpecifier; using hbase::pb::RegionSpecifier_RegionSpecifierType; using hbase::pb::ScanRequest; @@ -40,14 +41,8 @@ void RequestConverter::SetRegion(const std::string &region_name, region_specifier->set_value(region_name); } -std::unique_ptr RequestConverter::ToGetRequest(const Get &get, - const std::string &region_name) { - auto pb_req = Request::get(); - - auto pb_msg = 
std::static_pointer_cast(pb_req->req_msg()); - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - auto pb_get = pb_msg->mutable_get(); +std::unique_ptr RequestConverter::ToGet(const Get &get) { /* Converts a client Get into a standalone protobuf Get owned by the caller. */ + auto pb_get = std::make_unique(); pb_get->set_max_versions(get.MaxVersions()); pb_get->set_cache_blocks(get.CacheBlocks()); pb_get->set_consistency(get.Consistency()); @@ -67,7 +62,37 @@ std::unique_ptr RequestConverter::ToGetRequest(const Get &get, } } } + return pb_get; +} + +std::unique_ptr RequestConverter::ToGetRequest(const Get &get, + const std::string &region_name) { + auto pb_req = Request::get(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + auto pb_get = RequestConverter::ToGet(get); + pb_msg->set_allocated_get(pb_get.release()); /* the message takes ownership of the released Get */ + return pb_req; +} +std::unique_ptr RequestConverter::ToMultiGetRequest(const MultiAction &multi_actions) { /* Builds a multi request with one RegionAction per region and one Action per Get. */ + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + const auto &region_actions = multi_actions.Actions(); /* bind to the returned reference instead of copying the whole map */ + for (const auto &region_action : region_actions) { + /* Fill the RegionAction's own specifier in place, as ToGetRequest does, so that + * no raw owning pointer is live between allocation and hand-off. */ + auto pb_region_action = pb_msg->add_regionaction(); + RequestConverter::SetRegion(region_action.first, pb_region_action->mutable_region()); + int action_num = 0; /* the index restarts at 0 for every region */ + for (const auto &action : region_action.second) { + auto pb_get = RequestConverter::ToGet(*action); + auto pb_action = pb_region_action->add_action(); + pb_action->set_index(action_num); + pb_action->set_allocated_get(pb_get.release()); + action_num += 1; + } + } return pb_req; } diff --git a/hbase-native-client/core/request_converter.h b/hbase-native-client/core/request_converter.h index 57f08cc..cf28700 100644 --- a/hbase-native-client/core/request_converter.h +++ b/hbase-native-client/core/request_converter.h @@ -23,10 +23,16 @@ #include #include "connection/request.h" 
#include "core/get.h" +#include "core/multi_action.h" #include "core/scan.h" #include "if/HBase.pb.h" +using hbase::Get; +using hbase::MultiAction; +using hbase::Request; +using hbase::Scan; using hbase::pb::RegionSpecifier; +using hbase::pb::ServerName; namespace hbase { /** @@ -53,6 +59,8 @@ class RequestConverter { */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string &region_name); + static std::unique_ptr ToMultiGetRequest(const MultiAction &multi_actions); /* Creates a multi request holding every Get queued in multi_actions, grouped by region. */ + private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); @@ -64,6 +72,8 @@ class RequestConverter { * Request. */ static void SetRegion(const std::string &region_name, RegionSpecifier *region_specifier); -}; -} /* namespace hbase */ + static std::unique_ptr ToGet(const Get &get); /* Converts a client Get into its protobuf form; shared by ToGetRequest and ToMultiGetRequest. */ +}; +} +/* namespace hbase */ -- 1.8.3.1