From 250fc5c73d555ca9d16eda7ebc37e8d13e467fa1 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Wed, 22 Mar 2017 21:15:02 +1100 Subject: [PATCH] BatchCaller Helper sources diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc index 189130e..80883cc 100644 --- a/hbase-native-client/connection/request.cc +++ b/hbase-native-client/connection/request.cc @@ -39,3 +39,7 @@ std::unique_ptr Request::scan() { return std::make_unique(std::make_shared(), std::make_shared(), "Scan"); } +std::unique_ptr Request::multi() { + return std::make_unique(std::make_shared(), + std::make_shared(), "Multi"); +} diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h index 91c684d..520b380 100644 --- a/hbase-native-client/connection/request.h +++ b/hbase-native-client/connection/request.h @@ -39,6 +39,8 @@ class Request { static std::unique_ptr mutate(); /** Create a request object for a scan */ static std::unique_ptr scan(); + /** Create a request object for a multi */ + static std::unique_ptr multi(); /** * This should be private. Do not use this. diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 2d77f2d..19b54d3 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -45,6 +45,12 @@ cxx_library( "async-rpc-retrying-caller.h", "hbase-rpc-controller.h", "zk-util.h", + "action.h", + "multi-response.h", + "region-request.h", + "region-result.h", + "row.h", + "server-request.h", ], srcs=[ "async-connection.cc", @@ -62,6 +68,8 @@ cxx_library( "response-converter.cc", "table.cc", "zk-util.cc", + "multi-response.cc" + "region-result.cc", ], deps=[ "//exceptions:exceptions", diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h new file mode 100644 index 0000000..3511683 --- /dev/null +++ b/hbase-native-client/core/action.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include "core/row.h" + +using hbase::Row; +namespace hbase { + +class Action { + public: + Action(std::shared_ptr action, int original_index) + : action_(action), original_index_(original_index) {} + ~Action() {} + + int64_t original_index() const { return original_index_; } + + std::shared_ptr action() const { return action_; } + + private: + std::shared_ptr action_; + int64_t original_index_; + int64_t nonce_ = -1; + int32_t replica_id_ = -1; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/get-test.cc b/hbase-native-client/core/get-test.cc index 07d0003..6ee2715 100644 --- a/hbase-native-client/core/get-test.cc +++ b/hbase-native-client/core/get-test.cc @@ -21,7 +21,8 @@ #include #include -using namespace hbase; + +using hbase::Get; const int NUMBER_OF_GETS = 5; void CheckFamilies(Get &get) { @@ -102,7 +103,7 @@ void CheckFamiliesAfterCopy(Get &get) { } void GetMethods(Get &get, const std::string &row) { - EXPECT_EQ(row, get.Row()); + EXPECT_EQ(row, get.row()); CheckFamilies(get); EXPECT_EQ(true, get.CacheBlocks()); diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc index 5c5f446..afeb429 100644 --- a/hbase-native-client/core/get.cc +++ b/hbase-native-client/core/get.cc @@ -26,7 +26,7 @@ namespace hbase { Get::~Get() {} -Get::Get(const std::string &row) : row_(row) { CheckRow(row_); } +Get::Get(const std::string &row) : Row(row) {} Get::Get(const Get &get) { row_ = get.row_; @@ -78,8 +78,6 @@ Get &Get::AddColumn(const std::string &family, const std::string &qualifier) { return *this; } -const std::string &Get::Row() const { return row_; } - hbase::pb::Consistency Get::Consistency() const { return consistency_; } Get &Get::SetConsistency(hbase::pb::Consistency consistency) { @@ -119,15 +117,4 @@ Get &Get::SetTimeStamp(int64_t timestamp) { const TimeRange &Get::Timerange() const { return *tr_; } -void Get::CheckRow(const std::string &row) { - const int kMaxRowLength = std::numeric_limits::max(); - int row_length = row.size(); - if (0 == row_length) { - throw std::runtime_error("Row length can't be 0"); - } - if (row_length > kMaxRowLength) { - throw std::runtime_error("Length of " + row + " is greater than max row size: " + - std::to_string(kMaxRowLength)); - } -} } // namespace hbase diff --git a/hbase-native-client/core/get.h b/hbase-native-client/core/get.h index 5492f21..e0be4e7 100644 --- a/hbase-native-client/core/get.h +++ b/hbase-native-client/core/get.h @@ -25,9 +25,11 @@ #include #include #include "core/query.h" +#include "core/row.h" #include "core/time-range.h" #include "if/Client.pb.h" +using hbase::Row; namespace hbase { /** @@ -36,7 +38,7 @@ namespace hbase { */ using FamilyMap = std::map>; -class Get : public Query { +class Get : public Row, public Query { public: /** * Constructors @@ -110,11 +112,6 @@ class Get : public Query { Get& AddColumn(const std::string& family, const std::string& qualifier); /** - * @brief Returns the row for this Get operation - */ - const std::string& Row() const; - - /** * @brief Returns true if family map (FamilyMap) is non empty false otherwise */ bool HasFamilies() const; @@ -131,21 +128,12 @@ class Get : public Query { Get& SetConsistency(hbase::pb::Consistency consistency); private: - std::string row_ = ""; int32_t max_versions_ = 1; bool cache_blocks_ = true; bool check_existence_only_ = false; FamilyMap family_map_; hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; std::unique_ptr tr_ = std::make_unique(); - - /** - * @brief Checks if the row for this Get operation is proper or not - * @param row Row to check - * @throws std::runtime_error if row is empty or greater than - * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) - */ - void CheckRow(const std::string& row); }; } // namespace hbase diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc new file mode 100644 index 0000000..d24fb3e --- /dev/null +++ b/hbase-native-client/core/multi-response.cc @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/multi-response.h" +#include "core/region-result.h" + +namespace hbase { + +MultiResponse::MultiResponse() {} + +int MultiResponse::Size() const { + int size = 0; + for (const auto& result : results_) { + size += result.second->size(); + } + return size; +} + +void MultiResponse::Add(const std::string& region_name, int32_t original_index, + std::shared_ptr result, std::shared_ptr exc) { + bool region_found = false; + for (auto itr = results_.begin(); itr != results_.end(); ++itr) { + if (itr->first == region_name) { + region_found = true; + itr->second->add_result(original_index, result, exc); + break; + } + } + if (!region_found) { + auto region_result = std::make_shared(); + region_result->add_result(original_index, result, exc); + results_[region_name] = region_result; + } +} + +void MultiResponse::AddException(const std::string& region_name, + std::shared_ptr exception) { + exceptions_[region_name] = exception; +} + +std::shared_ptr MultiResponse::Exception(const std::string& region_name) const { + auto find = exceptions_.at(region_name); + return find; +} + +const std::map >& MultiResponse::Exceptions() const { + return exceptions_; +} + +void MultiResponse::AddStatistic(const std::string& region_name, + std::shared_ptr stat) { + results_[region_name]->set_stat(stat); +} + +const std::map >& MultiResponse::Results() const { + return results_; +} + +MultiResponse::~MultiResponse() {} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h new file mode 100644 index 0000000..716cb5a --- /dev/null +++ b/hbase-native-client/core/multi-response.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::RegionResult; +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class MultiResponse { + public: + MultiResponse(); + /** + * @brief Returns Number of pairs in this container + */ + int Size() const; + + /** + * Add the pair to the container, grouped by the regionName + * + * @param regionName + * @param originalIndex the original index of the Action (request). + * @param resOrEx the result or error; will be empty for successful Put and Delete actions. + */ + void Add(const std::string& region_name, int32_t original_index, std::shared_ptr result, + std::shared_ptr exc); + + void AddException(const std::string& region_name, std::shared_ptr exception); + + /** + * @return the exception for the region, if any. Null otherwise. + */ + std::shared_ptr Exception(const std::string& region_name) const; + + const std::map>& Exceptions() const; + + void AddStatistic(const std::string& region_name, std::shared_ptr stat); + + const std::map>& Results() const; + + ~MultiResponse(); + + private: + // map of regionName to map of Results by the original index for that Result + std::map> results_; + /** + * The server can send us a failure for the region itself, instead of individual failure. + * It's a part of the protobuf definition. + */ + std::map> exceptions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h new file mode 100644 index 0000000..db0498c --- /dev/null +++ b/hbase-native-client/core/region-request.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include +#include +#include +#include "core/action.h" +#include "core/region-location.h" + +using hbase::Action; +namespace hbase { + +class RegionRequest { + public: + // Concurrent + using ActionList = std::vector>; + explicit RegionRequest(const std::shared_ptr ®ion_loc) + : region_loc_(region_loc) {} + ~RegionRequest() {} + void add_action(std::shared_ptr action) { + std::lock_guard lck(actions_lock_); + actions_.push_back(action); + } + std::shared_ptr region_location() const { return region_loc_; } + const ActionList &actions() const { return actions_; } + + private: + std::mutex actions_lock_; + std::shared_ptr region_loc_; + ActionList actions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc new file mode 100644 index 0000000..43e5398 --- /dev/null +++ b/hbase-native-client/core/region-result.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/region-result.h" +#include +#include + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +RegionResult::RegionResult() {} + +RegionResult::~RegionResult() {} + +void RegionResult::add_result(int32_t index, std::shared_ptr result, + std::shared_ptr exc) { + auto index_found = result_.find(index); + if (index_found == result_.end()) { + result_[index] = std::make_tuple(result ? result : nullptr, exc ? exc : nullptr); + } else { + throw std::runtime_error("Index " + std::to_string(index) + + " already set with ResultOrException"); + } +} + +void RegionResult::set_stat(std::shared_ptr stat) { stat_ = stat; } + +int RegionResult::size() const { return result_.size(); } + +std::shared_ptr RegionResult::result(int32_t index) const { + return std::make_shared(result_.at(index)); +} + +const std::shared_ptr& RegionResult::stat() const { return stat_; } + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h new file mode 100644 index 0000000..4f25651 --- /dev/null +++ b/hbase-native-client/core/region-result.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { +using ResultOrExceptionTuple = + std::tuple, std::shared_ptr>; +class RegionResult { + public: + RegionResult(); + void add_result(int32_t index, std::shared_ptr result, + std::shared_ptr exc); + + void set_stat(std::shared_ptr stat); + + int size() const; + + std::shared_ptr result(int32_t index) const; + + const std::shared_ptr& stat() const; + + ~RegionResult(); + + private: + std::map result_; + std::shared_ptr stat_; // = std::make_shared(); +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 227e04a..82432a4 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -43,35 +43,9 @@ void RequestConverter::SetRegion(const std::string ®ion_name, std::unique_ptr RequestConverter::ToGetRequest(const Get &get, const std::string ®ion_name) { auto pb_req = Request::get(); - auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - auto pb_get = pb_msg->mutable_get(); - pb_get->set_max_versions(get.MaxVersions()); - pb_get->set_cache_blocks(get.CacheBlocks()); - pb_get->set_consistency(get.Consistency()); - - if (!get.Timerange().IsAllTime()) { - hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); - pb_time_range->set_from(get.Timerange().MinTimeStamp()); - pb_time_range->set_to(get.Timerange().MaxTimeStamp()); - } - pb_get->set_row(get.Row()); - if (get.HasFamilies()) { - for (const auto &family : get.Family()) { - auto column = pb_get->add_column(); - column->set_family(family.first); - for (const auto &qualifier : family.second) { - column->add_qualifier(qualifier); - } - } - } - - if (get.filter() != nullptr) { - pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); - } - + pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release()); return pb_req; } @@ -123,4 +97,65 @@ std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, return pb_req; } + +std::unique_ptr RequestConverter::ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions, + std::shared_ptr &multi_req) { + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + auto region_specifier = new hbase::pb::RegionSpecifier(); + RequestConverter::SetRegion(region_name, region_specifier); + auto pb_region_action = pb_msg->add_regionaction(); + pb_region_action->set_allocated_region(region_specifier); + int action_num = 0; + for (const auto ®ion_action : region_actions) { + auto pb_action = pb_region_action->add_action(); + auto action = region_action->action(); + if (auto pget = std::dynamic_pointer_cast(action)) { + auto pb_get = RequestConverter::ToGet(*pget.get()); + if (action_num % 3 == 0) + pb_action->set_allocated_get(nullptr); + else + pb_action->set_allocated_get(pb_get.release()); + pb_action->set_index(action_num); + } + action_num += 1; + } + + auto unique = std::unique_ptr(new Request(*pb_req)); + VLOG(8) << "pb_req Addr:-" << pb_req.get() << "; unique Addr:-" << unique.get(); + multi_req = std::move(unique); + VLOG(8) << "multi_req Addr:-" << multi_req.get(); + + VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} + +std::unique_ptr RequestConverter::ToGet(const Get &get) { + auto pb_get = std::make_unique(); + pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (!get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.row()); + if (get.HasFamilies()) { + for (const auto &family : get.Family()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + if (get.filter() != nullptr) { + pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); + } + return pb_get; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 57f08cc..22ffa7f 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -21,8 +21,11 @@ #include #include +#include #include "connection/request.h" +#include "core/action.h" #include "core/get.h" +#include "core/region-request.h" #include "core/scan.h" #include "if/HBase.pb.h" @@ -52,6 +55,9 @@ class RequestConverter { * @param region_name - table region */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name); + static std::unique_ptr ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions, + std::shared_ptr &multi_req); private: // Constructor not required. We have all static methods to create PB requests. @@ -64,6 +70,7 @@ class RequestConverter { * Request. */ static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); + static std::unique_ptr ToGet(const Get &get); }; } /* namespace hbase */ diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index b11856c..41048b0 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -19,13 +19,19 @@ #include "core/response-converter.h" +#include +#include +#include #include +#include #include - #include "core/cell.h" +#include "core/multi-response.h" +#include "exceptions/exception.h" using hbase::pb::GetResponse; using hbase::pb::ScanResponse; +using hbase::pb::RegionLoadStats; namespace hbase { @@ -54,10 +60,17 @@ std::shared_ptr ResponseConverter::ToResult( // iterate over the cells coming from rpc codec if (cell_scanner != nullptr) { - while (cell_scanner->Advance()) { + int cells_read = 0; + while (cells_read != result.associated_cell_count()) { + if (cell_scanner->Advance()) { vcells.push_back(cell_scanner->Current()); + cells_read += 1; + } else { + LOG(ERROR) << "CellScanner::Advance() returned false unexpectedly. Cells Read:- " + << cells_read << "; Expected Cell Count:- " << result.associated_cell_count(); + std::runtime_error("CellScanner::Advance() returned false unexpectedly"); + } } - // TODO: check associated cell count? } LOG(INFO) << "Returning Result"; return std::make_shared(vcells, result.exists(), result.stale(), result.partial()); @@ -98,4 +111,76 @@ std::vector> ResponseConverter::FromScanResponse(const R return results; } + +std::unique_ptr ResponseConverter::GetResults(std::shared_ptr req, + const Response& resp) { + auto multi_req = std::static_pointer_cast(req->req_msg()); + auto multi_resp = std::static_pointer_cast(resp.resp_msg()); + VLOG(3) << "GetResults:" << multi_resp->ShortDebugString(); + int req_region_action_count = multi_req->regionaction_size(); + int res_region_action_count = multi_resp->regionactionresult_size(); + if (req_region_action_count != res_region_action_count) { + throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + + " does not match response mutation result count=" + + std::to_string(res_region_action_count)); + } + auto multi_results = std::make_unique(); + for (int32_t num = 0; num < res_region_action_count; num++) { + hbase::pb::RegionAction actions = multi_req->regionaction(num); + hbase::pb::RegionActionResult action_result = multi_resp->regionactionresult(num); + hbase::pb::RegionSpecifier rs = actions.region(); + if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) { + throw std::runtime_error("We support only encoded types for protobuf multi response."); + } + + auto region_name = rs.value(); + if (action_result.has_exception()) { + if (action_result.exception().has_value()) { + auto exc = std::make_shared(action_result.exception().value()); + VLOG(8) << "Store Region Exception:- " << exc->what(); + multi_results->AddException(region_name, exc); + } + continue; + } + + if (actions.action_size() != action_result.resultorexception_size()) { + throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + + ", action_result.resultorexception_size=" + + std::to_string(action_result.resultorexception_size()) + + " for region " + actions.region().value()); + } + + for (hbase::pb::ResultOrException roe : action_result.resultorexception()) { + std::shared_ptr result; + std::shared_ptr exc; + if (roe.has_exception()) { + if (roe.exception().has_value()) { + exc = std::make_shared(roe.exception().value()); + VLOG(8) << "Store ResultOrException:- " << exc->what(); + } + } else if (roe.has_result()) { + result = ToResult(roe.result(), resp.cell_scanner()); + } else if (roe.has_service_result()) { + // TODO Not processing Coprocessor Service Result; + } else { + // Sometimes, the response is just "it was processed". Generally, this occurs for things + // like mutateRows where either we get back 'processed' (or not) and optionally some + // statistics about the regions we touched. + std::vector> empty_cells; + result = std::make_shared(empty_cells, multi_resp->processed() ? true : false, + false, false); + } + multi_results->Add(region_name, roe.index(), std::move(result), exc); + } + } + + if (multi_resp->has_regionstatistics()) { + hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics(); + for (int i = 0; i < stats.region_size(); i++) { + multi_results->AddStatistic(stats.region(i).value(), + std::make_shared(stats.stat(i))); + } + } + return multi_results; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h index 743c14b..a5095fd 100644 --- a/hbase-native-client/core/response-converter.h +++ b/hbase-native-client/core/response-converter.h @@ -21,11 +21,15 @@ #include #include +#include "connection/request.h" #include "connection/response.h" +#include "core/multi-response.h" #include "core/result.h" #include "if/Client.pb.h" #include "serde/cell-scanner.h" +using hbase::Request; +using hbase::Response; namespace hbase { /** @@ -47,6 +51,9 @@ class ResponseConverter { static std::vector> FromScanResponse(const Response& resp); + static std::unique_ptr GetResults(std::shared_ptr req, + const Response& resp); + private: // Constructor not required. We have all static methods to extract response from PB messages. ResponseConverter(); diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h new file mode 100644 index 0000000..b5b424a --- /dev/null +++ b/hbase-native-client/core/row.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#pragma once + +namespace hbase { + +class Row { + public: + Row() {} + explicit Row(const std::string &row) : row_(row) { CheckRow(row_); } + + /** + * @brief Returns the row for the Row interface. + */ + const std::string &row() const { return row_; } + virtual ~Row() {} + + private: + /** + * @brief Checks if the row for this Get operation is proper or not + * @param row Row to check + * @throws std::runtime_error if row is empty or greater than + * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) + */ + void CheckRow(const std::string &row) { + const int16_t kMaxRowLength = std::numeric_limits::max(); + int16_t row_length = row.size(); + if (0 == row_length) { + throw std::runtime_error("Row length can't be 0"); + } + if (row_length > kMaxRowLength) { + throw std::runtime_error("Length of " + row + " is greater than max row size: " + + std::to_string(kMaxRowLength)); + } + } + + protected: + std::string row_ = ""; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h new file mode 100644 index 0000000..1482f13 --- /dev/null +++ b/hbase-native-client/core/server-request.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "core/action.h" +#include "core/region-location.h" +#include "core/region-request.h" + +using hbase::Action; +using hbase::RegionRequest; + +namespace hbase { + +class ServerRequest { + public: + // Concurrent + using ActionsByRegion = std::map>; + + explicit ServerRequest(std::shared_ptr region_location) { + auto region_name = region_location->region_name(); + auto region_request = std::make_shared(region_location); + std::lock_guard lck(actions_by_region_lock_); + actions_by_region_[region_name] = region_request; + } + ~ServerRequest() {} + + void AddAction(std::shared_ptr region_location, std::shared_ptr action) { + auto region_name = region_location->region_name(); + std::lock_guard lck(actions_by_region_lock_); + auto itr = actions_by_region_.at(region_name); + itr->add_action(action); + } + + const ActionsByRegion &actions_by_region() const { return actions_by_region_; } + + private: + std::mutex actions_by_region_lock_; + ActionsByRegion actions_by_region_; +}; +} /* namespace hbase */ -- 1.8.3.1