diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index 412ee3b..e9fc716 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -53,6 +53,7 @@ cxx_library( "region-result.h", "row.h", "server-request.h", + "async-batch-rpc-retrying-caller.h", ], srcs=[ "async-connection.cc", @@ -77,6 +78,7 @@ cxx_library( "zk-util.cc", "multi-response.cc", "region-result.cc", + "async-batch-rpc-retrying-caller.cc", ], deps=[ "//exceptions:exceptions", diff --git hbase-native-client/core/action.h hbase-native-client/core/action.h index 3511683..21a0181 100644 --- hbase-native-client/core/action.h +++ hbase-native-client/core/action.h @@ -20,24 +20,23 @@ #pragma once #include -#include "core/row.h" +#include "core/get.h" -using hbase::Row; namespace hbase { class Action { public: - Action(std::shared_ptr action, int original_index) + Action(std::shared_ptr action, int32_t original_index) : action_(action), original_index_(original_index) {} ~Action() {} - int64_t original_index() const { return original_index_; } + int32_t original_index() const { return original_index_; } - std::shared_ptr action() const { return action_; } + std::shared_ptr action() const { return action_; } private: - std::shared_ptr action_; - int64_t original_index_; + std::shared_ptr action_; + int32_t original_index_; int64_t nonce_ = -1; int32_t replica_id_ = -1; }; diff --git hbase-native-client/core/async-batch-rpc-retrying-caller.cc hbase-native-client/core/async-batch-rpc-retrying-caller.cc new file mode 100644 index 0000000..f3be637 --- /dev/null +++ hbase-native-client/core/async-batch-rpc-retrying-caller.cc @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-batch-rpc-retrying-caller.h" +#include +#include + +using folly::Future; +using folly::Promise; +using folly::Try; + +using folly::Future; +using folly::Promise; +using folly::Try; +using hbase::Action; +using hbase::LocationCache; +using hbase::MultiResponse; +using hbase::RegionLocation; +using hbase::RegionRequest; +using hbase::RequestConverter; +using hbase::Result; +using hbase::RpcClient; +using hbase::ServerRequest; +using hbase::pb::ServerName; +using hbase::pb::TableName; +using hbase::security::User; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; +using wangle::CPUThreadPoolExecutor; + +namespace hbase { + +AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller( + std::shared_ptr conn, std::shared_ptr retry_timer, + std::shared_ptr table_name, const std::vector &actions, + nanoseconds pause_ns, int32_t max_attempts, nanoseconds operation_timeout_ns, + nanoseconds rpc_timeout_ns, int32_t start_log_errors_count) + : conn_(conn), + retry_timer_(retry_timer), + table_name_(table_name), + pause_ns_(pause_ns), + operation_timeout_ns_(operation_timeout_ns), + rpc_timeout_ns_(rpc_timeout_ns), + start_log_errors_count_(start_log_errors_count) { + CHECK(conn_ != nullptr); + CHECK(retry_timer_ != nullptr); + location_cache_ = conn_->region_locator(); + rpc_client_ = conn_->rpc_client(); + cpu_pool_ = conn_->cpu_executor(); + 
CHECK(location_cache_ != nullptr); + CHECK(rpc_client_ != nullptr); + CHECK(cpu_pool_ != nullptr); + + max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts); + uint32_t index = 0; + for (auto row : actions) { + actions_.push_back(std::make_shared(std::make_shared(row), index)); + Promise> prom{}; + action2promises_.insert( + std::pair>>(index, std::move(prom))); + action2futures_.push_back(action2promises_[index++].getFuture()); + } +} + +AsyncBatchRpcRetryingCaller::~AsyncBatchRpcRetryingCaller() {} + +Future>>> AsyncBatchRpcRetryingCaller::Call() { + GroupAndSend(actions_, 1); + return collectAll(action2futures_); +} + +int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() { + return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_); +} + +void AsyncBatchRpcRetryingCaller::LogException(int32_t tries, + std::shared_ptr region_request, + std::shared_ptr &error, + std::shared_ptr server_name) { + if (tries > start_log_errors_count_) { + std::string regions; + regions += region_request->region_location()->region_name() + ", "; + LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" + << table_name_->qualifier() << " from " << server_name->host_name() + << " failed, tries=" << tries << ":- " << error->what(); + } +} + +void AsyncBatchRpcRetryingCaller::LogException( + int32_t tries, std::vector> ®ion_requests, + std::shared_ptr &error, std::shared_ptr server_name) { + if (tries > start_log_errors_count_) { + std::string regions; + for (const auto region_request : region_requests) { + regions += region_request->region_location()->region_name() + ", "; + } + LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" + << table_name_->qualifier() << " from " << server_name->host_name() + << " failed, tries=" << tries << error->what(); + } +} + +const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError( + std::shared_ptr server_name) { + return 
server_name ? server_name->ShortDebugString() : ""; +} + +// TODO HBASE-17800 pass folly ew instead of std::exception +void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr &action, + std::shared_ptr error, + std::shared_ptr server_name) { + folly::exception_wrapper ew; + ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); + AddAction2Error(action->original_index(), twec); +} + +void AsyncBatchRpcRetryingCaller::AddError(const std::vector> &actions, + std::shared_ptr error, + std::shared_ptr server_name) { + for (const auto action : actions) { + AddError(action, error, server_name); + } +} + +// TODO HBASE-17800 pass folly ew instead of std::exception +void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr &action, int32_t tries, + std::shared_ptr error, + int64_t current_time, const std::string extras) { + auto action_index = action->original_index(); + auto itr = action2promises_.find(action_index); + if (itr != action2promises_.end()) { + if (itr->second.isFulfilled()) { + return; + } + } + folly::exception_wrapper ew; + ThrowableWithExtraContext twec(ew, current_time, extras); + AddAction2Error(action_index, twec); + action2promises_[action_index].setException( + RetriesExhaustedException(tries - 1, action2errors_[action_index])); +} + +void AsyncBatchRpcRetryingCaller::FailAll(const std::vector> &actions, + int32_t tries, std::shared_ptr error, + std::shared_ptr server_name) { + for (const auto action : actions) { + FailOne(action, tries, error, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); + } +} + +void AsyncBatchRpcRetryingCaller::FailAll(const std::vector> &actions, + int32_t tries) { + for (const auto action : actions) { + auto action_index = action->original_index(); + auto itr = action2promises_.find(action_index); + if (itr->second.isFulfilled()) { + return; + } + action2promises_[action_index].setException( + RetriesExhaustedException(tries - 1, 
action2errors_[action_index])); + } +} + +void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index, + const ThrowableWithExtraContext &twec) { + auto erritr = action2errors_.find(action_index); + if (erritr != action2errors_.end()) { + erritr->second->push_back(twec); + } else { + action2errors_[action_index] = std::make_shared>(); + action2errors_[action_index]->push_back(twec); + } + return; +} + +void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries, + std::shared_ptr exc, + std::shared_ptr server_name) { + std::vector> copied_actions; + std::vector> region_requests; + for (const auto &action_by_region : actions_by_region) { + region_requests.push_back(action_by_region.second); + // Concurrent + for (const auto &action : action_by_region.second->actions()) { + copied_actions.push_back(action); + } + } + // TODO HBASE-17800 for exc check with DoNotRetryIOException + LogException(tries, region_requests, exc, server_name); + if (tries >= max_attempts_) { + FailAll(copied_actions, tries, exc, server_name); + return; + } + AddError(copied_actions, exc, server_name); + TryResubmit(copied_actions, tries); +} + +void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector> actions, + int32_t tries) { + int64_t delay_ns; + if (operation_timeout_ns_.count() > 0) { + int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; + if (max_delay_ns <= 0) { + VLOG(8) << "Fail All from onError"; + FailAll(actions, tries); + return; + } + delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1); + } + // TODO This gives segfault @ present, when retried + // retry_timer_->scheduleTimeoutFn([&]() { GroupAndSend(actions, tries + 1); }, + // milliseconds(TimeUtil::ToMillis(delay_ns))); +} + +Future>>> +AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector> &actions, + int64_t 
locate_timeout_ns) { + auto locs = std::vector>>{}; + for (auto const &action : actions) { + locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(), + RegionLocateType::kCurrent, locate_timeout_ns)); + } + + return collectAll(locs); +} + +void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector> &actions, + int32_t tries) { + int64_t locate_timeout_ns; + if (operation_timeout_ns_.count() > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + FailAll(actions_, tries); + return; + } + } else { + locate_timeout_ns = -1L; + } + + GetRegionLocations(actions, locate_timeout_ns) + .then([&](std::vector>> &loc) { + std::lock_guard lock(multi_mutex_); + ActionsByServer actions_by_server; + std::vector> locate_failed; + + for (uint64_t i = 0; i < loc.size(); ++i) { + auto action = actions[i]; + if (loc[i].hasValue()) { + auto region_loc = loc[i].value(); + // Add it to actions_by_server; + // Concurrent + auto search = + actions_by_server.find(std::make_shared(region_loc->server_name())); + if (search != actions_by_server.end()) { + search->second->AddActionsByRegion(region_loc, action); + } else { + // Create new key + auto server_request = std::make_shared(region_loc); + server_request->AddActionsByRegion(region_loc, action); + auto server_name = std::make_shared(region_loc->server_name()); + actions_by_server[server_name] = server_request; + } + locate_failed.push_back(action); + VLOG(8) << "row [" << action->action()->row() << "] of table[" + << table_name_->namespace_() << ":" << table_name_->qualifier() + << " found in region [" << region_loc->region_name() << "]; host[" + << region_loc->server_name().host_name() << "]; port[" + << region_loc->server_name().port() << "];"; + } else if (loc[i].hasException()) { + VLOG(8) << "Exception occured while locating region:- " + << loc[i].exception().getCopied()->what() << " for action index " << i; + // TODO Feedback needed, Java API only identifies 
DoNotRetryIOException + // We might receive runtime error from location-cache.cc too, we are treating both same + if (loc[i].exception().is_compatible_with()) { + std::string extra = ""; + FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(), + loc[i].exception().what().toStdString()); + return; + } + // TODO HBASE-17800 for exc check with DoNotRetryIOException + /* + else if (loc[i].exception().is_compatible_with()) { + int64_t current_time = 0; + std::string extra = ""; + FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(), + loc[i].exception().what().toStdString()); + return; + }*/ + AddError(action, std::make_shared(*loc[i].exception().getCopied()), + nullptr); + locate_failed.push_back(action); + } + } + + if (!actions_by_server.empty()) { + Send(actions_by_server, tries); + } + + if (!locate_failed.empty()) { + TryResubmit(locate_failed, tries); + } + }) + .onError([&](const folly::exception_wrapper &ew) { + std::lock_guard lock(multi_mutex_); + auto exc = ew.getCopied(); + VLOG(8) << "GetRegionLocations() exception: " << ew.what().toStdString(); + }); + return; +} + +Future>>> AsyncBatchRpcRetryingCaller::GetMultiResponse( + const ActionsByServer &actions_by_server) { + // Concurrent. 
+ auto multi_calls = std::vector>>{}; + auto user = User::defaultUser(); + for (const auto &action_by_server : actions_by_server) { + std::unique_ptr multi_req = + RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region()); + auto host = action_by_server.first->host_name(); + int port = action_by_server.first->port(); + multi_calls.push_back( + rpc_client_->AsyncCall(host, port, std::move(multi_req), user, "ClientService")); + } + return collectAll(multi_calls); +} + +void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32_t tries) { + int64_t remaining_ns; + if (operation_timeout_ns_.count() > 0) { + remaining_ns = RemainingTimeNs(); + if (remaining_ns <= 0) { + std::vector> failed_actions; + for (const auto &action_by_server : actions_by_server) { + // Concurrent + for (auto &value : action_by_server.second->actions_by_region()) { + // Concurrent + for (const auto &failed_action : value.second->actions()) { + failed_actions.push_back(failed_action); + } + } + } + FailAll(failed_actions, tries); + return; + } + } else { + remaining_ns = std::numeric_limits::max(); + } + + std::vector> multi_reqv; + for (const auto &action_by_server : actions_by_server) + multi_reqv.push_back( + std::move(RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region()))); + + GetMultiResponse(actions_by_server) + .then([=](const std::vector>> &completed_responses) { + std::lock_guard lock(multi_mutex_); + for (uint64_t num = 0; num < completed_responses.size(); ++num) { + if (completed_responses[num].hasValue()) { + auto multi_response = + ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value()); + for (const auto &action_by_server : actions_by_server) { + OnComplete(action_by_server.second->actions_by_region(), tries, + action_by_server.first, std::move(multi_response)); + } + } else if (completed_responses[num].hasException()) { + VLOG(8) << "Received exception: " + << 
completed_responses[num].exception().getCopied()->what() + << " from server for action index " << num; + // TODO: we should call OnError here as well. + } + } + }) + .onError([=](const folly::exception_wrapper &ew) { + auto exc = ew.getCopied(); + VLOG(8) << "GetMultiResponse() exception: " << ew.what().toStdString(); + std::lock_guard lock(multi_mutex_); + for (const auto &action_by_server : actions_by_server) { + OnError(action_by_server.second->actions_by_region(), tries, + std::make_shared(*exc), action_by_server.first); + } + }); + return; +} + +void AsyncBatchRpcRetryingCaller::OnComplete( + const ActionsByRegion &actions_by_region, int32_t tries, + const std::shared_ptr server_name, + const std::unique_ptr multi_response) { + std::vector> failed_actions; + for (const auto &action_by_region : actions_by_region) { + auto region_result_itr = multi_response->RegionResults().find(action_by_region.first); + if (region_result_itr == multi_response->RegionResults().end()) { + VLOG(8) << "Region " << action_by_region.first << " not found in MultiResults."; + // TODO Feedback needed Should we throw from here or continue for next action_by_region ? 
+ // Throwing at present as this looks like an inconsistency + // Concurrent + auto exc = std::make_shared("Invalid search for region " + + action_by_region.first + " in multi results"); + FailAll(action_by_region.second->actions(), tries, exc, server_name); + return; + // std::runtime_error( + // "Invalid search for region " + action_by_region.first + " in multi results"); + } + if (region_result_itr != multi_response->RegionResults().end()) { + // Concurrent + for (const auto &action : action_by_region.second->actions()) { + OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second, + failed_actions); + } + } else { + auto region_exc = multi_response->RegionException(action_by_region.first); + std::shared_ptr pexc; + if (region_exc == nullptr) { + VLOG(8) << "Server sent us neither results nor exceptions for " << action_by_region.first; + pexc = std::make_shared(std::runtime_error("Invalid response")); + // TODO: raise this exception to the application + } else { + // TODO HBASE-17800 for exc check with DoNotRetryIOException + LogException(tries, action_by_region.second, region_exc, server_name); + location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(), + *region_exc); + std::string row_name; + if (tries >= max_attempts_) { + // Concurrent + FailAll(action_by_region.second->actions(), tries, region_exc, server_name); + return; + } + // Concurrent + AddError(action_by_region.second->actions(), region_exc, server_name); + for (const auto &action : action_by_region.second->actions()) { + failed_actions.push_back(action); + } + } + } + } + if (!failed_actions.empty()) { + TryResubmit(failed_actions, tries); + } + return; +} + +void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr &action, + const std::shared_ptr ®ion_request, + int32_t tries, + const std::shared_ptr &server_name, + const std::shared_ptr ®ion_result, + std::vector> &failed_actions) { + std::string err_msg; + try { + auto 
result_or_exc = region_result->ResultOrException(action->original_index()); + auto result = std::get<0>(*result_or_exc); + auto exc = std::get<1>(*result_or_exc); + std::shared_ptr pexc; + if (exc != nullptr) { + LogException(tries, region_request, exc, server_name); + if (tries >= max_attempts_) { + FailOne(action, tries, exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); + } else { + failed_actions.push_back(action); + } + } else if (result != nullptr) { + action2promises_[action->original_index()].setValue(std::move(result)); + } else { + VLOG(8) << "Server " << server_name->ShortDebugString() + << " sent us neither results nor exceptions for request @ index " + << action->original_index() << ", row " << action->action()->row() << " of " + << region_request->region_location()->region_name(); + err_msg = "Invalid response"; + AddError(action, std::make_shared(err_msg), server_name); + failed_actions.push_back(action); + } + } catch (const std::out_of_range &oor) { + // TODO Feedback needed. Should we retry for he specific index again ? + // This should never occur, so we are throwing a std::runtime_error from here + VLOG(8) << "No ResultOrException found @ index " << action->original_index() << ", row " + << action->action()->row() << " of " + << region_request->region_location()->region_name(); + throw std::runtime_error("ResultOrException not present @ index " + + std::to_string(action->original_index())); + } + return; +} +} /* namespace hbase */ diff --git hbase-native-client/core/async-batch-rpc-retrying-caller.h hbase-native-client/core/async-batch-rpc-retrying-caller.h new file mode 100644 index 0000000..6803a0e --- /dev/null +++ hbase-native-client/core/async-batch-rpc-retrying-caller.h @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/action.h" +#include "core/async-connection.h" +#include "core/location-cache.h" +#include "core/multi-response.h" +#include "core/region-location.h" +#include "core/region-request.h" +#include "core/region-result.h" +#include "core/request-converter.h" +#include "core/response-converter.h" +#include "core/result.h" +#include "core/row.h" +#include "core/server-request.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "security/user.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +namespace hbase { +/* Equals function for ServerName */ +struct ServerNameEquals { + bool operator()(const std::shared_ptr &lhs, + const std::shared_ptr &rhs) const { + return (lhs->start_code() == rhs->start_code() && lhs->host_name() == rhs->host_name() && + lhs->port() == rhs->port()); + } +}; + +struct ServerNameHash { + /** hash */ + std::size_t operator()(const std::shared_ptr &sn) const { + std::size_t h = 0; + boost::hash_combine(h, sn->start_code()); + boost::hash_combine(h, sn->host_name()); + boost::hash_combine(h, 
sn->port()); + return h; + } +}; + +class AsyncBatchRpcRetryingCaller { + public: + using ActionsByServer = + std::unordered_map, std::shared_ptr, + ServerNameHash, ServerNameEquals>; + using ActionsByRegion = ServerRequest::ActionsByRegion; + + AsyncBatchRpcRetryingCaller(std::shared_ptr conn, + std::shared_ptr retry_timer, + std::shared_ptr table_name, + const std::vector &actions, nanoseconds pause_ns, + int32_t max_attempts, nanoseconds operation_timeout_ns, + nanoseconds rpc_timeout_ns, int32_t start_log_errors_count); + + ~AsyncBatchRpcRetryingCaller(); + + folly::Future>>> Call(); + + private: + int64_t RemainingTimeNs(); + + void LogException(int32_t tries, std::shared_ptr region_request, + std::shared_ptr &error, + std::shared_ptr server_name); + + void LogException(int32_t tries, std::vector> ®ion_requests, + std::shared_ptr &error, + std::shared_ptr server_name); + + const std::string GetExtraContextForError(std::shared_ptr server_name); + + void AddError(const std::shared_ptr &action, std::shared_ptr error, + std::shared_ptr server_name); + + void AddError(const std::vector> &actions, + std::shared_ptr error, std::shared_ptr server_name); + + void FailOne(const std::shared_ptr &action, int32_t tries, + std::shared_ptr error, int64_t current_time, + const std::string extras); + + void FailAll(const std::vector> &actions, int32_t tries, + std::shared_ptr error, std::shared_ptr server_name); + + void FailAll(const std::vector> &actions, int32_t tries); + + void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec); + + void OnError(const ActionsByRegion &actions_by_region, int32_t tries, + std::shared_ptr exc, std::shared_ptr server_name); + + void TryResubmit(std::vector> actions, int32_t tries); + + folly::Future>>> GetRegionLocations( + const std::vector> &actions, int64_t locate_timeout_ns); + + void GroupAndSend(const std::vector> &actions, int32_t tries); + + folly::Future>>> GetMultiResponse( + const ActionsByServer 
&actions_by_server); + + void Send(ActionsByServer &actions_by_server, int32_t tries); + + void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries, + const std::shared_ptr server_name, + const std::unique_ptr multi_results); + + void OnComplete(const std::shared_ptr &action, + const std::shared_ptr ®ion_request, int32_t tries, + const std::shared_ptr &server_name, + const std::shared_ptr ®ion_result, + std::vector> &failed_actions); + + private: + std::shared_ptr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::vector> actions_; + nanoseconds pause_ns_; + int32_t max_attempts_ = 0; + nanoseconds operation_timeout_ns_; + nanoseconds rpc_timeout_ns_; + int32_t start_log_errors_count_ = 0; + + int64_t start_ns_ = TimeUtil::GetNowNanos(); + int32_t tries_ = 1; + std::map>> action2promises_; + std::vector>> action2futures_; + std::map>> action2errors_; + + std::shared_ptr location_cache_ = nullptr; + std::shared_ptr rpc_client_ = nullptr; + std::shared_ptr cpu_pool_ = nullptr; + + std::mutex multi_mutex_; +}; + +} /* namespace hbase */ diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.h hbase-native-client/core/async-rpc-retrying-caller-factory.h index 5a80a06..f1ffdac 100644 --- hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -19,20 +19,19 @@ #pragma once #include -#include #include #include #include #include +#include #include "connection/rpc-client.h" +#include "core/async-batch-rpc-retrying-caller.h" #include "core/async-rpc-retrying-caller.h" +#include "core/row.h" #include "if/Client.pb.h" #include "if/HBase.pb.h" -using hbase::pb::TableName; -using std::chrono::nanoseconds; - namespace hbase { class AsyncConnection; @@ -58,7 +57,7 @@ class SingleRequestCallerBuilder typedef SingleRequestCallerBuilder GenenericThisType; typedef std::shared_ptr SharedThisPtr; - SharedThisPtr table(std::shared_ptr table_name) { + 
SharedThisPtr table(std::shared_ptr table_name) { table_name_ = table_name; return shared_this(); } @@ -119,7 +118,7 @@ class SingleRequestCallerBuilder private: std::shared_ptr conn_; std::shared_ptr retry_timer_; - std::shared_ptr table_name_; + std::shared_ptr table_name_; nanoseconds rpc_timeout_nanos_; nanoseconds operation_timeout_nanos_; nanoseconds pause_; @@ -130,6 +129,75 @@ class SingleRequestCallerBuilder Callable callable_; }; // end of SingleRequestCallerBuilder +class BatchCallerBuilder : public std::enable_shared_from_this { + public: + explicit BatchCallerBuilder(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} + + virtual ~BatchCallerBuilder() = default; + + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr table(std::shared_ptr table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr actions(std::shared_ptr> actions) { + actions_ = actions; + return shared_this(); + } + + SharedThisPtr operation_timeout(nanoseconds operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr pause(nanoseconds pause_ns) { + pause_ns_ = pause_ns; + return shared_this(); + } + + SharedThisPtr max_attempts(int32_t max_attempts) { + max_attempts_ = max_attempts; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(int32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + + folly::Future>>> Call() { return Build()->Call(); } + + std::shared_ptr Build() { + return std::make_shared( + conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_, + operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); + } + + private: + SharedThisPtr shared_this() { + return 
std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + std::shared_ptr table_name_ = nullptr; + std::shared_ptr> actions_ = nullptr; + nanoseconds pause_ns_; + int32_t max_attempts_ = 0; + nanoseconds operation_timeout_nanos_; + nanoseconds rpc_timeout_nanos_; + int32_t start_log_errors_count_ = 0; +}; class AsyncRpcRetryingCallerFactory { private: std::shared_ptr conn_; @@ -146,6 +214,10 @@ class AsyncRpcRetryingCallerFactory { std::shared_ptr> Single() { return std::make_shared>(conn_, retry_timer_); } + + std::shared_ptr Batch() { + return std::make_shared(conn_, retry_timer_); + } }; } // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller.cc hbase-native-client/core/async-rpc-retrying-caller.cc index 7e211f7..f8b237b 100644 --- hbase-native-client/core/async-rpc-retrying-caller.cc +++ hbase-native-client/core/async-rpc-retrying-caller.cc @@ -143,9 +143,7 @@ void AsyncSingleRequestRpcRetryingCaller::OnError( */ conn_->retry_executor()->add([&]() { retry_timer_->scheduleTimeoutFn( - [this]() { - conn_->cpu_executor()->add([&]() { LocateThenCall(); }); - }, + [this]() { conn_->cpu_executor()->add([&]() { LocateThenCall(); }); }, milliseconds(TimeUtil::ToMillis(delay_ns))); }); } diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc index ff28e79..487c34c 100644 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -159,9 +159,9 @@ class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase { uint32_t num_fails_ = 0; public: - explicit MockFailingAsyncRegionLocator(uint32_t num_fails) + explicit MockFailingAsyncRegionLocator(uint32_t num_fails) : AsyncRegionLocatorBase(), num_fails_(num_fails) {} - explicit MockFailingAsyncRegionLocator(std::shared_ptr region_location) + explicit MockFailingAsyncRegionLocator(std::shared_ptr region_location) 
: AsyncRegionLocatorBase(region_location) {} virtual ~MockFailingAsyncRegionLocator() {} folly::Future> LocateRegion( diff --git hbase-native-client/core/client-test.cc hbase-native-client/core/client-test.cc index 274168f..1c6ec4a 100644 --- hbase-native-client/core/client-test.cc +++ hbase-native-client/core/client-test.cc @@ -234,3 +234,56 @@ TEST_F(ClientTest, PutsWithTimestamp) { table->Close(); client.Close(); } + +TEST_F(ClientTest, MultiGets) { + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable("t", "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t"); + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + uint64_t num_rows = 10000; + // Perform Puts + for (uint64_t i = 0; i < num_rows; i++) { + table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), + "value" + std::to_string(i))); + } + + // Perform the Gets + std::vector gets; + for (uint64_t i = 0; i < num_rows; ++i) { + auto row = "test" + std::to_string(i); + hbase::Get get(row); + gets.push_back(get); + } + gets.push_back(hbase::Get("test2")); + gets.push_back(hbase::Get("testextra")); + + auto results = table->Get(gets); + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!results.empty()) << "Result vector shouldn't be empty."; + + uint32_t i = 0; + for (; i < num_rows; ++i) { + ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() + << " must not be empty"; + EXPECT_EQ("test" + std::to_string(i), results[i]->Row()); + EXPECT_EQ("value" + std::to_string(i), *results[i]->Value("d", std::to_string(i)).get()); + } + // We are inserting test2 twice so the below test should pass + ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must not be empty"; + + ++i; + 
ASSERT_TRUE(results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must be empty"; + + table->Close(); + client.Close(); +} diff --git hbase-native-client/core/raw-async-table.cc hbase-native-client/core/raw-async-table.cc index 2bc9f36..9e0d4a3 100644 --- hbase-native-client/core/raw-async-table.cc +++ hbase-native-client/core/raw-async-table.cc @@ -91,4 +91,24 @@ Future RawAsyncTable::Put(const hbase::Put& put) { return caller->Call().then([caller](const auto r) { return r; }); } -} /* namespace hbase */ +Future>>> RawAsyncTable::Get( + const std::vector& gets) { + return this->Batch(gets); +} + +Future>>> RawAsyncTable::Batch( + const std::vector& gets) { + auto caller = connection_->caller_factory() + ->Batch() + ->table(table_name_) + ->actions(std::make_shared>(gets)) + ->rpc_timeout(connection_conf_->read_rpc_timeout()) + ->operation_timeout(connection_conf_->operation_timeout()) + ->pause(connection_conf_->pause()) + ->max_attempts(connection_conf_->max_retries()) + ->start_log_errors_count(connection_conf_->start_log_errors_count()) + ->Build(); + + return caller->Call().then([caller](auto r) { return r; }); +} +} // namespace hbase diff --git hbase-native-client/core/raw-async-table.h hbase-native-client/core/raw-async-table.h index 978a2b8..e26d46e 100644 --- hbase-native-client/core/raw-async-table.h +++ hbase-native-client/core/raw-async-table.h @@ -20,11 +20,11 @@ #include #include - #include #include #include - +#include +#include "core/async-batch-rpc-retrying-caller.h" #include "core/async-connection.h" #include "core/async-rpc-retrying-caller-factory.h" #include "core/async-rpc-retrying-caller.h" @@ -34,6 +34,7 @@ #include "core/result.h" using folly::Future; +using folly::Try; using folly::Unit; using hbase::pb::TableName; using std::chrono::nanoseconds; @@ -59,6 +60,9 @@ class RawAsyncTable { Future Put(const hbase::Put& put); void Close() {} + Future>>> Get(const std::vector& gets); + Future>>> Batch(const std::vector& gets); + 
private: /* Data */ std::shared_ptr connection_; @@ -78,5 +82,4 @@ class RawAsyncTable { std::shared_ptr> CreateCallerBuilder(std::string row, nanoseconds rpc_timeout); }; - } // namespace hbase diff --git hbase-native-client/core/request-converter.cc hbase-native-client/core/request-converter.cc index 4c12ee7..c90e1ab 100644 --- hbase-native-client/core/request-converter.cc +++ hbase-native-client/core/request-converter.cc @@ -50,7 +50,6 @@ std::unique_ptr RequestConverter::ToGetRequest(const Get &get, auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release()); - return pb_req; } @@ -114,12 +113,12 @@ std::unique_ptr RequestConverter::ToMultiRequest( int action_num = 0; for (const auto ®ion_action : action_by_region.second->actions()) { auto pb_action = pb_region_action->add_action(); - auto action = region_action->action(); - if (auto pget = std::dynamic_pointer_cast(action)) { - auto pb_get = RequestConverter::ToGet(*pget.get()); - pb_action->set_allocated_get(pb_get.release()); - pb_action->set_index(action_num); - } + auto pget = region_action->action(); + // We store only hbase::Get in hbase::Action as of now. It will be changed later on. + CHECK(pget) << "Unexpected. 
action can't be null"; + auto pb_get = RequestConverter::ToGet(*pget); + pb_action->set_allocated_get(pb_get.release()); + pb_action->set_index(action_num); action_num++; } } diff --git hbase-native-client/core/simple-client.cc hbase-native-client/core/simple-client.cc index b417353..3a7d62b 100644 --- hbase-native-client/core/simple-client.cc +++ hbase-native-client/core/simple-client.cc @@ -26,6 +26,7 @@ #include #include +#include "connection/rpc-client.h" #include "core/client.h" #include "core/get.h" #include "core/put.h" @@ -75,14 +76,16 @@ int main(int argc, char *argv[]) { conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads); auto row = FLAGS_row; + auto tn = std::make_shared(folly::to(FLAGS_table)); auto num_puts = FLAGS_num_rows; auto client = std::make_unique(*conf); auto table = client->Table(*tn); - // Do the Put requests auto start_ns = TimeUtil::GetNowNanos(); + + // Do the Put requests for (uint64_t i = 0; i < num_puts; i++) { table->Put(*MakePut(Row(FLAGS_row, i))); } @@ -102,6 +105,23 @@ int main(int argc, char *argv[]) { LOG(INFO) << "Successfully sent " << num_puts << " Get requests in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; + // Do the Multi-Gets + std::vector gets; + for (uint64_t i = 0; i < num_puts; ++i) { + hbase::Get get(Row(FLAGS_row, i)); + gets.push_back(get); + } + + start_ns = TimeUtil::GetNowNanos(); + auto results = table->Get(gets); + + if (FLAGS_display_results) { + for (const auto &result : results) LOG(INFO) << result->DebugString(); + } + + LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + table->Close(); client->Close(); diff --git hbase-native-client/core/table.cc hbase-native-client/core/table.cc index 8ace4af..a2f31d9 100644 --- hbase-native-client/core/table.cc +++ hbase-native-client/core/table.cc @@ -19,7 +19,6 @@ #include "core/table.h" -#include #include #include #include @@ -33,7 +32,6 @@ #include 
"serde/server-name.h" #include "utils/time-util.h" -using folly::Future; using hbase::pb::TableName; using hbase::security::User; using std::chrono::milliseconds; @@ -69,4 +67,23 @@ std::shared_ptr Table::GetRegionLocation(const std::string &row) return async_connection_->region_locator()->LocateRegion(*table_name_, row).get(); } +// Blocking multi-get: runs the batched async Get, waiting with operation_timeout(), and collects +// every per-Get value. The first entry holding an exception is logged with its row and thrown. +std::vector> Table::Get(const std::vector &gets) { + auto tresults = async_table_->Get(gets).get(operation_timeout()); + std::vector> results{}; + // Walk by index so num matches the position in gets (assumes request order - TODO confirm). + for (uint32_t num = 0; num < tresults.size(); ++num) { + auto tresult = tresults[num]; + if (tresult.hasValue()) { + results.push_back(tresult.value()); + } else if (tresult.hasException()) { + LOG(ERROR) << "Caught exception:- " << tresult.exception().getCopied()->what() << " for " + << gets[num].row(); + throw tresult.exception().getCopied(); + } + } + return results; +} + } /* namespace hbase */ diff --git hbase-native-client/core/table.h hbase-native-client/core/table.h index cbb95b7..142baae 100644 --- hbase-native-client/core/table.h +++ hbase-native-client/core/table.h @@ -54,8 +54,7 @@ class Table { */ std::shared_ptr Get(const hbase::Get &get); - // TODO: next jira - // std::vector> Get(const std::vector &gets); + std::vector> Get(const std::vector &gets); /** * @brief - Puts some data in the table.