From 437d4fb5dfd418c90c126edc82fd294e0d653204 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Thu, 16 Mar 2017 08:54:08 +1100 Subject: [PATCH] Helper classes required for AsyncBatchRpcRetryingCaller class diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc index 189130e..80883cc 100644 --- a/hbase-native-client/connection/request.cc +++ b/hbase-native-client/connection/request.cc @@ -39,3 +39,7 @@ std::unique_ptr Request::scan() { return std::make_unique(std::make_shared(), std::make_shared(), "Scan"); } +std::unique_ptr Request::multi() { + return std::make_unique(std::make_shared(), + std::make_shared(), "Multi"); +} diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h index 91c684d..520b380 100644 --- a/hbase-native-client/connection/request.h +++ b/hbase-native-client/connection/request.h @@ -39,6 +39,8 @@ class Request { static std::unique_ptr mutate(); /** Create a request object for a scan */ static std::unique_ptr scan(); + /** Create a request object for a multi */ + static std::unique_ptr multi(); /** * This should be private. Do not use this. diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h new file mode 100644 index 0000000..3d91b73 --- /dev/null +++ b/hbase-native-client/core/action.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include "core/row.h" + +using hbase::Row; +namespace hbase { + +class Action { + public: + Action(std::shared_ptr action, int original_index) + : action_(action), original_index_(original_index) {} + ~Action() {} + + int64_t original_index() const { return original_index_; } + + Row *action() const { return action_.get(); } + + private: + std::shared_ptr action_; + int64_t original_index_; + int64_t nonce_ = -1; + int32_t replica_id_ = -1; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/get-test.cc b/hbase-native-client/core/get-test.cc index 07d0003..6ee2715 100644 --- a/hbase-native-client/core/get-test.cc +++ b/hbase-native-client/core/get-test.cc @@ -21,7 +21,8 @@ #include #include -using namespace hbase; + +using hbase::Get; const int NUMBER_OF_GETS = 5; void CheckFamilies(Get &get) { @@ -102,7 +103,7 @@ void CheckFamiliesAfterCopy(Get &get) { } void GetMethods(Get &get, const std::string &row) { - EXPECT_EQ(row, get.Row()); + EXPECT_EQ(row, get.row()); CheckFamilies(get); EXPECT_EQ(true, get.CacheBlocks()); diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc index 5c5f446..afeb429 100644 --- a/hbase-native-client/core/get.cc +++ b/hbase-native-client/core/get.cc @@ -26,7 +26,7 @@ namespace hbase { Get::~Get() {} -Get::Get(const std::string &row) : row_(row) { CheckRow(row_); } +Get::Get(const std::string &row) : Row(row) {} Get::Get(const Get &get) { row_ = get.row_; @@ -78,8 +78,6 @@ Get &Get::AddColumn(const std::string &family, const std::string &qualifier) { return *this; } -const std::string &Get::Row() const { return row_; } - hbase::pb::Consistency Get::Consistency() const { return consistency_; } Get &Get::SetConsistency(hbase::pb::Consistency consistency) { @@ -119,15 +117,4 @@ Get &Get::SetTimeStamp(int64_t timestamp) { const TimeRange &Get::Timerange() const { return *tr_; } -void Get::CheckRow(const std::string &row) { - const int kMaxRowLength = std::numeric_limits::max(); - int row_length = row.size(); - if (0 == row_length) { - throw std::runtime_error("Row length can't be 0"); - } - if (row_length > kMaxRowLength) { - throw std::runtime_error("Length of " + row + " is greater than max row size: " + - std::to_string(kMaxRowLength)); - } -} } // namespace hbase diff --git a/hbase-native-client/core/get.h b/hbase-native-client/core/get.h index 5492f21..e0be4e7 100644 --- a/hbase-native-client/core/get.h +++ b/hbase-native-client/core/get.h @@ -25,9 +25,11 @@ #include #include #include "core/query.h" +#include "core/row.h" #include "core/time-range.h" #include "if/Client.pb.h" +using hbase::Row; namespace hbase { /** @@ -36,7 +38,7 @@ namespace hbase { */ using FamilyMap = std::map>; -class Get : public Query { +class Get : public Row, public Query { public: /** * Constructors @@ -110,11 +112,6 @@ class Get : public Query { Get& AddColumn(const std::string& family, const std::string& qualifier); /** - * @brief Returns the row for this Get operation - */ - const std::string& Row() const; - - /** * @brief Returns true if family map (FamilyMap) is non empty false otherwise */ bool HasFamilies() const; @@ -131,21 +128,12 @@ class Get : public Query { Get& SetConsistency(hbase::pb::Consistency consistency); private: - std::string row_ = ""; int32_t max_versions_ = 1; bool cache_blocks_ = true; bool check_existence_only_ = false; FamilyMap family_map_; hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; std::unique_ptr tr_ = std::make_unique(); - - /** - * @brief Checks if the row for this Get operation is proper or not - * @param row Row to check - * @throws std::runtime_error if row is empty or greater than - * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) - */ - void CheckRow(const std::string& row); }; } // namespace hbase diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc new file mode 100644 index 0000000..f55a072 --- /dev/null +++ b/hbase-native-client/core/multi-response.cc @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/multi-response.h" + +namespace hbase { + +MultiResponse::MultiResponse() {} + +int MultiResponse::Size() const { + int size = 0; + for (const auto& result : results_) { + size += result.second->size(); + } + return size; +} + +void MultiResponse::Add(const std::string& region_name, int32_t original_index, + std::shared_ptr result, std::shared_ptr exc) { + bool region_found = false; + for (auto itr = results_.begin(); itr != results_.end(); ++itr) { + if (itr->first == region_name) { + region_found = true; + itr->second->set_result(original_index, result, exc); + break; + } + } + if (!region_found) { + auto region_result = std::make_shared(); + region_result->set_result(original_index, result, exc); + results_[region_name] = region_result; + } +} + +void MultiResponse::AddException(const std::string& region_name, + std::shared_ptr exception) { + exceptions_[region_name] = exception; +} + +std::shared_ptr MultiResponse::Exception(const std::string& region_name) const { + auto find = exceptions_.at(region_name); + return find; +} + +const std::map >& MultiResponse::Exceptions() const { + return exceptions_; +} + +void MultiResponse::AddStatistic(const std::string& region_name, + const std::shared_ptr& stat) { + results_[region_name]->set_stat(stat); +} + +const std::map >& MultiResponse::Results() const { + return results_; +} + +MultiResponse::ResponseType MultiResponse::Type() const { return ResponseType::MULTI; } + +MultiResponse::~MultiResponse() {} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h new file mode 100644 index 0000000..d665caf --- /dev/null +++ b/hbase-native-client/core/multi-response.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::RegionResult; +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class MultiResponse { + public: + enum class ResponseType { + SINGLE = 0, + MULTI = 1, + }; + MultiResponse(); + explicit MultiResponse(const std::string& region_name); + /** + * @brief Returns Number of pairs in this container + */ + int Size() const; + + /** + * Add the pair to the container, grouped by the regionName + * + * @param regionName + * @param originalIndex the original index of the Action (request). + * @param resOrEx the result or error; will be empty for successful Put and Delete actions. + */ + void Add(const std::string& region_name, int32_t original_index, std::shared_ptr result, + std::shared_ptr exc); + + void AddException(const std::string& region_name, std::shared_ptr exception); + + /** + * @return the exception for the region, if any. Null otherwise. + */ + std::shared_ptr Exception(const std::string& region_name) const; + + const std::map>& Exceptions() const; + + void AddStatistic(const std::string& region_name, const std::shared_ptr& stat); + + const std::map>& Results() const; + + MultiResponse::ResponseType Type() const; + + virtual ~MultiResponse(); + + private: + // map of regionName to map of Results by the original index for that Result + std::map> results_; + /** + * The server can send us a failure for the region itself, instead of individual failure. + * It's a part of the protobuf definition. + */ + std::map> exceptions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h new file mode 100644 index 0000000..16a8293 --- /dev/null +++ b/hbase-native-client/core/region-request.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include +#include +#include +#include "core/action.h" +#include "core/region-location.h" + +using hbase::Action; +namespace hbase { + +class RegionRequest { + public: + using ActionList = std::vector>; + explicit RegionRequest(const std::shared_ptr ®ion_loc) + : region_loc_(region_loc) {} + ~RegionRequest() {} + void set_action(std::shared_ptr action) { actions_.push_back(action); } + std::shared_ptr region_location() const { return region_loc_; } + const ActionList &actions() const { return actions_; } + + private: + std::shared_ptr region_loc_; + ActionList actions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc new file mode 100644 index 0000000..960606f --- /dev/null +++ b/hbase-native-client/core/region-result.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/region-result.h" +#include +#include + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +RegionResult::RegionResult() {} + +RegionResult::~RegionResult() {} + +void RegionResult::set_result(int32_t index, std::shared_ptr result, + std::shared_ptr exc) { + auto index_found = result_.find(index); + if (index_found == result_.end()) { + result_[index] = std::make_tuple(result ? result : nullptr, exc ? exc : nullptr); + } else { + throw std::runtime_error("Index " + std::to_string(index) + + " already set with ResultOrException"); + } +} + +void RegionResult::set_stat(std::shared_ptr stat) { stat_ = stat; } + +int RegionResult::size() const { return result_.size(); } + +std::shared_ptr RegionResult::result(int32_t index) const { + return std::make_shared(result_.at(index)); +} + +const std::shared_ptr& RegionResult::stat() const { return stat_; } + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h new file mode 100644 index 0000000..f1f4bbd --- /dev/null +++ b/hbase-native-client/core/region-result.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { +using ResultOrExceptionTuple = + std::tuple, std::shared_ptr>; +class RegionResult { + public: + RegionResult(); + void set_result(int32_t index, std::shared_ptr result, + std::shared_ptr exc); + + void set_stat(std::shared_ptr stat); + + int size() const; + + std::shared_ptr result(int32_t index) const; + + const std::shared_ptr& stat() const; + + ~RegionResult(); + + private: + std::map result_; + std::shared_ptr stat_ = std::make_shared(); +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 227e04a..3f625ff 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -18,6 +18,7 @@ */ #include "core/request-converter.h" +#include #include #include "if/Client.pb.h" @@ -43,35 +44,9 @@ void RequestConverter::SetRegion(const std::string ®ion_name, std::unique_ptr RequestConverter::ToGetRequest(const Get &get, const std::string ®ion_name) { auto pb_req = Request::get(); - auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - auto pb_get = pb_msg->mutable_get(); - pb_get->set_max_versions(get.MaxVersions()); - pb_get->set_cache_blocks(get.CacheBlocks()); - pb_get->set_consistency(get.Consistency()); - - if (!get.Timerange().IsAllTime()) { - hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); - pb_time_range->set_from(get.Timerange().MinTimeStamp()); - pb_time_range->set_to(get.Timerange().MaxTimeStamp()); - } - pb_get->set_row(get.Row()); - if (get.HasFamilies()) { - for (const auto &family : get.Family()) { - auto column = pb_get->add_column(); - column->set_family(family.first); - for (const auto &qualifier : family.second) { - column->add_qualifier(qualifier); - } - } - } - - if (get.filter() != nullptr) { - pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); - } - + pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release()); return pb_req; } @@ -123,4 +98,56 @@ std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, return pb_req; } + +folly::Future> RequestConverter::ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions) { + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + auto region_specifier = new hbase::pb::RegionSpecifier(); + RequestConverter::SetRegion(region_name, region_specifier); + auto pb_region_action = pb_msg->add_regionaction(); + pb_region_action->set_allocated_region(region_specifier); + int action_num = 0; + for (const auto ®ion_action : region_actions) { + + auto pb_action = pb_region_action->add_action(); + auto action = region_action->action(); + if (auto pget = dynamic_cast(action)) { + auto pb_get = RequestConverter::ToGet(*pget); + pb_action->set_allocated_get(pb_get.release()); + pb_action->set_index(action_num); + } + action_num += 1; + } + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return folly::makeFuture>(std::move(pb_req)); +} + +std::unique_ptr RequestConverter::ToGet(const Get &get) { + auto pb_get = std::make_unique(); + pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (!get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.row()); + if (get.HasFamilies()) { + for (const auto &family : get.Family()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + if (get.filter() != nullptr) { + pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); + } + return pb_get; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 57f08cc..795f0bb 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -21,12 +21,19 @@ #include #include +#include #include "connection/request.h" #include "core/get.h" +#include "core/region-request.h" #include "core/scan.h" #include "if/HBase.pb.h" +using hbase::Get; +using hbase::RegionRequest; +using hbase::Request; +using hbase::Scan; using hbase::pb::RegionSpecifier; +using hbase::pb::ServerName; namespace hbase { /** @@ -53,6 +60,9 @@ class RequestConverter { */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name); + static folly::Future> ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions); + private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); @@ -64,6 +74,8 @@ class RequestConverter { * Request. */ static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); + + static std::unique_ptr ToGet(const Get &get); }; } /* namespace hbase */ diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index 7bb5e5d..3b4d785 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -19,10 +19,13 @@ #include "core/response-converter.h" +#include +#include +#include #include #include - #include "core/cell.h" +#include "exceptions/exception.h" using hbase::pb::GetResponse; using hbase::pb::ScanResponse; @@ -51,10 +54,17 @@ std::unique_ptr ResponseConverter::ToResult( // iterate over the cells coming from rpc codec if (cell_scanner != nullptr) { - while (cell_scanner->Advance()) { + int cells_read = 0; + while (cells_read != result.associated_cell_count()) { + if (cell_scanner->Advance()) { vcells.push_back(cell_scanner->Current()); + cells_read += 1; + } else { + LOG(ERROR) << "CellScanner::Advance() returned false unexpectedly. Cells Read:- " + << cells_read << "; Expected Cell Count:- " << result.associated_cell_count(); + std::runtime_error("CellScanner::Advance() returned false unexpectedly"); + } } - // TODO: check associated cell count? } return std::make_unique(vcells, result.exists(), result.stale(), result.partial()); } @@ -94,4 +104,78 @@ std::vector> ResponseConverter::FromScanResponse(const R return results; } + +std::unique_ptr ResponseConverter::GetResults(std::shared_ptr req, + const Response& resp) { + auto multi_req = std::static_pointer_cast(req->req_msg()); + auto multi_resp = std::static_pointer_cast(resp.resp_msg()); + VLOG(3) << "GetResults:" << multi_resp->ShortDebugString(); + int req_region_action_count = multi_req->regionaction_size(); + int res_region_action_count = multi_resp->regionactionresult_size(); + if (req_region_action_count != res_region_action_count) { + throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + + " does not match response mutation result count=" + + std::to_string(res_region_action_count)); + } + auto multi_results = std::make_unique(); + for (int32_t num = 0; num < res_region_action_count; num++) { + hbase::pb::RegionAction actions = multi_req->regionaction(num); + hbase::pb::RegionActionResult action_result = multi_resp->regionactionresult(num); + hbase::pb::RegionSpecifier rs = actions.region(); + if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) { + throw std::runtime_error("We support only encoded types for protobuf multi response."); + } + + auto region_name = rs.value(); + if (action_result.has_exception()) { + if (action_result.exception().has_value()) { + auto exc = std::make_shared(action_result.exception().value()); + VLOG(8) << "Store Region Exception:- " << exc->what(); + multi_results->AddException(region_name, exc); + } + continue; + } + + if (actions.action_size() != action_result.resultorexception_size()) { + throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + + ", action_result.resultorexception_size=" + + std::to_string(action_result.resultorexception_size()) + + " for region " + actions.region().value()); + } + + for (hbase::pb::ResultOrException roe : action_result.resultorexception()) { + std::shared_ptr result; + std::shared_ptr exc; + if (roe.has_exception()) { + if (roe.exception().has_value()) { + exc = std::make_shared(roe.exception().value()); + VLOG(8) << "Store ResultOrException:- " << exc->what(); + } + } else if (roe.has_result()) { + auto unique_result = ToResult(roe.result(), resp.cell_scanner()); + result = std::make_shared(*unique_result); + } else if (roe.has_service_result()) { + // TODO Not processing Coprocessor Service Result; + } else { + // Sometimes, the response is just "it was processed". Generally, this occurs for things + // like mutateRows where either we get back 'processed' (or not) and optionally some + // statistics about the regions we touched. + std::vector> empty_cells; + result = std::make_shared(empty_cells, multi_resp->processed() ? true : false, + false, false); + } + multi_results->Add(region_name, roe.index(), result, exc); + } + } + + if (multi_resp->has_regionstatistics()) { + hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics(); + for (int i = 0; i < stats.region_size(); i++) { + multi_results->AddStatistic(stats.region(i).value(), + std::make_shared(stats.stat(i))); + } + } + return multi_results; + // return folly::makeFuture>(std::move(multi_results)); +} } /* namespace hbase */ diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h index 759b1ce..94835bb 100644 --- a/hbase-native-client/core/response-converter.h +++ b/hbase-native-client/core/response-converter.h @@ -19,13 +19,18 @@ #pragma once +#include #include #include +#include "connection/request.h" #include "connection/response.h" +#include "core/multi-response.h" #include "core/result.h" #include "if/Client.pb.h" #include "serde/cell-scanner.h" +using hbase::Request; +using hbase::Response; namespace hbase { /** @@ -47,6 +52,9 @@ class ResponseConverter { static std::vector> FromScanResponse(const Response& resp); + static std::unique_ptr GetResults(std::shared_ptr req, + const Response& resp); + private: // Constructor not required. We have all static methods to extract response from PB messages. ResponseConverter(); diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h new file mode 100644 index 0000000..b5b424a --- /dev/null +++ b/hbase-native-client/core/row.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#pragma once + +namespace hbase { + +class Row { + public: + Row() {} + explicit Row(const std::string &row) : row_(row) { CheckRow(row_); } + + /** + * @brief Returns the row for the Row interface. + */ + const std::string &row() const { return row_; } + virtual ~Row() {} + + private: + /** + * @brief Checks if the row for this Get operation is proper or not + * @param row Row to check + * @throws std::runtime_error if row is empty or greater than + * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) + */ + void CheckRow(const std::string &row) { + const int16_t kMaxRowLength = std::numeric_limits::max(); + int16_t row_length = row.size(); + if (0 == row_length) { + throw std::runtime_error("Row length can't be 0"); + } + if (row_length > kMaxRowLength) { + throw std::runtime_error("Length of " + row + " is greater than max row size: " + + std::to_string(kMaxRowLength)); + } + } + + protected: + std::string row_ = ""; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h new file mode 100644 index 0000000..a7269ea --- /dev/null +++ b/hbase-native-client/core/server-request.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "core/action.h" +#include "core/region-location.h" +#include "core/region-request.h" + +using hbase::Action; +using hbase::RegionRequest; + +namespace hbase { + +class ServerRequest { + public: + using ActionsByRegion = std::map>; + + explicit ServerRequest(std::shared_ptr region_location) { + auto region_name = region_location->region_name(); + auto region_request = std::make_shared(region_location); + actions_by_region_[region_name] = region_request; + } + ~ServerRequest() {} + + void AddAction(std::shared_ptr region_location, std::shared_ptr action) { + auto region_name = region_location->region_name(); + auto itr = actions_by_region_.at(region_name); + itr->set_action(action); + } + + const ActionsByRegion &actions_by_region() const { return actions_by_region_; } + + private: + ActionsByRegion actions_by_region_; +}; +} /* namespace hbase */ -- 1.8.3.1