From 9b4412d1521422b5345de03ecbd8dc253a439d12 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Fri, 10 Mar 2017 23:04:56 +1100 Subject: [PATCH] Classes required for BatchCallerBuilder diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc index 189130e..80883cc 100644 --- a/hbase-native-client/connection/request.cc +++ b/hbase-native-client/connection/request.cc @@ -39,3 +39,7 @@ std::unique_ptr Request::scan() { return std::make_unique(std::make_shared(), std::make_shared(), "Scan"); } +std::unique_ptr Request::multi() { + return std::make_unique(std::make_shared(), + std::make_shared(), "Multi"); +} diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h index 91c684d..520b380 100644 --- a/hbase-native-client/connection/request.h +++ b/hbase-native-client/connection/request.h @@ -39,6 +39,8 @@ class Request { static std::unique_ptr mutate(); /** Create a request object for a scan */ static std::unique_ptr scan(); + /** Create a request object for a multi */ + static std::unique_ptr multi(); /** * This should be private. Do not use this. diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index fd4c0dc..ad50174 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -33,6 +33,8 @@ cxx_library( "meta-utils.h", "get.h", "time-range.h", + "configuration.h", + "hbase-configuration-loader.h", "scan.h", "result.h", "request-converter.h", @@ -41,6 +43,13 @@ cxx_library( "async-rpc-retrying-caller-factory.h", "async-rpc-retrying-caller.h", "hbase-rpc-controller.h", + "action.h", + "multi-action.h", + "multi-response.h", + "region-request.h", + "region-result.h", + "row.h", + "server-request.h", ], srcs=[ "cell.cc", @@ -50,17 +59,21 @@ cxx_library( "meta-utils.cc", "get.cc", "time-range.cc", + "configuration.cc", + "hbase-configuration-loader.cc", "scan.cc", "result.cc", "request-converter.cc", "response-converter.cc", "table.cc", + "multi-action.cc", + "multi-response.cc", + "region-result.cc", ], deps=[ "//exceptions:exceptions", "//utils:utils", "//connection:connection", - "//core:conf", "//if:if", "//serde:serde", "//third-party:folly", @@ -68,29 +81,10 @@ cxx_library( "//third-party:zookeeper_mt", ], compiler_flags=['-Weffc++', '-ggdb'], - visibility=[ - 'PUBLIC', - ],) -cxx_library( - name="conf", - exported_headers=[ - "configuration.h", - "hbase-configuration-loader.h", - ], - srcs=[ - "configuration.cc", - "hbase-configuration-loader.cc", - ], - deps=["//third-party:folly"], - compiler_flags=['-Weffc++', '-ggdb'], - visibility=[ - 'PUBLIC', - ],) + visibility=['PUBLIC',],) cxx_test( name="location-cache-test", - srcs=[ - "location-cache-test.cc", - ], + srcs=["location-cache-test.cc",], deps=[ ":core", "//test-util:test-util", @@ -98,18 +92,12 @@ cxx_test( run_test_separately=True,) cxx_test( name="cell-test", - srcs=[ - "cell-test.cc", - ], - deps=[ - ":core", - ], + srcs=["cell-test.cc",], + deps=[":core",], run_test_separately=True,) cxx_test( name="filter-test", - srcs=[ - "filter-test.cc", - ], + srcs=["filter-test.cc",], deps=[ ":core", "//if:if", @@ -167,9 +155,7 @@ cxx_test( run_test_separately=True,) cxx_test( name="client-test", - srcs=[ - "client-test.cc", - ], + srcs=["client-test.cc",], deps=[ ":core", "//if:if", @@ -179,7 +165,5 @@ cxx_test( run_test_separately=True,) cxx_binary( name="simple-client", - srcs=[ - "simple-client.cc", - ], + srcs=["simple-client.cc",], deps=[":core", "//connection:connection"],) diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h new file mode 100644 index 0000000..1a8f127 --- /dev/null +++ b/hbase-native-client/core/action.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include "core/row.h" + +using hbase::Row; +namespace hbase { + +class Action { + public: + Action(std::shared_ptr action, int original_index) + : action_(action), original_index_(original_index) {} + ~Action() {} + + int32_t OriginalIndex() { return original_index_; } + + const std::shared_ptr &GetAction() { return action_; } + + private: + std::shared_ptr action_; + int32_t original_index_; + int64_t nonce_ = -1; + int32_t replica_id_ = -1; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/get-test.cc b/hbase-native-client/core/get-test.cc index 07d0003..6ee2715 100644 --- a/hbase-native-client/core/get-test.cc +++ b/hbase-native-client/core/get-test.cc @@ -21,7 +21,8 @@ #include #include -using namespace hbase; + +using hbase::Get; const int NUMBER_OF_GETS = 5; void CheckFamilies(Get &get) { @@ -102,7 +103,7 @@ void CheckFamiliesAfterCopy(Get &get) { } void GetMethods(Get &get, const std::string &row) { - EXPECT_EQ(row, get.Row()); + EXPECT_EQ(row, get.row()); CheckFamilies(get); EXPECT_EQ(true, get.CacheBlocks()); diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc index 5c5f446..afeb429 100644 --- a/hbase-native-client/core/get.cc +++ b/hbase-native-client/core/get.cc @@ -26,7 +26,7 @@ namespace hbase { Get::~Get() {} -Get::Get(const std::string &row) : row_(row) { CheckRow(row_); } +Get::Get(const std::string &row) : Row(row) {} Get::Get(const Get &get) { row_ = get.row_; @@ -78,8 +78,6 @@ Get &Get::AddColumn(const std::string &family, const std::string &qualifier) { return *this; } -const std::string &Get::Row() const { return row_; } - hbase::pb::Consistency Get::Consistency() const { return consistency_; } Get &Get::SetConsistency(hbase::pb::Consistency consistency) { @@ -119,15 +117,4 @@ Get &Get::SetTimeStamp(int64_t timestamp) { const TimeRange &Get::Timerange() const { return *tr_; } -void Get::CheckRow(const std::string &row) { - const int kMaxRowLength = std::numeric_limits::max(); - int row_length = row.size(); - if (0 == row_length) { - throw std::runtime_error("Row length can't be 0"); - } - if (row_length > kMaxRowLength) { - throw std::runtime_error("Length of " + row + " is greater than max row size: " + - std::to_string(kMaxRowLength)); - } -} } // namespace hbase diff --git a/hbase-native-client/core/get.h b/hbase-native-client/core/get.h index 5492f21..e0be4e7 100644 --- a/hbase-native-client/core/get.h +++ b/hbase-native-client/core/get.h @@ -25,9 +25,11 @@ #include #include #include "core/query.h" +#include "core/row.h" #include "core/time-range.h" #include "if/Client.pb.h" +using hbase::Row; namespace hbase { /** @@ -36,7 +38,7 @@ namespace hbase { */ using FamilyMap = std::map>; -class Get : public Query { +class Get : public Row, public Query { public: /** * Constructors @@ -110,11 +112,6 @@ class Get : public Query { Get& AddColumn(const std::string& family, const std::string& qualifier); /** - * @brief Returns the row for this Get operation - */ - const std::string& Row() const; - - /** * @brief Returns true if family map (FamilyMap) is non empty false otherwise */ bool HasFamilies() const; @@ -131,21 +128,12 @@ class Get : public Query { Get& SetConsistency(hbase::pb::Consistency consistency); private: - std::string row_ = ""; int32_t max_versions_ = 1; bool cache_blocks_ = true; bool check_existence_only_ = false; FamilyMap family_map_; hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; std::unique_ptr tr_ = std::make_unique(); - - /** - * @brief Checks if the row for this Get operation is proper or not - * @param row Row to check - * @throws std::runtime_error if row is empty or greater than - * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) - */ - void CheckRow(const std::string& row); }; } // namespace hbase diff --git a/hbase-native-client/core/multi-action.cc b/hbase-native-client/core/multi-action.cc new file mode 100644 index 0000000..a66da22 --- /dev/null +++ b/hbase-native-client/core/multi-action.cc @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "core/multi-action.h" + +namespace hbase { + +MultiAction::MultiAction() {} + +MultiAction::~MultiAction() {} + +void MultiAction::Add(const std::string& region_name, std::shared_ptr get) { + actions_[region_name].push_back(get); +} + +void MultiAction::Add(const std::string& region_name, + const std::vector >& gets) { + actions_[region_name] = gets; +} + +int MultiAction::Size() const { + int size = 0; + for (const auto& action : actions_) { + size += action.second.size(); + } + return size; +} + +// TODO Will remove in subsequent patch if not reqd +std::vector MultiAction::Regions() const { + std::vector regions; + /* + std::transform(std::begin(gets_), std::end(gets_), std::back_inserter(regions), + [](auto &get_pair) { + return get_pair.first; + }); + */ + return regions; +} +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi-action.h b/hbase-native-client/core/multi-action.h new file mode 100644 index 0000000..2d211f0 --- /dev/null +++ b/hbase-native-client/core/multi-action.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include "core/get.h" + +using hbase::Get; + +namespace hbase { +using MultiActions = std::map>>; +class MultiAction { + private: + MultiActions actions_; + + public: + MultiAction(); + ~MultiAction(); + + void Add(const std::string ®ion_name, std::shared_ptr get); + void Add(const std::string ®ion_name, const std::vector> &gets); + + bool IsEmpty() const { return actions_.empty(); } + int Size() const; + std::vector Regions() const; + const MultiActions &Actions() const { return actions_; } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc new file mode 100644 index 0000000..ea38fc8 --- /dev/null +++ b/hbase-native-client/core/multi-response.cc @@ -0,0 +1,78 @@ +/* + l * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/multi-response.h" + +namespace hbase { + +MultiResponse::MultiResponse() {} + +int MultiResponse::Size() const { + int size = 0; + for (const auto& result : results_) { + size += result.second->Size(); + } + return size; +} + +void MultiResponse::Add(const std::string& region_name, int32_t original_index, + std::shared_ptr result, std::shared_ptr exc) { + bool region_found = false; + for (auto itr = results_.begin(); itr != results_.end(); ++itr) { + if (itr->first == region_name) { + region_found = true; + itr->second->AddResult(original_index, result, exc); + break; + } + } + if (!region_found) { + auto region_result = std::make_shared(); + region_result->AddResult(original_index, result, exc); + results_[region_name] = region_result; + } +} + +void MultiResponse::AddException(const std::string& region_name, + std::shared_ptr exception) { + exceptions_[region_name] = exception; +} + +std::shared_ptr MultiResponse::Exception(const std::string& region_name) const { + auto find = exceptions_.at(region_name); + return find; +} + +const std::map >& MultiResponse::Exceptions() const { + return exceptions_; +} + +void MultiResponse::AddStatistic(const std::string& region_name, + const std::shared_ptr& stat) { + results_[region_name]->SetStat(stat); +} + +const std::map >& MultiResponse::Results() const { + return results_; +} + +MultiResponse::ResponseType MultiResponse::Type() const { return ResponseType::MULTI; } + +MultiResponse::~MultiResponse() {} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h new file mode 100644 index 0000000..d665caf --- /dev/null +++ b/hbase-native-client/core/multi-response.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::RegionResult; +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class MultiResponse { + public: + enum class ResponseType { + SINGLE = 0, + MULTI = 1, + }; + MultiResponse(); + explicit MultiResponse(const std::string& region_name); + /** + * @brief Returns Number of pairs in this container + */ + int Size() const; + + /** + * Add the pair to the container, grouped by the regionName + * + * @param regionName + * @param originalIndex the original index of the Action (request). + * @param resOrEx the result or error; will be empty for successful Put and Delete actions. + */ + void Add(const std::string& region_name, int32_t original_index, std::shared_ptr result, + std::shared_ptr exc); + + void AddException(const std::string& region_name, std::shared_ptr exception); + + /** + * @return the exception for the region, if any. Null otherwise. + */ + std::shared_ptr Exception(const std::string& region_name) const; + + const std::map>& Exceptions() const; + + void AddStatistic(const std::string& region_name, const std::shared_ptr& stat); + + const std::map>& Results() const; + + MultiResponse::ResponseType Type() const; + + virtual ~MultiResponse(); + + private: + // map of regionName to map of Results by the original index for that Result + std::map> results_; + /** + * The server can send us a failure for the region itself, instead of individual failure. + * It's a part of the protobuf definition. + */ + std::map> exceptions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h new file mode 100644 index 0000000..15e1139 --- /dev/null +++ b/hbase-native-client/core/region-request.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include +#include +#include +#include "core/action.h" +#include "core/region-location.h" + +using hbase::Action; +namespace hbase { +// TODO Revisit this class once +class RegionRequest { + public: + using ActionList = std::shared_ptr>>; + explicit RegionRequest(const std::shared_ptr ®ion_loc) + : region_loc_(region_loc) {} + ~RegionRequest() {} + void AddAction(std::shared_ptr action) { actions_->push_back(action); } + std::shared_ptr RegionLocation() { return region_loc_; } + const ActionList &Actions() { return actions_; } + + private: + std::shared_ptr region_loc_; + ActionList actions_ = std::make_shared>>(); +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc new file mode 100644 index 0000000..ef62ff8 --- /dev/null +++ b/hbase-native-client/core/region-result.cc @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/region-result.h" +#include +#include + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +RegionResult::RegionResult() {} + +RegionResult::~RegionResult() {} + +void RegionResult::AddResult(int32_t index, std::shared_ptr result, + std::shared_ptr exc) { + auto index_found = result_.find(index); + if (index_found == result_.end()) { + result_[index] = std::make_tuple(result ? result : nullptr, exc ? exc : nullptr); + } else { + throw std::runtime_error("Index " + std::to_string(index) + + " already set with ResultOrException"); + } +} + +void RegionResult::SetStat(std::shared_ptr stat) { stat_ = stat; } + +int RegionResult::Size() const { return result_.size(); } + +std::shared_ptr RegionResult::Result(int32_t index) const { + // auto ret = result_.at(index); + // auto shared_tuple = std::make_shared(result_.at(index)); + return std::make_shared(result_.at(index)); +} + +const std::shared_ptr& RegionResult::Stat() const { return stat_; } + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h new file mode 100644 index 0000000..a006dad --- /dev/null +++ b/hbase-native-client/core/region-result.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { +using ResultOrExceptionTuple = + std::tuple, std::shared_ptr>; +class RegionResult { + public: + RegionResult(); + void AddResult(int32_t index, std::shared_ptr result, + std::shared_ptr exc); + + void SetStat(std::shared_ptr stat); + + int Size() const; + + std::shared_ptr Result(int32_t index) const; + + const std::shared_ptr& Stat() const; + + ~RegionResult(); + + private: + std::map result_; + std::shared_ptr stat_ = std::make_shared(); +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 227e04a..43d8f12 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -18,6 +18,7 @@ */ #include "core/request-converter.h" +#include #include #include "if/Client.pb.h" @@ -57,7 +58,7 @@ std::unique_ptr RequestConverter::ToGetRequest(const Get &get, pb_time_range->set_from(get.Timerange().MinTimeStamp()); pb_time_range->set_to(get.Timerange().MaxTimeStamp()); } - pb_get->set_row(get.Row()); + pb_get->set_row(get.row()); if (get.HasFamilies()) { for (const auto &family : get.Family()) { auto column = pb_get->add_column(); @@ -123,4 +124,51 @@ std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, return pb_req; } + +folly::Future> RequestConverter::ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions) { + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + auto region_specifier = new hbase::pb::RegionSpecifier(); + RequestConverter::SetRegion(region_name, region_specifier); + auto pb_region_action = pb_msg->add_regionaction(); + pb_region_action->set_allocated_region(region_specifier); + int action_num = 0; + for (const auto ®ion_action : region_actions) { + auto pb_action = pb_region_action->add_action(); + auto action = region_action->GetAction(); + if (auto pget = std::dynamic_pointer_cast(action)) { + auto pb_get = RequestConverter::ToGet(*pget); + pb_action->set_allocated_get(pb_get.release()); + pb_action->set_index(action_num); + } + action_num += 1; + } + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return folly::makeFuture>(std::move(pb_req)); +} + +std::unique_ptr RequestConverter::ToGet(const Get &get) { + auto pb_get = std::make_unique(); + pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (!get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.row()); + if (get.HasFamilies()) { + for (const auto &family : get.Family()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + return pb_get; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 57f08cc..161cbf9 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -21,12 +21,21 @@ #include #include +#include #include "connection/request.h" #include "core/get.h" +#include "core/multi-action.h" +#include "core/region-request.h" #include "core/scan.h" #include "if/HBase.pb.h" +using hbase::Get; +using hbase::MultiAction; +using hbase::RegionRequest; +using hbase::Request; +using hbase::Scan; using hbase::pb::RegionSpecifier; +using hbase::pb::ServerName; namespace hbase { /** @@ -53,6 +62,9 @@ class RequestConverter { */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name); + static folly::Future> ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions); + private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); @@ -64,6 +76,8 @@ class RequestConverter { * Request. */ static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); + + static std::unique_ptr ToGet(const Get &get); }; } /* namespace hbase */ diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index 7bb5e5d..0d1aa27 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -19,6 +19,7 @@ #include "core/response-converter.h" +#include #include #include @@ -51,10 +52,12 @@ std::unique_ptr ResponseConverter::ToResult( // iterate over the cells coming from rpc codec if (cell_scanner != nullptr) { - while (cell_scanner->Advance()) { + int num_cells = 0; + while (num_cells != result.associated_cell_count()) { + cell_scanner->Advance(); vcells.push_back(cell_scanner->Current()); + num_cells += 1; } - // TODO: check associated cell count? } return std::make_unique(vcells, result.exists(), result.stale(), result.partial()); } @@ -94,4 +97,83 @@ std::vector> ResponseConverter::FromScanResponse(const R return results; } + +std::shared_ptr ResponseConverter::GetResults(Request* req, + const Response& resp) { + auto multi_req = std::static_pointer_cast(req->req_msg()); + auto multi_resp = std::static_pointer_cast(resp.resp_msg()); + VLOG(3) << "GetResults:" << multi_resp->ShortDebugString(); + int req_region_action_count = multi_req->regionaction_size(); + int res_region_action_count = multi_resp->regionactionresult_size(); + if (req_region_action_count != res_region_action_count) { + throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + + " does not match response mutation result count=" + + std::to_string(res_region_action_count)); + } + auto multi_results = std::make_shared(); + for (int32_t num = 0; num < res_region_action_count; num++) { + hbase::pb::RegionAction actions = multi_req->regionaction(num); + hbase::pb::RegionActionResult action_result = multi_resp->regionactionresult(num); + hbase::pb::RegionSpecifier rs = actions.region(); + if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) { + throw std::runtime_error("We support only encoded types for protobuf multi response."); + } + auto region_name = rs.value(); + if (action_result.has_exception()) { + std::shared_ptr exc; + std::shared_ptr rexc; + ([action_result, exc, rexc]() { + hbase::pb::NameBytesPair nb_pair = action_result.exception(); + if (nb_pair.has_value()) { + // rexc = std::make_shared(nb_pair.name() + " " + nb_pair.value()); + // exc = std::move(rexc); + } + }); + multi_results->AddException(region_name, exc); + continue; + } + + if (actions.action_size() != action_result.resultorexception_size()) { + throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + + ", action_result.resultorexception_size=" + + std::to_string(action_result.resultorexception_size()) + + " for region " + actions.region().value()); + } + + for (hbase::pb::ResultOrException roe : action_result.resultorexception()) { + std::shared_ptr result; + std::shared_ptr exc; + if (roe.has_exception()) { + std::shared_ptr rexc; + hbase::pb::NameBytesPair nb_pair = action_result.exception(); + if (nb_pair.has_value()) { + // rexc = std::make_shared(nb_pair.name() + " " + nb_pair.value()); + // exc = std::move(rexc); + } + } else if (roe.has_result()) { + auto unique_result = ToResult(roe.result(), resp.cell_scanner()); + result = std::make_shared(*unique_result); + } else if (roe.has_service_result()) { + // TODO Not processing Coprocessor Service Result; + } else { + // Sometimes, the response is just "it was processed". Generally, this occurs for things + // like mutateRows where either we get back 'processed' (or not) and optionally some + // statistics about the regions we touched. + std::vector> empty_cells; + result = std::make_shared(empty_cells, multi_resp->processed() ? true : false, + false, false); + } + multi_results->Add(region_name, roe.index(), result, exc); + } + } + + if (multi_resp->has_regionstatistics()) { + hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics(); + for (int i = 0; i < stats.region_size(); i++) { + multi_results->AddStatistic(stats.region(i).value(), + std::make_shared(stats.stat(i))); + } + } + return multi_results; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h index 759b1ce..395b7d1 100644 --- a/hbase-native-client/core/response-converter.h +++ b/hbase-native-client/core/response-converter.h @@ -21,11 +21,15 @@ #include #include +#include "connection/request.h" #include "connection/response.h" +#include "core/multi-response.h" #include "core/result.h" #include "if/Client.pb.h" #include "serde/cell-scanner.h" +using hbase::Request; +using hbase::Response; namespace hbase { /** @@ -47,6 +51,10 @@ class ResponseConverter { static std::vector> FromScanResponse(const Response& resp); + /*static std::shared_ptr GetResults(const std::shared_ptr& req, + std::unique_ptr resp);*/ + static std::shared_ptr GetResults(Request* req, const Response& resp); + private: // Constructor not required. We have all static methods to extract response from PB messages. ResponseConverter(); diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h new file mode 100644 index 0000000..fa6b4a0 --- /dev/null +++ b/hbase-native-client/core/row.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#pragma once + +namespace hbase { + +class Row { + public: + Row() {} + explicit Row(const std::string &row) + : row_(row) { + CheckRow(row_); + } + + /** + * @brief Returns the row for the Row interface. + */ + const std::string &row() const { return row_; } + virtual ~Row() {} + + private: + /** + * @brief Checks if the row for this Get operation is proper or not + * @param row Row to check + * @throws std::runtime_error if row is empty or greater than + * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) + */ + void CheckRow(const std::string &row) { + const int16_t kMaxRowLength = std::numeric_limits::max(); + int16_t row_length = row.size(); + if (0 == row_length) { + throw std::runtime_error("Row length can't be 0"); + } + if (row_length > kMaxRowLength) { + throw std::runtime_error("Length of " + row + " is greater than max row size: " + + std::to_string(kMaxRowLength)); + } + } + + protected: + std::string row_ = ""; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h new file mode 100644 index 0000000..e1036eb --- /dev/null +++ b/hbase-native-client/core/server-request.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "core/action.h" +#include "core/region-request.h" +#include "core/region-location.h" + +using hbase::Action; +using hbase::RegionRequest; + +namespace hbase { + +class ServerRequest { + public: + using ActionsByRegion = std::map>; + + explicit ServerRequest(std::shared_ptr region_location) { + auto region_name = region_location->region_name(); + auto region_request = std::make_shared(region_location); + actions_by_region_[region_name] = region_request; + } + ~ServerRequest() {} + + void AddAction(std::shared_ptr region_location, std::shared_ptr action) { + auto region_name = region_location->region_name(); + auto itr = actions_by_region_.at(region_name); + itr->AddAction(action); + } + + const ActionsByRegion &Actions() { return actions_by_region_; } + + private: + ActionsByRegion actions_by_region_; +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index 2ce8fcd..924216f 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -52,7 +52,7 @@ Table::Table(const TableName &table_name, Table::~Table() {} std::unique_ptr Table::Get(const hbase::Get &get) { - auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000)); + auto loc = location_cache_->LocateFromMeta(*table_name_, get.row()).get(milliseconds(1000)); auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name()); auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil -- 1.8.3.1