From 105685c2c78acf8453764780a95eca00e7aa62aa Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Thu, 16 Mar 2017 08:44:28 +1100 Subject: [PATCH] AsyncBatchRpcRetryingCaller class for handling MultiRequests diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc new file mode 100644 index 0000000..96541ca --- /dev/null +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +namespace hbase {} /* namespace hbase */ diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h new file mode 100644 index 0000000..efc54c6 --- /dev/null +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/action.h" +#include "core/hbase-rpc-controller.h" +#include "core/location-cache.h" +#include "core/multi-response.h" +#include "core/region-location.h" +#include "core/region-request.h" +#include "core/region-result.h" +#include "core/request-converter.h" +#include "core/response-converter.h" +#include "core/result.h" +#include "core/row.h" +#include "core/server-request.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +#include + +using namespace folly; +using folly::Future; +using hbase::Action; +using hbase::LocationCache; +using hbase::RegionLocation; +using hbase::RequestConverter; +using hbase::Result; +using hbase::RpcClient; +using hbase::ServerRequest; +using hbase::pb::ServerName; +using hbase::pb::TableName; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +using ActionsByServer = std::map, std::shared_ptr>; +using ActionsByRegion = ServerRequest::ActionsByRegion; +using FutureResult = Future>; +using PromiseResult = Promise>; +using ActionList = RegionRequest::ActionList; + 
+/**
+ * Sends a batch of actions for one table. Actions are grouped by the server that hosts their
+ * region, sent region by region, and the ones that fail are handed to TryResubmit() until they
+ * succeed, are failed for good, or the operation timeout runs out.
+ *
+ * Every action is identified by its position in the vector given to the constructor
+ * (Action::original_index()); promises and recorded errors are keyed by that index.
+ *
+ * NOTE(review): nothing in this class drives event_base_, so it is not visible from here what
+ * makes retry_timer_ fire -- confirm before relying on retries.
+ */
+template
+class AsyncBatchRpcRetryingCaller {
+ public:
+  // Wraps each entry of `actions` in an Action tagged with its index in the input and converts
+  // `max_attempts` with ConnectionUtils::Retries2Attempts().
+  AsyncBatchRpcRetryingCaller(std::shared_ptr conn, std::shared_ptr table_name,
+                              std::shared_ptr>> actions,
+                              int64_t pause_ns, int32_t max_attempts, int64_t operation_timeout_ns,
+                              int64_t rpc_timeout_ns, int32_t start_log_errors_count,
+                              const std::shared_ptr &location_cache,
+                              const std::shared_ptr &rpc_client)
+      : conn_(conn),
+        table_name_(table_name),
+        pause_ns_(pause_ns),
+        operation_timeout_ns_(operation_timeout_ns),
+        rpc_timeout_ns_(rpc_timeout_ns),
+        start_log_errors_count_(start_log_errors_count),
+        location_cache_(location_cache),
+        rpc_client_(rpc_client) {
+    max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts);
+    uint32_t index = 0;
+    for (const auto &row : *actions) {
+      actions_->push_back(std::make_shared(row, index));
+      ++index;
+    }
+  }
+
+  ~AsyncBatchRpcRetryingCaller() { cpu_executor->stop(); }
+
+  // Runs the first attempt and returns one future for every promise recorded while it ran.
+  std::unique_ptr> Call() {
+    GroupAndSend(*actions_, 1);
+    for (auto &index_and_promise : action2promises_) {
+      LOG(INFO) << "itr->first:- " << index_and_promise.first;
+      futures_.push_back(index_and_promise.second.getFuture());
+    }
+    return std::make_unique>(std::move(futures_));
+  }
+
+ private:
+  // Time left before the operation timeout; zero or negative once it has elapsed.
+  int64_t RemainingTimeNs() { return operation_timeout_ns_ - (GetNowNanos() - start_ns_); }
+
+  // Logs a failed attempt once `tries` exceeds start_log_errors_count_.
+  void LogException(int32_t tries, std::shared_ptr region_request,
+                    std::shared_ptr &error,
+                    std::shared_ptr server_name) {
+    if (tries > start_log_errors_count_) {
+      std::string regions;
+      regions += region_request->region_location()->region_name() + ", ";
+      LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
+                   << table_name_->qualifier() << " from " << server_name->host_name()
+                   << " failed, tries=" << tries << ":- " << error->what();
+    }
+  }
+
+  // Describes the server for error records; empty when no server is known. The operands used to
+  // be swapped, which dereferenced a null server_name and dropped the context of a real one.
+  const std::string GetExtraContextForError(std::shared_ptr server_name) {
+    return server_name ? server_name->ShortDebugString() : "";
+  }
+
+  // Records `error` against one action. `server_name` may be null.
+  void AddError(const std::shared_ptr &action, std::shared_ptr error,
+                std::shared_ptr server_name) {
+    AddAction2Error(action->original_index(),
+                    std::make_shared(
+                        error, GetNowNanos(), GetExtraContextForError(server_name)));
+  }
+
+  // Records `error` against every action in `actions`.
+  void AddError(const std::vector> &actions,
+                std::shared_ptr error, std::shared_ptr server_name) {
+    for (const auto &action : actions) {
+      AddError(action, error, server_name);
+    }
+  }
+
+  // Fails one action for good: records the error and stores a promise holding a
+  // RetriesExhaustedException. Does nothing if a future for the action is already ready.
+  void FailOne(const std::shared_ptr &action, int32_t tries,
+               std::shared_ptr error, int64_t current_time,
+               const std::string extras) {
+    int64_t action_index = action->original_index();
+    VLOG(8) << "Original index:- " << action_index;
+    auto itr = action2futures_.find(action_index);
+    if (itr != action2futures_.end()) {
+      VLOG(8) << "Found index:- " << action_index;
+      if (itr->second.isReady()) {
+        return;
+      }
+    }
+    AddAction2Error(action_index,
+                    std::make_shared(error, current_time, extras));
+    // TODO
+    PromiseResult promise;
+    // Report the attempt count of this call, not a member that never changes.
+    promise.setException(RetriesExhaustedException(tries - 1, exceptions_));
+    action2promises_.insert(std::pair(action->original_index(), std::move(promise)));
+  }
+
+  // Fails every action in `actions` with `error`.
+  void FailAll(const std::vector> &actions, int32_t tries,
+               std::shared_ptr error, std::shared_ptr server_name) {
+    for (const auto &action : actions) {
+      FailOne(action, tries, error, GetNowNanos(), GetExtraContextForError(server_name));
+    }
+  }
+
+  // Fails every action in `actions` without recording a new error (used when time has run out).
+  void FailAll(const std::vector> &actions, int32_t tries) {
+    for (const auto &action : actions) {
+      int64_t action_index = action->original_index();
+      auto itr = action2futures_.find(action_index);
+      // An action without an entry must not be dereferenced, and one that is already done only
+      // skips itself rather than ending the loop for the actions after it.
+      if (itr != action2futures_.end() && itr->second.isReady()) {
+        continue;
+      }
+      // TODO
+      PromiseResult promise;
+      promise.setException(RetriesExhaustedException(tries - 1, exceptions_));
+      action2promises_.insert(std::pair(action->original_index(), std::move(promise)));
+    }
+  }
+
+  void CompleteExceptionally() {
+    // TODO Commented for now
+    // this->promise_->setException(RetriesExhaustedException(max_attempts_ - 1, exceptions_));
+  }
+
+  // Current time in nanoseconds, read from the same source that initialises start_ns_ so that
+  // RemainingTimeNs() subtracts two comparable timestamps.
+  int64_t GetNowNanos() { return TimeUtil::GetNowNanos(); }
+
+  // Appends `twec` to the errors recorded for `action_index`, creating the list on first use.
+  void AddAction2Error(int64_t action_index, std::shared_ptr twec) {
+    action2errors_[action_index].push_back(twec);
+  }
+
+  // TODO We have created the method as in Java API, but not using it
+  // Handles a failure that affects every region of one server request: fails the actions for
+  // good when the error is not retriable or attempts are used up, otherwise resubmits them.
+  void OnError(const ActionsByRegion &actions_by_region, int32_t tries,
+               std::shared_ptr exc, std::shared_ptr server_name) {
+    std::vector> copied_actions;
+    std::vector> region_requests;
+    region_requests.reserve(actions_by_region.size());
+    for (const auto &action_by_region : actions_by_region) {
+      region_requests.push_back(action_by_region.second);
+      for (const auto &action : action_by_region.second->actions()) {
+        copied_actions.push_back(action);
+      }
+    }
+    auto pexc = ConnectionUtils::TranslateException(exc);
+    // LogException() takes a single region request, so log once per region.
+    for (const auto &region_request : region_requests) {
+      LogException(tries, region_request, exc, server_name);
+    }
+    const bool give_up =
+        std::dynamic_pointer_cast(pexc) != nullptr || tries >= max_attempts_;
+    if (give_up) {
+      FailAll(copied_actions, tries, pexc, server_name);
+      // Actions that were just failed must not be resubmitted as well.
+      return;
+    }
+    TryResubmit(copied_actions, tries);
+  }
+
+  // Schedules another GroupAndSend() for `actions` after the pause for this attempt, capped by
+  // the time remaining; fails them instead when no time is left.
+  void TryResubmit(std::vector> actions, int32_t tries) {
+    int64_t delay_ns;
+    if (operation_timeout_ns_ > 0) {
+      int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+      if (max_delay_ns <= 0) {
+        FailAll(actions, tries);
+        return;
+      }
+      delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_, tries - 1));
+    } else {
+      delay_ns = ConnectionUtils::GetPauseTime(pause_ns_, tries - 1);
+    }
+    VLOG(8) << "Resubmit GroupAndSend()";
+    // The callback runs after this function has returned, so it owns its copy of the actions
+    // and of `tries` instead of referring to this function's parameters.
+    retry_timer_->scheduleTimeoutFn(
+        [ this, actions = std::move(actions), tries ]() mutable {
+          GroupAndSend(actions, tries + 1);
+        },
+        milliseconds(TimeUtil::ToMillis(delay_ns)));
+  }
+
+  // Looks up the region of every action, groups the located ones by server and sends them, and
+  // resubmits the ones whose lookup failed with a retriable error.
+  void GroupAndSend(std::vector> &actions, int32_t tries) {
+    int64_t locate_timeout_ns;
+    if (operation_timeout_ns_ > 0) {
+      locate_timeout_ns = RemainingTimeNs();
+      if (locate_timeout_ns <= 0) {
+        // Fail the actions of this attempt, not the complete original batch.
+        FailAll(actions, tries);
+        return;
+      }
+    } else {
+      locate_timeout_ns = -1L;
+    }
+
+    // Each lookup blocks on get(), so every future in `locs` is already complete when it is
+    // stored; the continuations below capture locals by reference on that basis.
+    auto locs = std::vector>>{};
+    for (auto const &action : actions) {
+      locs.push_back(folly::makeFutureWith([&] {
+        return location_cache_->LocateRegion(*table_name_, action->action()->row()).get();
+      }));
+    }
+
+    ActionsByServer actions_by_server;
+    std::vector> locate_failed;
+    folly::collectAll(locs)
+        .then([this, tries, &actions, &actions_by_server,
+               &locate_failed](std::vector>> &completed_locs) {
+          for (size_t num = 0; num < completed_locs.size(); ++num) {
+            const auto &loc = completed_locs[num];
+            const auto &action = actions[num];
+            if (loc.hasValue()) {
+              auto region_loc = loc.value();
+              // Add it to actions_by_server;
+              bool found = false;
+              for (auto itr = actions_by_server.begin(); itr != actions_by_server.end(); ++itr) {
+                if (region_loc->server_name().host_name() == itr->first->host_name()) {
+                  found = true;
+                  itr->second->AddAction(region_loc, action);
+                  break;
+                }
+              }
+              if (!found) {
+                // Create new key
+                auto server_request = std::make_shared(region_loc);
+                server_request->AddAction(region_loc, action);
+                auto server_name = std::make_shared(region_loc->server_name());
+                actions_by_server[server_name] = server_request;
+              }
+              VLOG(3) << "row [" << action->action()->row() << "] of table["
+                      << table_name_->namespace_() << ":" << table_name_->qualifier()
+                      << " found in region [" << region_loc->region_name() << "]";
+            } else if (loc.hasException()) {
+              VLOG(3) << "Exception occured- " << loc.exception().what().toStdString();
+              // TODO Feedback needed
+              if (loc.exception().is_compatible_with() ||
+                  loc.exception().is_compatible_with()) {
+                // These exception types fail the action outright. Only this action is affected,
+                // so carry on with the remaining lookups.
+                FailOne(action, tries, nullptr, GetNowNanos(), "");
+                continue;
+              }
+              AddError(action, std::make_shared(*loc.exception().getCopied()),
+                       nullptr);
+              // Queue the action for the resubmit step below.
+              locate_failed.push_back(action);
+            }
+          }
+        })
+        .then([this, tries, &actions_by_server]() {
+          // Send everything that could be located.
+          if (!actions_by_server.empty()) {
+            Send(actions_by_server, tries);
+          }
+        })
+        .then([this, tries, &locate_failed]() {
+          // Retry the actions whose region lookup failed.
+          if (!locate_failed.empty()) {
+            TryResubmit(locate_failed, tries);
+          }
+        });
+  }
+
+  // Sends one multi request per region and processes each response with OnComplete(). Blocks on
+  // every response (future.get()) before moving on to the next region.
+  void Send(const ActionsByServer &actions_by_server, int32_t tries) {
+    int64_t remaining_ns;
+    if (operation_timeout_ns_ > 0) {
+      remaining_ns = RemainingTimeNs();
+      if (remaining_ns <= 0) {
+        // TODO
+        std::vector> failed_actions;
+        for (const auto &action_by_server : actions_by_server) {
+          for (const auto &value : action_by_server.second->actions_by_region()) {
+            for (const auto &failed_action : value.second->actions()) {
+              failed_actions.push_back(failed_action);
+            }
+          }
+        }
+        FailAll(failed_actions, tries);
+        return;
+      }
+    } else {
+      remaining_ns = std::numeric_limits::max();
+    }
+
+    auto user = User::defaultUser();
+    std::for_each(
+        std::begin(actions_by_server), std::end(actions_by_server),
+        [&](const ActionsByServer::value_type &action_by_server) {
+          std::for_each(
+              std::begin(action_by_server.second->actions_by_region()),
+              std::end(action_by_server.second->actions_by_region()),
+              [&](const ActionsByRegion::value_type &action_by_region) {
+                auto loc = action_by_region.second->region_location();
+                auto region_name = action_by_region.first;
+                auto actions_by_region = action_by_region.second->actions();
+
+                Promise> promise;
+                Future> future = promise.getFuture();
+
+                std::shared_ptr multi_req = nullptr;
+
+                // NOTE(review): the lambdas below capture these locals by reference and run on
+                // cpu_executor; this relies on future.get() keeping the frame alive -- confirm.
+                folly::via(cpu_executor.get(), [&]() {
+                  return RequestConverter::ToMultiRequest(region_name, actions_by_region)
+                      .then([&](std::unique_ptr req) {
+                        multi_req = std::make_shared(*req);
+                        return rpc_client_->AsyncCall(loc->server_name().host_name(),
+                                                      loc->server_name().port(), std::move(req),
+                                                      user, "ClientService");
+                      })
+                      .then([&](std::unique_ptr multi_resp) {
+                        promise.setValue(
+                            std::move(ResponseConverter::GetResults(multi_req, *multi_resp.get())));
+                      })
+                      .onError([&](std::exception &exc) {
+                        promise.setException(std::runtime_error(exc.what()));
+                      });
+                });
+                auto multi_results = future.get();
+                // The request was built for this region alone, so pass OnComplete() only this
+                // region; the server's other regions would all be reported as missing.
+                ActionsByRegion this_region;
+                this_region.insert(action_by_region);
+                OnComplete(this_region, tries, action_by_server.first, std::move(multi_results));
+              });
+        });
+  }
+
+  // Processes the response for a set of regions. Regions with per-action results are handled
+  // action by action; regions without results are failed or queued for retry as a whole.
+  void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries,
+                  const std::shared_ptr server_name,
+                  const std::unique_ptr multi_results) {
+    std::vector> failed_actions;
+    // Bind the result map once so that find() and end() refer to the same container.
+    const auto &results = multi_results->Results();
+    for (const auto &action_by_region : actions_by_region) {
+      const auto &region_name = action_by_region.first;
+      const auto &region_request = action_by_region.second;
+      auto region_result_itr = results.find(region_name);
+      if (region_result_itr != results.end()) {
+        for (const auto &action : region_request->actions()) {
+          OnComplete(action, region_request, tries, server_name, region_result_itr->second,
+                     failed_actions);
+          VLOG(8) << "OnComplete done for region:- " << region_name;
+        }
+        continue;
+      }
+
+      // No results for this region: fall back to the region-level exception, if there is one.
+      LOG(ERROR) << "Region " << region_name << " not found in MultiResults.";
+      std::shared_ptr region_exc =
+          multi_results->Exception(region_name);
+      std::shared_ptr pexc;
+      if (region_exc == nullptr) {
+        LOG(ERROR) << "Server sent us neither results nor exceptions for " << region_name;
+        pexc = std::make_shared(std::runtime_error("Invalid response"));
+      } else {
+        // TODO // Translate exception defn
+        pexc = ConnectionUtils::TranslateException(region_exc);
+        LogException(tries, region_request, pexc, server_name);
+        // TODO Move it to UpdateLocation(table_name, error) function
+        std::string row_name;
+        location_cache_->ClearCachedLocation(*table_name_, row_name);
+        const bool give_up =
+            std::dynamic_pointer_cast(pexc) != nullptr || tries >= max_attempts_;
+        if (give_up) {
+          FailAll(region_request->actions(), tries, pexc, server_name);
+          // Only this region is finished; the others still have to be processed.
+          continue;
+        }
+      }
+      // Both the "invalid response" case and a retriable exception are recorded and retried.
+      AddError(region_request->actions(), pexc, server_name);
+      for (const auto &action : region_request->actions()) {
+        failed_actions.push_back(action);
+      }
+    }
+    if (!failed_actions.empty()) {
+      TryResubmit(failed_actions, tries);
+    }
+  }
+
+  // Processes the result of a single action: stores its value in a promise, fails it for good,
+  // or appends it to `failed_actions` for the caller to resubmit.
+  // Throws std::runtime_error when `region_result` has no entry for the action's index.
+  void OnComplete(const std::shared_ptr &action,
+                  const std::shared_ptr &region_request, int32_t tries,
+                  const std::shared_ptr &server_name,
+                  const std::shared_ptr &region_result,
+                  std::vector> &failed_actions) {
+    std::string err_msg;
+    try {
+      auto result_or_exc = region_result->result(action->original_index());
+      auto result = std::get<0>(*result_or_exc.get());
+      auto exc = std::get<1>(*result_or_exc.get());
+      std::shared_ptr pexc;
+      if (exc != nullptr) {
+        pexc = ConnectionUtils::TranslateException(exc);
+        VLOG(6) << "Stored exception" << pexc->what();
+        LogException(tries, region_request, pexc, server_name);
+        const bool give_up =
+            std::dynamic_pointer_cast(pexc) != nullptr || tries >= max_attempts_;
+        if (give_up) {
+          VLOG(8) << "FailOne; tries:- " << tries << "; max_attempts_:- " << max_attempts_
+                  << "; pexc:- " << pexc->what();
+          FailOne(action, tries, pexc, GetNowNanos(), GetExtraContextForError(server_name));
+        } else {
+          failed_actions.push_back(action);
+        }
+      } else if (result != nullptr) {
+        VLOG(8) << "action->original_index():- " << action->original_index();
+        PromiseResult promise;
+        promise.setValue(std::move(result));
+        action2promises_.insert(
+            std::pair(action->original_index(), std::move(promise)));
+      } else {
+        LOG(ERROR) << "Server " << server_name->ShortDebugString()
+                   << " sent us neither results nor exceptions for request @ index "
+                   << action->original_index() << ", row " << action->action()->row() << " of "
+                   << region_request->region_location()->region_name();
+        err_msg = "Invalid response";
+        AddError(action, std::make_shared(err_msg), server_name);
+        failed_actions.push_back(action);
+      }
+    } catch (const std::out_of_range &oor) {
+      LOG(ERROR) << "No ResultOrException found @ index " << action->original_index() << ", row "
+                 << action->action()->row() << " of "
+                 << region_request->region_location()->region_name();
+      throw std::runtime_error("ResultOrException not present @ index " +
+                               std::to_string(action->original_index()));
+    }
+  }
+
+ private:
+  folly::EventBase event_base_;
+  folly::HHWheelTimer::UniquePtr retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_);
+  std::shared_ptr conn_;
+  std::shared_ptr table_name_;
+  std::shared_ptr>> actions_ =
+      std::make_shared>>();
+  int64_t pause_ns_ = 0;
+  int32_t max_attempts_ = 0;
+  int64_t operation_timeout_ns_ = 0;
+  int64_t rpc_timeout_ns_ = 0;
+  int32_t start_log_errors_count_ = 0;
+
+  // Taken when the caller is constructed; RemainingTimeNs() measures from here.
+  int64_t start_ns_ = TimeUtil::GetNowNanos();
+  std::vector futures_;
+  // NOTE(review): nothing in this class inserts into action2futures_; it is only searched.
+  std::map action2futures_;
+  std::map action2promises_;
+  std::map>> action2errors_;
+  std::shared_ptr> exceptions_ =
+      std::make_shared>();
+
+  std::shared_ptr location_cache_ = nullptr;
+  std::shared_ptr rpc_client_ = nullptr;
+  // TODO we need to have this as a constructor parameter
+  std::shared_ptr cpu_executor =
+      std::make_shared(4);
+};
+} /* namespace hbase */
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 3342e29..76a1264 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -26,7 +26,10 @@
 #include
 
 #include "connection/rpc-client.h"
