From 53be3b227302339c8085fc4cee493438e38bc325 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Thu, 9 Feb 2017 09:56:29 +1100 Subject: [PATCH] Patch for MultiRequests diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h new file mode 100644 index 0000000..8afe94c --- /dev/null +++ b/hbase-native-client/core/action.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include "core/row.h" + +using hbase::Row; +namespace hbase { + +class Action { + public: + Action(std::shared_ptr action, int original_index) + : action_(action), original_index_(original_index_) {} + ~Action() {} + + int32_t OriginalIndex() { return original_index_; } + + std::shared_ptr GetAction() { return action_; } + + private: + std::shared_ptr action_; + int32_t original_index_; + int64_t nonce_ = -1; + int32_t replica_id_ = -1; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h new file mode 100644 index 0000000..6b27c05 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "core/async-rpc-retrying-batch-caller.h" +#include +#include + +#include "core/row.h" +#include "core/async-connection.h" +#include "core/get.h" +#include "core/hregion-location.h" +#include "if/HBase.pb.h" + +using hbase::Get; +using hbase::pb::TableName; + +namespace hbase { + +template +class BatchCallerBuilder; + +template +class AsyncRpcRetryingBatchCallerFactory { + public: + template + friend class BatchCallerBuilder; + AsyncRpcRetryingBatchCallerFactory(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} + + virtual ~AsyncRpcRetryingBatchCallerFactory() = default; + + template + std::shared_ptr> Batch() { + return std::make_shared>(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + + // BatchCallerBuilder + public: + template + class BatchCallerBuilder + : public std::enable_shared_from_this> { + public: + BatchCallerBuilder(std::shared_ptr> async_rpc_batch) + : async_rpc_batch_(async_rpc_batch), + table_name_(nullptr), + actions_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0) { + LOG(INFO) << "BatchCallerBuilder Starts and Ends"; + } + + virtual ~BatchCallerBuilder() = default; + + typedef BatchCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + GenenericThisType& SetTable(std::shared_ptr table_name) { + LOG(INFO) << "Table"; + table_name_ = table_name; + return *this; + } + + GenenericThisType& SetActions(std::shared_ptr>> actions) { + LOG(INFO) << "Actions"; + actions_ = actions; + return *this; // shared_this(); + } + + GenenericThisType& SetOperationTimeout(long operation_timeout_nanos) { + LOG(INFO) << "Op TimeOut"; + operation_timeout_nanos_ = operation_timeout_nanos; + return *this; // shared_this(); + } + + GenenericThisType& SetRpcTimeout(long rpc_timeout_nanos) { + LOG(INFO) << "Rpc Timeout"; + rpc_timeout_nanos_ = rpc_timeout_nanos; + return *this; // shared_this(); + } + + GenenericThisType& SetPause(int64_t pause_ns) { + LOG(INFO) << "Pause"; + pause_ns_ = pause_ns; + return *this; // shared_this(); + } + + GenenericThisType& SetMaxAttempts(int32_t max_attempts) { + LOG(INFO) << "MaxAttempts"; + max_attempts_ = max_attempts; + return *this; // shared_this(); + } + + GenenericThisType& StartLogErrorsCnt(int32_t start_log_errors_cnt) { + LOG(INFO) << "LogErrorsCnt"; + start_log_errors_cnt_ = start_log_errors_cnt; + return *this; // shared_this(); + } + + std::shared_ptr> Build() { + LOG(INFO) << "Is this BatchCallerBuilder Build() ??"; + return std::make_shared>( + async_rpc_batch_->retry_timer_, async_rpc_batch_->conn_, table_name_, actions_, 1000, 1, + operation_timeout_nanos_, rpc_timeout_nanos_, 1); + } + + const std::vector>>& Call() { + AsyncRpcRetryingBatchCaller async_rpc_retrying_batch_caller( + async_rpc_batch_->retry_timer_, async_rpc_batch_->conn_, table_name_, actions_, pause_ns_, + max_attempts_, operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_cnt_); + return async_rpc_retrying_batch_caller.Call(); + } + + public: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr> async_rpc_batch_; + std::shared_ptr table_name_; + std::shared_ptr location_cache_; + std::shared_ptr>> actions_; + int64_t operation_timeout_nanos_; + int64_t rpc_timeout_nanos_; + int64_t pause_ns_ = 0; + int32_t max_attempts_ = 0; + int32_t start_log_errors_cnt_ = 0; + // Callable callable_; + }; +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-batch-caller.h b/hbase-native-client/core/async-rpc-retrying-batch-caller.h new file mode 100644 index 0000000..01cd930 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-batch-caller.h @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "core/action.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/get.h" +#include "core/hbase-rpc-controller.h" +#include "core/hregion-location.h" +#include "core/location-cache.h" +#include "core/region_request.h" +#include "core/request_converter.h" +#include "core/result.h" +#include "core/row.h" +#include "core/server_request.h" +#include "connection/rpc-client.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +#include + +using hbase::Action; +using hbase::Get; +using hbase::LocationCache; +using hbase::MultiAction; +using hbase::RequestConverter; +using hbase::Result; +using hbase::RpcClient; +using hbase::Get; +using hbase::ServerRequest; +using hbase::pb::ServerName; +using hbase::pb::TableName; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +using ActionsByServer = std::map, std::shared_ptr>; +// TODO This map should be same as ServerRequest::ActionsByRegion +using ActionsByRegion = ServerRequest::ActionsByRegion; + +template +using BatchCallable = + std::function(std::shared_ptr, + std::shared_ptr, std::shared_ptr)>; +template +class AsyncRpcRetryingBatchCaller { + public: + AsyncRpcRetryingBatchCaller(std::shared_ptr retry_timer, + std::shared_ptr conn, std::shared_ptr table_name, + std::shared_ptr>> actions, + int64_t pause_ns, int32_t max_attempts, + int64_t operation_timeout_nanos, int64_t rpc_timeout_nanos, + int32_t start_log_errors_count) + : retry_timer_(retry_timer), + conn_(conn), + table_name_(table_name), + pause_ns_(pause_ns), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count) { + max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts); + uint32_t index = 0; + for (auto row : *actions) { + std::shared_ptr action = std::make_shared(row, index); + actions_->push_back(action); + index += 1; + } + promises_.reserve(actions_->size()); + futures_.reserve(actions_->size()); + } + + virtual ~AsyncRpcRetryingBatchCaller() = default; + + const std::vector>>& Call() { + GroupAndSend(*actions_.get(), 1); + for (const auto &promise : promises_) { + futures_.push_back(std::make_shared>(promise->getFuture())); + } + return futures_; + } + + private: + long ElapsedMs() { + std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns_)).count(); + } + + void CompleteExceptionally() { + // TODO Commented for now + // this->promise_->setException(RetriesExhaustedException(max_attempts_ - 1, exceptions_)); + } + + int64_t GetNowNanos() { + auto duration = std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + int64_t RemainingTimeNs() { return operation_timeout_nanos_ - (GetNowNanos() - start_ns_); } + + std::shared_ptr> RemoveErrors(Action &action) { + // TODO Remove the action + return nullptr; + } + + // TODO Java has Supplier for RegionRequest + void LogException(int tries, std::vector> ®ion_requests, + std::shared_ptr &error, + std::shared_ptr server_name) { + if (tries > start_log_errors_count_) { + std::string regions; + std::for_each( + std::begin(region_requests), std::end(region_requests), + [®ions]( + const std::vector>::value_type ®ion_request) { + regions += region_request->HRegionLocation()->region_info()->GetPrintName() + ", "; + }); + LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" + << table_name_->qualifier() << " from " << server_name->host_name() + << " failed, tries=" << tries << error->what(); + } + } + + std::string GetExtraContextForError(std::shared_ptr server_name) { + return server_name->host_name(); + } + + void AddError(const std::shared_ptr &action, std::shared_ptr error, + std::shared_ptr server_name) { + std::vector action2errors; + ThrowableWithExtraContext extra_error(error, GetNowNanos(), ""); + action2errors.push_back(extra_error); + } + + void AddError(const std::vector> &actions, + std::shared_ptr error, std::shared_ptr server_name) { + for (const auto action : actions) { + AddError(action, error, server_name); + } + } + + void FailOne(const std::shared_ptr &action, int tries, + std::shared_ptr error, int64_t current_time, std::string &extras) { + // TODO auto itr = action2errors.find(123); action2errors.erase(itr); + std::vector> errors; + errors.push_back(std::make_shared(error, current_time, extras)); + // TODO + // Call CompleteExceptionally + } + + void FailAll(const std::vector> &actions, int tries, + std::shared_ptr error, std::shared_ptr server_name) { + int64_t current_time = 0L; + std::string extras = GetExtraContextForError(server_name); + for (const auto action : actions) { + FailOne(action, tries, error, current_time, extras); + } + } + + void FailAll(const std::vector> &actions, int tries) { + for (const auto action : actions) { + // TODO + } + } + + void OnError(const ActionsByRegion &actions_by_region, int32_t tries, + std::shared_ptr exc, std::shared_ptr server_name) { + std::vector> copied_actions; + std::vector> region_requests; + region_requests.reserve(actions_by_region.size()); + std::for_each( + std::begin(actions_by_region), std::end(actions_by_region), + [®ion_requests, &copied_actions](const ActionsByRegion::value_type &action_by_region) { + region_requests.push_back(action_by_region.second); + for (const auto &action : *action_by_region.second->Actions().get()) { + copied_actions.push_back(action); + } + }); + // TODO Java API passes Throwable error to logException + // Throwable error = translateException(t); + LogException(tries, region_requests, exc, server_name); + if (auto error = + std::dynamic_pointer_cast(exc) || tries >= max_attempts_) { + FailAll(copied_actions, tries, exc, server_name); + } + TryResubmit(copied_actions, tries); + } + + void GroupAndSend(std::vector> &actions, int32_t tries) { + int64_t locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + FailAll(*actions_.get(), tries); + return; + } + } else { + locate_timeout_ns = -1L; + } + + ActionsByServer actions_by_server; + std::vector> locate_failed; + + for (const auto action : actions) { + try { + /* + * loc should be coming from conn_->get_locator()->GetRegionLocation(*table_name_, row, + * hbase::RegionLocateType::kCurrent, locate_timeout_ns); + * Ref HBASE-17465:- core/async-rpc-retrying-caller.h + */ + // TODO loc = conn_->get_locator()....; + std::shared_ptr loc; + bool found = false; + for (auto itr = actions_by_server.begin(); itr != actions_by_server.end(); ++itr) { + if (loc->server_name().host_name() == itr->first->host_name()) { + found = true; + itr->second->AddAction(loc, action); + break; + } + } + + if (!found) { + // Create new key + actions_by_server[std::make_shared(loc->server_name())] + ->AddAction(loc, action); + } + } + catch (const std::exception &exc) { + auto pexc = std::make_shared(exc); + if (auto error = std::dynamic_pointer_cast(pexc)) { + int64_t current_time = 0; + std::string extra = ""; + FailOne(action, tries, nullptr, current_time, extra); + return; + } + AddError(action, pexc, nullptr); + locate_failed.push_back(action); + } + } + if (!actions_by_server.empty()) { + Send(actions_by_server, tries); + } + + if (!locate_failed.empty()) { + TryResubmit(locate_failed, tries); + } + return; + } + + void Send(const ActionsByServer &actions_by_server, int32_t tries) { + int64_t remaining_ns; + if (operation_timeout_nanos_ > 0) { + remaining_ns = RemainingTimeNs(); + if (remaining_ns <= 0) { + // TODO + std::vector> failed_actions; + std::for_each(std::begin(actions_by_server), std::end(actions_by_server), + [&failed_actions](const ActionsByServer::value_type &action_by_server) { + for (const auto &value : action_by_server.second->Actions()) { + for (const auto &failed_action : *value.second->Actions().get()) { + failed_actions.push_back(failed_action); + } + } + }); + FailAll(failed_actions, tries); + return; + } + } else { + remaining_ns = std::numeric_limits::max(); + } + + for (auto action : actions_by_server) { + for (auto itr = action.second->Actions().begin(); itr != action.second->Actions().end(); + ++itr) { + auto loc = itr->first; + auto region_name = itr->second->HRegionLocation()->region_info()->RegionName(); + auto actions_by_region = itr->second->Actions(); + + /* + * This should come from try { conn_->GetRegionServerStub(loc->server_name()); ... } + * catch(...){...} + * Ref HBASE-17465:- core/async-rpc-retrying-caller.h + */ + std::shared_ptr stub; + auto pb_req = RequestConverter::ToMultiGetRequest(region_name, *actions_by_region.get()); + ConnectionUtils::ResetController(controller_, remaining_ns); + hbase::pb::MultiResponse pb_resp; + /* + * TODO wip + * Ref:- core/async-rpc-retrying-caller.h + */ + callable_(controller_, std::make_shared(loc), stub) + .then([this, action, tries](const RESP &resp) { + auto promise = std::make_shared>(); + promise->setValue(resp); + promises_.push_back(promise); + try { + // TODO + // This method will covert to pb_resp to MultiResponse Results + // auto multi_results = ResponseConverter::GetResults(pb_req, pb_resp); + + // TODO + // here we will check if any failed responses are there or not and call + // TryResumbit(); + // OnComplete(actions_by_region, tries, action.first, multi_results); + } + catch (const std::exception &exc) { + OnError(action.second->Actions(), tries, std::make_shared(exc), + action.first); + return; + } + }) + .onError([&, this](const std::exception &exc) { + // TODO check controller.getFailed() else call below OnError + OnError(action.second->Actions(), tries, std::make_shared(exc), + action.first); + }); + } + } + return; + } + + void TryResubmit(std::vector> actions, int tries) { + long delay_nanos; + if (operation_timeout_nanos_ > 0) { + long max_delay_nanos = RemainingTimeNs() - ConnectionUtils::SLEEP_DELTA_NS; + if (max_delay_nanos <= 0) { + FailAll(actions, tries); + return; + } + delay_nanos = std::min(max_delay_nanos, ConnectionUtils::GetPauseTime(pause_ns_, tries)); + } else { + delay_nanos = ConnectionUtils::GetPauseTime(pause_ns_, tries); + } + // TODO retry_timer_ reset + // retry_timer_->newTimer(delay_nanos, TimeUtil::GetNowNanos()); + GroupAndSend(actions, tries + 1); + } + + private: + std::shared_ptr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::shared_ptr>> actions_ = + std::make_shared>>(); + int64_t pause_ns_; + int32_t max_attempts_; + int64_t operation_timeout_nanos_; + int64_t rpc_timeout_nanos_; + int32_t start_log_errors_count_; + + std::shared_ptr controller_ = std::shared_ptr(); + int64_t start_ns_ = TimeUtil::GetNowNanos(); + int32_t tries_ = 1; + std::shared_ptr> exceptions_ = + std::make_shared>(); + BatchCallable callable_; + std::map> action2errors_; + std::vector>> promises_; + std::vector>> futures_; + std::shared_ptr> promise_ = std::make_shared>(); +}; +} +/* namespace hbase */ + +// LOG(INFO) << "[" << operation_timeout_nanos_ << " - (" << GetNowNanos() << " - " << start_ns_ +// << ")]=" << (operation_timeout_nanos_ - (GetNowNanos() - start_ns_)); diff --git a/hbase-native-client/core/hregion-info.h b/hbase-native-client/core/hregion-info.h new file mode 100644 index 0000000..8aabeb6 --- /dev/null +++ b/hbase-native-client/core/hregion-info.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "if/HBase.pb.h" +#include + +namespace hbase { +class HRegionInfo { + public: + HRegionInfo(const long& region_id, const hbase::pb::TableName& table_name, const int& replica_id) + : region_id_(region_id), table_name_(table_name), replica_id_(replica_id) {} + + std::string GetPrintName() const { + return folly::sformat("{0}-{1}.{2}-{3}", table_name_.namespace_(), table_name_.qualifier(), + region_id_, replica_id_); + } + + const std::string& RegionName() const { return encoded_name; } + long RegionId() const { return region_id_; } + int ReplicaId() const { return replica_id_; } + const hbase::pb::TableName& TableName() const { return table_name_; } + + private: + std::string encoded_name; + long region_id_; + hbase::pb::TableName table_name_; + int replica_id_; +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/hregion-location.h b/hbase-native-client/core/hregion-location.h new file mode 100644 index 0000000..69792a6 --- /dev/null +++ b/hbase-native-client/core/hregion-location.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "hregion-info.h" +#include "if/HBase.pb.h" +#include + +namespace hbase { + +enum RegionLocateType { + kBefore, + kCurrent, + kAfter +}; + +class HRegionLocation { + public: + HRegionLocation(const long& seq_num, const hbase::pb::ServerName& server_name, + std::shared_ptr region_info) + : seq_num_(seq_num), server_name_(server_name), region_info_(region_info) {} + virtual ~HRegionLocation(); + + long set_num() { return seq_num_; } + + hbase::pb::ServerName server_name() const { return server_name_; } + + std::shared_ptr region_info() const { return region_info_; } + + std::size_t HashCode() { + std::size_t hash = 0; + boost::hash_combine(hash, seq_num_); + boost::hash_combine(hash, server_name_.host_name()); + boost::hash_combine(hash, server_name_.port()); + boost::hash_combine(hash, server_name_.start_code()); + boost::hash_combine(hash, region_info_->RegionId()); + boost::hash_combine(hash, region_info_->RegionName()); + boost::hash_combine(hash, region_info_->ReplicaId()); + boost::hash_combine(hash, region_info_->TableName().namespace_()); + boost::hash_combine(hash, region_info_->TableName().qualifier()); + return hash; + } + + private: + long seq_num_; + hbase::pb::ServerName server_name_; + std::shared_ptr region_info_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region_request.h b/hbase-native-client/core/region_request.h new file mode 100644 index 0000000..c72cf94 --- /dev/null +++ b/hbase-native-client/core/region_request.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include +#include +#include +#include "core/hregion-location.h" +#include "core/action.h" + +using hbase::Action; +using hbase::HRegionLocation; +namespace hbase { + +class RegionRequest { + public: + RegionRequest(const std::shared_ptr &hregion_loc) + : hregion_loc_(hregion_loc) {} + ~RegionRequest() {} + void AddAction(std::shared_ptr action) { actions_->push_back(action); } + std::shared_ptr HRegionLocation() { return hregion_loc_; } + std::shared_ptr>> Actions() { return actions_; } + + private: + std::shared_ptr hregion_loc_; + std::shared_ptr>> actions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/server_request.h b/hbase-native-client/core/server_request.h new file mode 100644 index 0000000..158be23 --- /dev/null +++ b/hbase-native-client/core/server_request.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "core/action.h" +#include "core/hregion-location.h" +#include "core/region_request.h" + +using hbase::Action; +using hbase::HRegionLocation; +using hbase::RegionRequest; + +namespace hbase { + +class ServerRequest { + public: + // this should be hash to Region request + using ActionsByRegion = std::unordered_map>; + + ServerRequest() {} + + ~ServerRequest() {} + + void AddAction(std::shared_ptr hregion_location, + std::shared_ptr action) { + actions_by_region_[hregion_location->HashCode()]->AddAction(action); + } + + ActionsByRegion &Actions() { return actions_by_region_; } + + private: + ActionsByRegion actions_by_region_; +}; +} /* namespace hbase */ -- 1.8.3.1