From 588355684d45ed525df2db78e25a63d206d66fd1 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Thu, 23 Feb 2017 03:48:23 +1100 Subject: [PATCH] Sources required for Multi diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc index 189130e..80883cc 100644 --- a/hbase-native-client/connection/request.cc +++ b/hbase-native-client/connection/request.cc @@ -39,3 +39,7 @@ std::unique_ptr Request::scan() { return std::make_unique(std::make_shared(), std::make_shared(), "Scan"); } +std::unique_ptr Request::multi() { + return std::make_unique(std::make_shared(), + std::make_shared(), "Multi"); +} diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h index 91c684d..4f6f391 100644 --- a/hbase-native-client/connection/request.h +++ b/hbase-native-client/connection/request.h @@ -39,7 +39,8 @@ class Request { static std::unique_ptr mutate(); /** Create a request object for a scan */ static std::unique_ptr scan(); - + /** Create a multi request object */ + static std::unique_ptr multi(); /** * This should be private. Do not use this. * diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h new file mode 100644 index 0000000..0b1eaa8 --- /dev/null +++ b/hbase-native-client/core/action.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include "core/row.h" +#include +using hbase::Row; +namespace hbase { + +class Action { + public: + Action(std::shared_ptr action, int original_index) + : action_(action), original_index_(original_index) {} + ~Action() {} + + int32_t OriginalIndex() { return original_index_; } + + const std::shared_ptr &GetAction() { return action_; } + + private: + std::shared_ptr action_; + int32_t original_index_; + int64_t nonce_ = -1; + int32_t replica_id_ = -1; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller-factory.cc new file mode 100644 index 0000000..23bc8f1 --- /dev/null +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller-factory.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +namespace hbase {} /* namespace hbase */ diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-batch-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..6dd8e0a --- /dev/null +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller-factory.h @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "core/row.h" +#include "core/async-connection.h" +#include "core/get.h" +#include "core/location-cache.h" +#include "if/HBase.pb.h" + +using hbase::Get; +using hbase::pb::TableName; + +namespace hbase { + +// class BatchCallerBuilder; + +template +class AsyncRpcRetryingBatchCallerFactory { + // BatchCallerBuilder + public: + class BatchCallerBuilder : public std::enable_shared_from_this { + public: + BatchCallerBuilder(std::shared_ptr conn, std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) { + } + + virtual ~BatchCallerBuilder() = default; + + typedef BatchCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + GenenericThisType& SetTable(std::shared_ptr table_name) { + table_name_ = table_name; + return *this; + } + + GenenericThisType& SetActions(std::shared_ptr>> actions) { + actions_ = actions; + return *this; + } + + GenenericThisType& SetOperationTimeout(long operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return *this; + } + + GenenericThisType& SetRpcTimeout(long rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return *this; + } + + GenenericThisType& SetPause(int64_t pause_ns) { + pause_ns_ = pause_ns; + return *this; + } + + GenenericThisType& SetMaxAttempts(int32_t max_attempts) { + max_attempts_ = max_attempts; + return *this; + } + + GenenericThisType& StartLogErrorsCnt(int32_t start_log_errors_cnt) { + start_log_errors_cnt_ = start_log_errors_cnt; + return *this; + } + + GenenericThisType& SetLocationCache(std::shared_ptr location_cache) { + location_cache_ = location_cache; + return *this; + } + + std::shared_ptr> Build() { + return std::make_shared>( + retry_timer_, conn_, table_name_, actions_, 1000, 1, operation_timeout_nanos_, + rpc_timeout_nanos_, 1, location_cache_); + } + + std::vector>> Call() { return Build()->Call(); } + + public: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + // std::shared_ptr> async_rpc_batch_; + std::shared_ptr table_name_ = nullptr; + std::shared_ptr location_cache_ = nullptr; + std::shared_ptr>> actions_ = nullptr; + int64_t operation_timeout_nanos_ = 0; + int64_t rpc_timeout_nanos_ = 0; + int64_t pause_ns_ = 0; + int32_t max_attempts_ = 0; + int32_t start_log_errors_cnt_ = 0; + }; + + public: + // friend class BatchCallerBuilder; + AsyncRpcRetryingBatchCallerFactory(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} + + virtual ~AsyncRpcRetryingBatchCallerFactory() = default; + + std::shared_ptr::BatchCallerBuilder> Batch() { + return std::make_shared::BatchCallerBuilder>( + conn_, retry_timer_); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc new file mode 100644 index 0000000..e550fbc --- /dev/null +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +namespace hbase {} +/* namespace hbase */ diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h new file mode 100644 index 0000000..886626c --- /dev/null +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "core/action.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/get.h" +#include "core/hbase-rpc-controller.h" +#include "core/location-cache.h" +#include "core/multi_response.h" +#include "core/region_request.h" +#include "core/region_result.h" +#include "core/region-location.h" +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "core/result.h" +#include "core/row.h" +#include "core/server_request.h" +#include "connection/rpc-client.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +#include + +using folly::Promise; +using folly::Future; +using hbase::Action; +using hbase::Get; +using hbase::LocationCache; +using hbase::MultiAction; +using hbase::RequestConverter; +using hbase::Result; +using hbase::RpcClient; +using hbase::Get; +using hbase::ServerRequest; +using hbase::pb::ServerName; +using hbase::pb::TableName; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +using ActionsByServer = std::map, std::shared_ptr>; +using ActionsByRegion = ServerRequest::ActionsByRegion; + +template +class AsyncRpcRetryingBatchCaller { + public: + AsyncRpcRetryingBatchCaller(std::shared_ptr retry_timer, + std::shared_ptr conn, std::shared_ptr table_name, + std::shared_ptr>> actions, + int64_t pause_ns, int32_t max_attempts, + int64_t operation_timeout_nanos, int64_t rpc_timeout_nanos, + int32_t start_log_errors_count, + std::shared_ptr location_cache) + : retry_timer_(retry_timer), + conn_(conn), + table_name_(table_name), + pause_ns_(pause_ns), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + location_cache_(location_cache) { + max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts); + futures_.reserve(actions_->size()); + uint32_t index = 0; + for (auto row : *actions) { + std::shared_ptr action = std::make_shared(row, index); + actions_->push_back(action); + index += 1; + auto promise = std::make_shared>(); + // We store promises by indices so that when getting futures order won't change. + action2promises_[index] = promise; + } + } + + virtual ~AsyncRpcRetryingBatchCaller() = default; + + std::vector>> Call() { + GroupAndSend(*actions_.get(), 1); + for (const auto &promise : action2promises_) { + // TODO + // futures_[promise.first] = std::make_shared>(promise.second->getFuture()); + } + return futures_; + } + + private: + long ElapsedMs() { + std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns_)).count(); + } + + void CompleteExceptionally() { + // TODO Commented for now + // this->promise_->setException(RetriesExhaustedException(max_attempts_ - 1, exceptions_)); + } + + int64_t GetNowNanos() { + auto duration = std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + int64_t RemainingTimeNs() { return operation_timeout_nanos_ - (GetNowNanos() - start_ns_); } + + std::shared_ptr> RemoveErrors(Action &action) { + // TODO Remove the action + return nullptr; + } + + // TODO Java has Supplier for RegionRequest + void LogException(int32_t tries, std::vector> ®ion_requests, + std::shared_ptr &error, + std::shared_ptr server_name) { + if (tries > start_log_errors_count_) { + std::string regions; + std::for_each( + std::begin(region_requests), std::end(region_requests), + [®ions]( + const std::vector>::value_type ®ion_request) { + regions += region_request->RegionLocation()->region_name() + ", "; + }); + LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" + << table_name_->qualifier() << " from " << server_name->host_name() + << " failed, tries=" << tries << error->what(); + } + } + + std::string GetExtraContextForError(std::shared_ptr server_name) { + return server_name->host_name(); + } + + void AddError(const std::shared_ptr &action, std::shared_ptr error, + std::shared_ptr server_name) { + std::vector action2errors; + ThrowableWithExtraContext extra_error(error, GetNowNanos(), ""); + action2errors.push_back(extra_error); + } + + void AddError(const std::vector> &actions, + std::shared_ptr error, std::shared_ptr server_name) { + for (const auto action : actions) { + AddError(action, error, server_name); + } + } + + void FailOne(const std::shared_ptr &action, int32_t tries, + std::shared_ptr error, int64_t current_time, std::string &extras) { + // TODO auto itr = action2errors.find(123); action2errors.erase(itr); + std::vector> errors; + errors.push_back(std::make_shared(error, current_time, extras)); + // TODO + // Call CompleteExceptionally + } + + void FailAll(const std::vector> &actions, int32_t tries, + std::shared_ptr error, std::shared_ptr server_name) { + int64_t current_time = 0L; + std::string extras = GetExtraContextForError(server_name); + for (const auto action : actions) { + FailOne(action, tries, error, time(0), extras); + } + } + + void FailAll(const std::vector> &actions, int32_t tries) { + for (const auto action : actions) { + // TODO + } + } + + void OnError(const ActionsByRegion &actions_by_region, int32_t tries, + std::shared_ptr exc, std::shared_ptr server_name) { + std::vector> copied_actions; + std::vector> region_requests; + region_requests.reserve(actions_by_region.size()); + std::for_each( + std::begin(actions_by_region), std::end(actions_by_region), + [®ion_requests, &copied_actions](const ActionsByRegion::value_type &action_by_region) { + region_requests.push_back(action_by_region.second); + for (const auto &action : *action_by_region.second->Actions().get()) { + copied_actions.push_back(action); + } + }); + // TODO Java API passes Throwable error to logException + // Throwable error = translateException(t); + LogException(tries, region_requests, exc, server_name); + if (auto error = + std::dynamic_pointer_cast(exc) || tries >= max_attempts_) { + FailAll(copied_actions, tries, exc, server_name); + } + TryResubmit(copied_actions, tries); + } + + void TryResubmit(std::vector> actions, int32_t tries) { + long delay_nanos; + if (operation_timeout_nanos_ > 0) { + long max_delay_nanos = RemainingTimeNs() - ConnectionUtils::SLEEP_DELTA_NS; + if (max_delay_nanos <= 0) { + FailAll(actions, tries); + return; + } + delay_nanos = std::min(max_delay_nanos, ConnectionUtils::GetPauseTime(pause_ns_, tries - 1)); + } else { + delay_nanos = ConnectionUtils::GetPauseTime(pause_ns_, tries - 1); + } + // TODO retry_timer_ reset + // retry_timer_->scheduleTimeoutFn(GroupAndSend(actions, tries + 1), + // std::chrono::milliseconds(delay_nanos)); + } + + void GroupAndSend(std::vector> &actions, int32_t tries) { + int64_t locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + FailAll(*actions_.get(), tries); + return; + } + } else { + locate_timeout_ns = -1L; + } + + ActionsByServer actions_by_server; + std::vector> locate_failed; + + // We iterate through each of the actions and try to locate a region. If its successful or an + // exception, we remove it from the vector and continue to check upon remaining actions. + // This way we call Send()/TryResubmit() only after all regions are looked up + int32_t index = 0; + int32_t counter = 0; + // If region is located or to be retried, we store its index to be skipped. + std::vector skip; + bool actions_completed = false; + int64_t millisecs = 1000; + while (skip.size() != actions.size()) { + // Increase get time as we re-loop + millisecs *= (counter += 1); + for (index = 0; index < actions.size(); ++index) { + if (std::find(std::begin(skip), std::end(skip), index) != std::end(skip)) { + continue; + } + auto action = actions[index]; + auto row = action->GetAction()->row(); + try { + // TODO Replace with loc = conn_->get_locator()....; + auto region_loc = + location_cache_->LocateRegion(*table_name_, row).get(milliseconds(millisecs)); + // Add it to actions_by_server; + bool found = false; + for (auto itr = actions_by_server.begin(); itr != actions_by_server.end(); ++itr) { + if (region_loc->server_name().host_name() == itr->first->host_name()) { + found = true; + itr->second->AddAction(region_loc, action); + break; + } + } + if (!found) { + // Create new key + auto server_request = std::make_shared(region_loc); + server_request->AddAction(region_loc, action); + auto server_name = std::make_shared(region_loc->server_name()); + actions_by_server[server_name] = server_request; + } + skip.push_back(index); + VLOG(3) << "row [" << row << "] in table[" << table_name_->namespace_() << ":" + << table_name_->qualifier() << " found in region [" << region_loc->region_name() + << "]"; + } + catch (const std::runtime_error &rexc) { + // Table does not exist + LOG(ERROR) << "Caught exception:- [" << rexc.what() << "] while fetching region for row [" + << row << " in table[" << table_name_->namespace_() << ":" + << table_name_->qualifier(); + // Remove the action from actions as we got error from LocationCache + skip.push_back(index); + } + catch (const folly::TimedOut &timed_out_exc) { + // continue to the next action; this action will be retried in the next iteration + continue; + } + catch (const std::exception &exc) { + auto pexc = std::make_shared(exc); + if (auto error = std::dynamic_pointer_cast(pexc)) { + int64_t current_time = 0; + std::string extra = ""; + FailOne(action, tries, nullptr, time(0), extra); + return; + } + AddError(action, pexc, nullptr); + locate_failed.push_back(action); + skip.push_back(index); + } + } + } + + // When all regions in actions have are either located or failed we'll Send and TryResubmit + if (!actions_by_server.empty()) { + Send(actions_by_server, tries); + } + if (!locate_failed.empty()) { + TryResubmit(locate_failed, tries); + } + return; + } + + void Send(const ActionsByServer &actions_by_server, int32_t tries) { + int64_t remaining_ns; + if (operation_timeout_nanos_ > 0) { + remaining_ns = RemainingTimeNs(); + if (remaining_ns <= 0) { + // TODO + std::vector> failed_actions; + std::for_each(std::begin(actions_by_server), std::end(actions_by_server), + [&failed_actions](const ActionsByServer::value_type &action_by_server) { + for (const auto &value : action_by_server.second->Actions()) { + for (const auto &failed_action : *value.second->Actions().get()) { + failed_actions.push_back(failed_action); + } + } + }); + FailAll(failed_actions, tries); + return; + } + } else { + remaining_ns = std::numeric_limits::max(); + } + + for (auto action : actions_by_server) { + for (auto itr = action.second->Actions().begin(); itr != action.second->Actions().end(); + ++itr) { + auto loc = itr->second->RegionLocation(); + auto region_name = itr->first; + auto actions_by_region = itr->second->Actions(); + + // TODO + auto cs_stub = nullptr; // std::make_shared(); + // TODO Setting null as of now + std::shared_ptr controller = + nullptr; // std::make_shared(); + if (controller) { + ConnectionUtils::ResetController(controller, std::min(rpc_timeout_nanos_, remaining_ns)); + } + auto multi_req = std::shared_ptr( + RequestConverter::ToMultiRequest(region_name, *actions_by_region.get()).release()); + auto multi_resp = std::make_shared(); + // TODO + /* + cs_stub->Multi( + (std::static_pointer_cast(controller)).get(), + (std::dynamic_pointer_cast(multi_req->req_msg())).get(), + (std::dynamic_pointer_cast(multi_resp->resp_msg())).get(), + nullptr); + */ + if (controller) { + if (controller->Failed()) { + // TODO At present throwing std::runtime_error. Make compatible with Java API + OnError(action.second->Actions(), tries, + std::make_shared(std::runtime_error("Controller Failed")), + action.first); + } else { + try { + auto multi_results = ResponseConverter::GetResults(multi_req, multi_resp); + OnComplete(action.second->Actions(), tries, action.first, multi_results); + } + catch (const std::exception &exc) { + OnError(action.second->Actions(), tries, std::make_shared(exc), + action.first); + return; + } + } + } + } + } + return; + } + + void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries, + const std::shared_ptr server_name, + const std::shared_ptr &multi_results) { + std::vector> failed_actions; + for (const auto &action_by_region : actions_by_region) { + auto region_name = action_by_region.first; + auto region_result_itr = multi_results->Results().find(region_name); + // If not found create + if (region_result_itr != multi_results->Results().end()) { + for (const auto &action : *action_by_region.second->Actions().get()) { + OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second, + failed_actions); + } + } else { + // get region exception + std::shared_ptr region_exc = multi_results->Exception(region_name); + std::shared_ptr exc; + if (region_exc == nullptr) { + LOG(ERROR) << "Server sent us neither results nor exceptions for " << region_name; + exc = std::make_shared(std::runtime_error("Invalid response")); + } else { + // TODO // Translate exception + // exc = TranslateException(region_exc); + + // TODO, take a look at second parameter again + // LogException(tries, action_by_region.second->Actions(), exc, server_name); + // TODO Update cached locations + // conn.getLocator().updateCachedLocation + for (const auto &action : *action_by_region.second->Actions().get()) { + failed_actions.push_back(action); + } + if (auto error = + std::dynamic_pointer_cast(exc) || tries >= max_attempts_) { + FailAll(failed_actions, tries, exc, server_name); + return; + } + } + AddError(*action_by_region.second->Actions().get(), exc, server_name); + } + } + if (!failed_actions.empty()) { + TryResubmit(failed_actions, tries); + } + } + + void OnComplete(const std::shared_ptr &action, + const std::shared_ptr ®ion_request, int32_t tries, + const std::shared_ptr &server_name, + const std::shared_ptr ®ion_result, + std::vector> &failed_actions) { + + auto result_or_exc = region_result->Result(action->OriginalIndex()); + bool no_result_or_exc_from_server = false; + std::string err_msg; + if (result_or_exc == nullptr) { + LOG(ERROR) << "Server " << server_name->ShortDebugString() + << " sent us neither results nor exceptions for row " << action->GetAction()->row() + << " of " << region_request->RegionLocation()->region_name(); + no_result_or_exc_from_server = true; + err_msg = "Invalid response"; + } else { + auto result = std::get<0>(*result_or_exc.get()); + auto exc = std::get<1>(*result_or_exc.get()); + if (exc != nullptr) { + // TODO translateException + // Need some work on exceptions/exception.h/.cc + // Throwable error = translateException((Throwable) result); + // LogException(tries, region_request, error, server_name); + if (auto error = + std::dynamic_pointer_cast(exc) || tries >= max_attempts_) { + auto extra_error = GetExtraContextForError(server_name); + FailOne(action, tries, exc, time(0), extra_error); + } else { + failed_actions.push_back(action); + } + } else if (result != nullptr) { + // TODO + auto promise = std::make_shared>(); + // TODO Getting compilation error + // promise->setValue(result); + action2promises_[action->OriginalIndex()] = promise; + } else { + // Ideally, this block wont be reached, but if we still come here it means we are unable to + // understand response or exception from the server. Let's log an error and raise an + // exception + LOG(ERROR) << "Unable to set either of result or exception received from Server " + << server_name->ShortDebugString() << " for row " << action->GetAction()->row() + << " of " << region_request->RegionLocation()->region_name(); + no_result_or_exc_from_server = true; + err_msg = "Unable to set either of result or exception for response received from server."; + } + } + + if (no_result_or_exc_from_server) { + AddError(action, std::make_shared(err_msg), server_name); + failed_actions.push_back(action); + } + + return; + } + + private: + std::shared_ptr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::shared_ptr>> actions_ = + std::make_shared>>(); + int64_t pause_ns_ = 0; + int32_t max_attempts_ = 0; + int64_t operation_timeout_nanos_ = 0; + int64_t rpc_timeout_nanos_ = 0; + int32_t start_log_errors_count_ = 0; + + int64_t start_ns_ = TimeUtil::GetNowNanos(); + int32_t tries_ = 1; + std::shared_ptr> exceptions_ = + std::make_shared>(); + std::map> action2errors_; + std::vector>> futures_; + std::map>> action2promises_; + std::shared_ptr location_cache_; +}; +} +/* namespace hbase */ diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc new file mode 100644 index 0000000..b39f289 --- /dev/null +++ b/hbase-native-client/core/async-connection.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-connection.h" + +namespace hbase {} // namespace hbase diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h new file mode 100644 index 0000000..757354f --- /dev/null +++ b/hbase-native-client/core/async-connection.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +namespace hbase { + +class AsyncConnection { + public: + AsyncConnection() {} + virtual ~AsyncConnection() = default; +}; + +class AsyncConnectionImpl { + public: + AsyncConnectionImpl() {} + virtual ~AsyncConnectionImpl() = default; +}; +} // namespace hbase diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc index 5c5f446..95713b4 100644 --- a/hbase-native-client/core/get.cc +++ b/hbase-native-client/core/get.cc @@ -26,7 +26,7 @@ namespace hbase { Get::~Get() {} -Get::Get(const std::string &row) : row_(row) { CheckRow(row_); } +Get::Get(const std::string &row) : hbase::Row(row) {} Get::Get(const Get &get) { row_ = get.row_; @@ -78,7 +78,7 @@ Get &Get::AddColumn(const std::string &family, const std::string &qualifier) { return *this; } -const std::string &Get::Row() const { return row_; } +// const std::string &Get::Row() const { return row_; } hbase::pb::Consistency Get::Consistency() const { return consistency_; } diff --git a/hbase-native-client/core/get.h b/hbase-native-client/core/get.h index f79c633..f13ef69 100644 --- a/hbase-native-client/core/get.h +++ b/hbase-native-client/core/get.h @@ -24,6 +24,7 @@ #include #include #include +#include "core/row.h" #include "core/time_range.h" #include "if/Client.pb.h" @@ -35,7 +36,7 @@ namespace hbase { */ using FamilyMap = std::map>; -class Get { +class Get : public hbase::Row { public: /** * Constructors @@ -111,7 +112,7 @@ class Get { /** * @brief Returns the row for this Get operation */ - const std::string& Row() const; + // const std::string& Row() const; /** * @brief Returns true if family map (FamilyMap) is non empty false otherwise @@ -130,7 +131,7 @@ class Get { Get& SetConsistency(hbase::pb::Consistency consistency); private: - std::string row_ = ""; + // std::string row_ = ""; int32_t max_versions_ = 1; bool cache_blocks_ = true; bool check_existence_only_ = false; diff --git a/hbase-native-client/core/multi_action.cc b/hbase-native-client/core/multi_action.cc new file mode 100644 index 0000000..ace3e10 --- /dev/null +++ b/hbase-native-client/core/multi_action.cc @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +namespace hbase { + +MultiAction::MultiAction() { + // TODO Auto-generated constructor stub +} + +MultiAction::~MultiAction() { + // TODO Auto-generated destructor stub +} + +void MultiAction::Add(const std::string& region_name, std::shared_ptr get) { + actions_[region_name].push_back(get); +} + +void MultiAction::Add(const std::string& region_name, + const std::vector >& gets) { + actions_[region_name] = gets; +} + +} /* namespace hbase */ + +int hbase::MultiAction::Size() const { + + int size = 0; + for (const auto& action : actions_) { + size += action.second.size(); + } + return size; +} + +std::vector hbase::MultiAction::Regions() const { + std::vector regions; + /* + std::transform(std::begin(gets_), std::end(gets_), std::back_inserter(regions), + [](auto &get_pair) { + return get_pair.first; + }); + */ + return regions; +} diff --git a/hbase-native-client/core/multi_action.h b/hbase-native-client/core/multi_action.h new file mode 100644 index 0000000..2d211f0 --- /dev/null +++ b/hbase-native-client/core/multi_action.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include "core/get.h" + +using hbase::Get; + +namespace hbase { +using MultiActions = std::map>>; +class MultiAction { + private: + MultiActions actions_; + + public: + MultiAction(); + ~MultiAction(); + + void Add(const std::string ®ion_name, std::shared_ptr get); + void Add(const std::string ®ion_name, const std::vector> &gets); + + bool IsEmpty() const { return actions_.empty(); } + int Size() const; + std::vector Regions() const; + const MultiActions &Actions() const { return actions_; } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi_response.cc b/hbase-native-client/core/multi_response.cc new file mode 100644 index 0000000..2fc0735 --- /dev/null +++ b/hbase-native-client/core/multi_response.cc @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/multi_response.h" + +using hbase::RegionResult; +using hbase::Result; +using hbase::pb::RegionLoadStats; +namespace hbase { + +MultiResponse::MultiResponse() {} + +int MultiResponse::Size() const { + int size = 0; + for (const auto& result : results_) { + size += result.second->Size(); + } + return size; +} + +void MultiResponse::Add(const std::string& region_name, int32_t original_index, + std::shared_ptr result, + std::shared_ptr exc) { + Result(region_name)->AddResult(original_index, result, exc); +} + +void MultiResponse::AddException(const std::string& region_name, + std::shared_ptr exception) { + exceptions_[region_name] = exception; +} + +const std::shared_ptr& MultiResponse::Exception(const std::string& region_name) + const { + try { + auto find = exceptions_.at(region_name); + return find; + } + catch (const std::out_of_range& oor) { + return nullptr; + } +} + +const std::map >& MultiResponse::Exceptions() const { + return exceptions_; +} + +void MultiResponse::AddStatistic(const std::string& region_name, + const std::shared_ptr& stat) { + Result(region_name)->SetStat(stat); +} + +const std::map >& MultiResponse::Results() const { + return results_; +} + +std::shared_ptr& MultiResponse::Result(const std::string& region_name) { + auto found = results_.find(region_name); + // If not found create + if (found != results_.end()) { + results_[region_name] = std::make_shared(); + } + return results_[region_name]; +} + +MultiResponse::ResponseType MultiResponse::Type() const { return ResponseType::MULTI; } + +MultiResponse::~MultiResponse() { + // TODO Auto-generated destructor stub +} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi_response.h b/hbase-native-client/core/multi_response.h new file mode 100644 index 0000000..6c51a97 --- /dev/null +++ b/hbase-native-client/core/multi_response.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include "core/region_result.h" +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::RegionResult; +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class MultiResponse { + public: + enum class ResponseType { + SINGLE = 0, + MULTI = 1, + }; + MultiResponse(); + + /** + * @brief Returns Number of pairs in this container + */ + int Size() const; + + /** + * Add the pair to the container, grouped by the regionName + * + * @param regionName + * @param originalIndex the original index of the Action (request). + * @param resOrEx the result or error; will be empty for successful Put and Delete actions. + */ + void Add(const std::string& region_name, int32_t original_index, + std::shared_ptr result, std::shared_ptr exc); + + void AddException(const std::string& region_name, std::shared_ptr exception); + + /** + * @return the exception for the region, if any. Null otherwise. + */ + const std::shared_ptr& Exception(const std::string& region_name) const; + + const std::map>& Exceptions() const; + + void AddStatistic(const std::string& region_name, const std::shared_ptr& stat); + + const std::map>& Results() const; + + MultiResponse::ResponseType Type() const; + + virtual ~MultiResponse(); + + private: + std::shared_ptr& Result(const std::string& region_name); + + // map of regionName to map of Results by the original index for that Result + std::map> results_; + /** + * The server can send us a failure for the region itself, instead of individual failure. + * It's a part of the protobuf definition. + */ + std::map> exceptions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region_request.h b/hbase-native-client/core/region_request.h new file mode 100644 index 0000000..98d5b66 --- /dev/null +++ b/hbase-native-client/core/region_request.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include +#include +#include +#include "core/action.h" +#include "core/region-location.h" + +using hbase::Action; +namespace hbase { +// TODO Do we require RegionLocation. Recheck async-batch-rpc-retrying-caller.h Send() method later +class RegionRequest { + public: + using ActionList = std::shared_ptr>>; + RegionRequest(const std::shared_ptr ®ion_loc) + : region_loc_(region_loc) {} + ~RegionRequest() {} + void AddAction(std::shared_ptr action) { actions_->push_back(action); } + std::shared_ptr RegionLocation() { return region_loc_; } + const ActionList &Actions() { return actions_; } + + private: + std::shared_ptr region_loc_; + ActionList actions_ = std::make_shared>>(); +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region_result.h b/hbase-native-client/core/region_result.h new file mode 100644 index 0000000..cf18fd0 --- /dev/null +++ b/hbase-native-client/core/region_result.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class RegionResult { + using ResultOrException = + std::tuple, std::shared_ptr>; + + public: + RegionResult() {} + void AddResult(int32_t index, std::shared_ptr result, + std::shared_ptr exc) { + auto res_or_ex = std::make_tuple(result, exc); + auto ptr = std::make_shared(res_or_ex); + result_[index] = ptr; + } + + void SetStat(std::shared_ptr stat) { stat_ = stat; } + + int Size() { return result_.size(); } + + const std::shared_ptr Result(int32_t index) const { + auto found = result_.find(index); + // If not found return nullptr + if (found != result_.end()) { + return found->second; + } + return nullptr; + } + + const std::shared_ptr& Stat() { return stat_; } + + ~RegionResult() {} + + private: + std::map> result_; + std::shared_ptr stat_; +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/request_converter.cc b/hbase-native-client/core/request_converter.cc index eba07df..642b916 100644 --- a/hbase-native-client/core/request_converter.cc +++ b/hbase-native-client/core/request_converter.cc @@ -23,6 +23,7 @@ using hbase::Request; using hbase::pb::GetRequest; +using hbase::pb::MultiRequest; using hbase::pb::RegionSpecifier; using hbase::pb::RegionSpecifier_RegionSpecifierType; using hbase::pb::ScanRequest; @@ -57,7 +58,7 @@ std::unique_ptr RequestConverter::ToGetRequest(const Get &get, pb_time_range->set_from(get.Timerange().MinTimeStamp()); pb_time_range->set_to(get.Timerange().MaxTimeStamp()); } - pb_get->set_row(get.Row()); + pb_get->set_row(get.row()); if (get.HasFamilies()) { for (const auto &family : get.Family()) { auto column = pb_get->add_column(); @@ -115,4 +116,51 @@ std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, return pb_req; } + +std::unique_ptr RequestConverter::ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions) { + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + auto region_specifier = new hbase::pb::RegionSpecifier(); + RequestConverter::SetRegion(region_name, region_specifier); + auto pb_region_action = pb_msg->add_regionaction(); + pb_region_action->set_allocated_region(region_specifier); + int action_num = 0; + for (const auto ®ion_action : region_actions) { + auto pb_action = pb_region_action->add_action(); + auto action = region_action->GetAction(); + if (auto pget = std::dynamic_pointer_cast(action)) { + auto pb_get = RequestConverter::ToGet(*pget); + pb_action->set_allocated_get(pb_get.release()); + pb_action->set_index(action_num); + } + action_num += 1; + } + return pb_req; +} + +std::unique_ptr RequestConverter::ToGet(const Get &get) { + auto pb_get = std::make_unique(); + pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (!get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.row()); + if (get.HasFamilies()) { + for (const auto &family : get.Family()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + return pb_get; +} + } /* namespace hbase */ diff --git a/hbase-native-client/core/request_converter.h b/hbase-native-client/core/request_converter.h index 57f08cc..7e0e9dc 100644 --- a/hbase-native-client/core/request_converter.h +++ b/hbase-native-client/core/request_converter.h @@ -23,10 +23,18 @@ #include #include "connection/request.h" #include "core/get.h" +#include "core/multi_action.h" +#include "core/region_request.h" #include "core/scan.h" #include "if/HBase.pb.h" +using hbase::Get; +using hbase::MultiAction; +using hbase::RegionRequest; +using hbase::Request; +using hbase::Scan; using hbase::pb::RegionSpecifier; +using hbase::pb::ServerName; namespace hbase { /** @@ -53,6 +61,9 @@ class RequestConverter { */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name); + static std::unique_ptr ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions); + private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); @@ -64,6 +75,8 @@ class RequestConverter { * Request. */ static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); + + static std::unique_ptr ToGet(const Get &get); }; } /* namespace hbase */ diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc index 19a3554..552cd68 100644 --- a/hbase-native-client/core/response_converter.cc +++ b/hbase-native-client/core/response_converter.cc @@ -22,8 +22,17 @@ #include #include "core/cell.h" +#include "if/Client.pb.h" + +#include using hbase::pb::GetResponse; +using hbase::pb::MultiRegionLoadStats; +using hbase::pb::MultiRequest; +using hbase::pb::RegionAction; +using hbase::pb::RegionActionResult; +using hbase::pb::RegionSpecifier; +using hbase::pb::ResultOrException; using hbase::pb::ScanResponse; namespace hbase { @@ -93,4 +102,103 @@ std::vector> ResponseConverter::FromScanResponse(const R return results; } + +std::shared_ptr ResponseConverter::GetResults( + const std::shared_ptr& req, const std::shared_ptr& resp) { + + auto multi_req = std::static_pointer_cast(req->req_msg()); + auto multi_resp = std::static_pointer_cast(resp->resp_msg()); + + int req_region_action_count = multi_req->regionaction_size(); + int res_region_action_count = multi_resp->regionactionresult_size(); + if (req_region_action_count != res_region_action_count) { + throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + + " does not match response mutation result count=" + + std::to_string(res_region_action_count)); + } + + auto multi_results = std::make_shared(); + for (int32_t num = 0; num < res_region_action_count; num++) { + RegionAction actions = multi_req->regionaction(num); + RegionActionResult action_result = multi_resp->regionactionresult(num); + RegionSpecifier rs = actions.region(); + if (rs.has_type() && rs.type() != RegionSpecifier::REGION_NAME) { + throw std::runtime_error("We support only encoded types for protobuf multi response."); + } + auto region_name = rs.value(); + + if (action_result.has_exception()) { + std::shared_ptr exc; + std::shared_ptr rexc; + ([action_result, exc, rexc]() { + hbase::pb::NameBytesPair nb_pair = action_result.exception(); + if (nb_pair.has_value()) { + // rexc = std::make_shared(nb_pair.name() + " " + nb_pair.value()); + // exc = std::move(rexc); + } + }); + multi_results->AddException(region_name, exc); + continue; + } + + if (actions.action_size() != action_result.resultorexception_size()) { + throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + + ", action_result.resultorexception_size=" + + std::to_string(action_result.resultorexception_size()) + + " for region " + actions.region().value()); + } + + for (ResultOrException roe : action_result.resultorexception()) { + std::shared_ptr result; + std::shared_ptr exc; + if (roe.has_exception()) { + std::shared_ptr rexc; + ([action_result, exc, rexc]() { + hbase::pb::NameBytesPair nb_pair = action_result.exception(); + if (nb_pair.has_value()) { + // rexc = std::make_shared(nb_pair.name() + " " + nb_pair.value()); + // exc = std::move(rexc); + } + }); + } else if (roe.has_result()) { + ([roe, &result]() { + bool exists = false; + bool stale = false; + bool partial = false; + // Parse Results + std::vector> vcells; + for (auto cell : roe.result().cell()) { + std::shared_ptr pcell = std::make_shared( + cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), cell.value(), + static_cast(cell.cell_type())); + vcells.push_back(pcell); + } + exists = roe.result().exists(); + stale = roe.result().stale(); + partial = roe.result().partial(); + result = std::make_shared(vcells, exists, stale, partial); + }); + } else if (roe.has_service_result()) { + // TODO Not processing Coprocessor Service Result; + } else { + // Sometimes, the response is just "it was processed". Generally, this occurs for things + // like mutateRows where either we get back 'processed' (or not) and optionally some + // statistics about the regions we touched. + std::vector> empty_cells; + result = std::make_shared(empty_cells, multi_resp->processed() ? true : false, + false, false); + } + multi_results->Add(region_name, roe.index(), result, exc); + } + } + + if (multi_resp->has_regionstatistics()) { + MultiRegionLoadStats stats = multi_resp->regionstatistics(); + for (int i = 0; i < stats.region_size(); i++) { + multi_results->AddStatistic(stats.region(i).value(), + std::make_shared(stats.stat(i))); + } + } + return multi_results; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h index 859644b..74b84e6 100644 --- a/hbase-native-client/core/response_converter.h +++ b/hbase-native-client/core/response_converter.h @@ -20,11 +20,16 @@ #pragma once #include +#include "connection/request.h" #include "connection/response.h" +#include "core/multi_response.h" #include "core/result.h" #include "if/Client.pb.h" #include "serde/cell-scanner.h" +using hbase::Request; +using hbase::Response; + namespace hbase { /** @@ -46,6 +51,9 @@ class ResponseConverter { static std::vector> FromScanResponse(const Response& resp); + static std::shared_ptr GetResults(const std::shared_ptr& req, + const std::shared_ptr& resp); + private: // Constructor not required. We have all static methods to extract response from PB messages. ResponseConverter(); diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h new file mode 100644 index 0000000..eb5b189 --- /dev/null +++ b/hbase-native-client/core/row.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#pragma once + +namespace hbase { + +class Row { + public: + Row() {} + Row(const std::string &row) : row_(row) { CheckRow(row_); } + + /** + * @brief Returns the row for the Row interface. + */ + const std::string &row() const { return row_; } + virtual ~Row() {} + + /** + * @brief Checks if the row for this Get operation is proper or not + * @param row Row to check + * @throws std::runtime_error if row is empty or greater than + * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) + */ + void CheckRow(const std::string &row) { + const int16_t kMaxRowLength = std::numeric_limits::max(); + int16_t row_length = row.size(); + if (0 == row_length) { + throw std::runtime_error("Row length can't be 0"); + } + if (row_length > kMaxRowLength) { + throw std::runtime_error("Length of " + row + " is greater than max row size: " + + std::to_string(kMaxRowLength)); + } + } + + protected: + std::string row_ = ""; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/server_request.h b/hbase-native-client/core/server_request.h new file mode 100644 index 0000000..9d5345d --- /dev/null +++ b/hbase-native-client/core/server_request.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include "core/action.h" +#include "core/region-location.h" +#include "core/region_request.h" + +using hbase::Action; +using hbase::RegionRequest; + +namespace hbase { + +class ServerRequest { + public: + using ActionsByRegion = std::map>; + + ServerRequest(std::shared_ptr region_location) { + auto region_name = region_location->region_name(); + auto region_request = std::make_shared(region_location); + actions_by_region_[region_name] = region_request; + } + ~ServerRequest() {} + + void AddAction(std::shared_ptr region_location, std::shared_ptr action) { + auto region_name = region_location->region_name(); + auto itr = actions_by_region_.at(region_name); + itr->AddAction(action); + } + + const ActionsByRegion &Actions() { return actions_by_region_; } + + private: + ActionsByRegion actions_by_region_; +}; +} /* namespace hbase */ -- 1.8.3.1