+#include "core/async-batch-rpc-retrying-caller.h"
 #include "core/async-rpc-retrying-caller.h"
+#include "core/location-cache.h"
+#include "core/row.h"
 #include "if/Client.pb.h"
 #include "if/HBase.pb.h"
 
@@ -103,7 +106,113 @@ class SingleRequestCallerBuilder
   std::string row_;
   RegionLocateType locate_type_;
   Callable callable_;
-};  // end of SingleRequestCallerBuilder
+};
+// end of SingleRequestCallerBuilder
+
+/**
+ * Fluent builder for AsyncBatchRpcRetryingCaller.
+ *
+ * Every setter stores its argument and returns a shared_ptr to this builder obtained through
+ * shared_from_this(), so the builder must itself be owned by a std::shared_ptr (as the one
+ * returned by AsyncRpcRetryingCallerFactory::Batch() is).
+ */
+template
+class BatchCallerBuilder : public std::enable_shared_from_this> {
+ public:
+  explicit BatchCallerBuilder(std::shared_ptr conn) : conn_(conn) {}
+
+  virtual ~BatchCallerBuilder() = default;
+
+  typedef BatchCallerBuilder GenenericThisType;
+  typedef std::shared_ptr SharedThisPtr;
+
+  SharedThisPtr set_table(std::shared_ptr table_name) {
+    table_name_ = table_name;
+    return shared_this();
+  }
+
+  SharedThisPtr set_actions(std::shared_ptr>> actions) {
+    actions_ = actions;
+    return shared_this();
+  }
+
+  // Timeouts are taken as int64_t rather than long so that nanosecond values are not narrowed
+  // where long is 32 bits wide; the members they are stored in are int64_t.
+  SharedThisPtr set_operation_timeout(int64_t operation_timeout_nanos) {
+    operation_timeout_nanos_ = operation_timeout_nanos;
+    return shared_this();
+  }
+
+  // Misspelled original name, kept so existing callers still compile.
+  SharedThisPtr set_operation_timeput(int64_t operation_timeout_nanos) {
+    return set_operation_timeout(operation_timeout_nanos);
+  }
+
+  SharedThisPtr set_rpc_timeout(int64_t rpc_timeout_nanos) {
+    rpc_timeout_nanos_ = rpc_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr set_pause(int64_t pause_ns) {
+    pause_ns_ = pause_ns;
+    return shared_this();
+  }
+
+  SharedThisPtr set_max_attempts(int32_t max_attempts) {
+    max_attempts_ = max_attempts;
+    return shared_this();
+  }
+
+  SharedThisPtr start_log_errors_cnt(int32_t start_log_errors_cnt) {
+    start_log_errors_cnt_ = start_log_errors_cnt;
+    return shared_this();
+  }
+
+  SharedThisPtr set_location_cache(std::shared_ptr location_cache) {
+    location_cache_ = location_cache;
+    return shared_this();
+  }
+
+  SharedThisPtr set_rpc_client(std::shared_ptr rpc_client) {
+    rpc_client_ = rpc_client;
+    return shared_this();
+  }
+
+  // NOTE(review): the pool is stored here but Build() does not pass it on to the caller.
+  SharedThisPtr set_cpu_pool(std::shared_ptr cpu_pool) {
+    cpu_pool_ = cpu_pool;
+    return shared_this();
+  }
+
+  // Builds a caller and runs it. The caller is a temporary that is released at the end of this
+  // statement, so only the returned futures outlive the call.
+  std::unique_ptr> Call() { return Build()->Call(); }
+
+  // Creates a caller from the values set so far; anything not set keeps its default below.
+  std::shared_ptr> Build() {
+    return std::make_shared>(
+        conn_, table_name_, actions_, pause_ns_, max_attempts_, operation_timeout_nanos_,
+        rpc_timeout_nanos_, start_log_errors_cnt_, location_cache_, rpc_client_);
+  }
+
+ private:
+  SharedThisPtr shared_this() {
+    return std::enable_shared_from_this::shared_from_this();
+  }
+
+ private:
+  std::shared_ptr conn_;
+  std::shared_ptr table_name_ = nullptr;
+  std::shared_ptr>> actions_ = nullptr;
+  int64_t pause_ns_ = 0;
+  int32_t max_attempts_ = 0;
+  int64_t operation_timeout_nanos_ = 0;
+  int64_t rpc_timeout_nanos_ = 0;
+  int32_t start_log_errors_cnt_ = 0;
+  std::shared_ptr location_cache_ = nullptr;
+  std::shared_ptr rpc_client_ = nullptr;
+  std::shared_ptr cpu_pool_ = nullptr;
+};
 
 template
 class AsyncRpcRetryingCallerFactory {
@@ -119,6 +228,11 @@ class AsyncRpcRetryingCallerFactory {
   std::shared_ptr> Single() {
     return std::make_shared>(conn_);
   }
+
+  // Returns a new BatchCallerBuilder bound to this factory's connection.
+  std::shared_ptr> Batch() {
+    return std::make_shared>(conn_);
+  }
 };
 
 }  // namespace hbase
-- 
1.8.3.1