From ee55387b4d8f7dbc4cbf4fbc33900ed36a95165f Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Tue, 14 Feb 2017 22:53:25 +1100 Subject: [PATCH] V3 patch which for Multi RPC's diff --git a/hbase-native-client/core/abstract_response.h b/hbase-native-client/core/abstract_response.h new file mode 100644 index 0000000..3f7b3cc --- /dev/null +++ b/hbase-native-client/core/abstract_response.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +namespace hbase { + +class AbstractResponse { + public: + enum class ResponseType { + SINGLE = 0, + MULTI = 1, + }; + + virtual ResponseType Type() const = 0; + virtual ~AbstractResponse() {} +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h new file mode 100644 index 0000000..dbeea8c --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-batch-caller-factory.h @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "core/async-rpc-retrying-batch-caller.h" +#include +#include + +#include "core/row.h" +#include "core/async-connection.h" +#include "core/get.h" +#include "core/hregion-location.h" +#include "if/HBase.pb.h" + +using hbase::Get; +using hbase::pb::TableName; + +namespace hbase { + +template +class BatchCallerBuilder; + +template +class AsyncRpcRetryingBatchCallerFactory { + public: + template + friend class BatchCallerBuilder; + AsyncRpcRetryingBatchCallerFactory(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} + + virtual ~AsyncRpcRetryingBatchCallerFactory() = default; + + template + std::shared_ptr> Batch() { + return std::make_shared>(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + + // BatchCallerBuilder + public: + template + class BatchCallerBuilder + : public std::enable_shared_from_this> { + public: + BatchCallerBuilder(std::shared_ptr> async_rpc_batch) + : async_rpc_batch_(async_rpc_batch), + table_name_(nullptr), + actions_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0) { + LOG(INFO) << "BatchCallerBuilder Starts and Ends"; + } + + virtual ~BatchCallerBuilder() = default; + + typedef BatchCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + GenenericThisType& SetTable(std::shared_ptr table_name) { + LOG(INFO) << "Table"; + table_name_ = table_name; + return *this; + } + + GenenericThisType& SetActions(std::shared_ptr>> actions) { + LOG(INFO) << "Actions"; + actions_ = actions; + return *this; // shared_this(); + } + + GenenericThisType& SetOperationTimeout(long operation_timeout_nanos) { + LOG(INFO) << "Op TimeOut"; + operation_timeout_nanos_ = operation_timeout_nanos; + return *this; // shared_this(); + } + + GenenericThisType& SetRpcTimeout(long rpc_timeout_nanos) { + LOG(INFO) << "Rpc Timeout"; + rpc_timeout_nanos_ = rpc_timeout_nanos; + return *this; // shared_this(); + } + + GenenericThisType& SetPause(int64_t pause_ns) { + LOG(INFO) << "Pause"; + pause_ns_ = pause_ns; + return *this; // shared_this(); + } + + GenenericThisType& SetMaxAttempts(int32_t max_attempts) { + LOG(INFO) << "MaxAttempts"; + max_attempts_ = max_attempts; + return *this; // shared_this(); + } + + GenenericThisType& StartLogErrorsCnt(int32_t start_log_errors_cnt) { + LOG(INFO) << "LogErrorsCnt"; + start_log_errors_cnt_ = start_log_errors_cnt; + return *this; // shared_this(); + } + + std::shared_ptr> Build() { + LOG(INFO) << "Is this BatchCallerBuilder Build() ??"; + return std::make_shared>( + async_rpc_batch_->retry_timer_, async_rpc_batch_->conn_, table_name_, actions_, 1000, 1, + operation_timeout_nanos_, rpc_timeout_nanos_, 1); + } + + std::shared_ptr> Call() { + AsyncRpcRetryingBatchCaller async_rpc_retrying_batch_caller( + async_rpc_batch_->retry_timer_, async_rpc_batch_->conn_, table_name_, actions_, pause_ns_, + max_attempts_, operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_cnt_); + return async_rpc_retrying_batch_caller.Call(); + } + + public: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr> async_rpc_batch_; + std::shared_ptr table_name_; + std::shared_ptr location_cache_; + std::shared_ptr>> actions_; + int64_t operation_timeout_nanos_; + int64_t rpc_timeout_nanos_; + int64_t pause_ns_ = 0; + int32_t max_attempts_ = 0; + int32_t start_log_errors_cnt_ = 0; + // Callable callable_; + }; +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-batch-caller.h b/hbase-native-client/core/async-rpc-retrying-batch-caller.h new file mode 100644 index 0000000..26d17f0 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-batch-caller.h @@ -0,0 +1,512 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "core/action.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/get.h" +#include "core/hbase-rpc-controller.h" +#include "core/hregion-location.h" +#include "core/location-cache.h" +#include "core/multi_response.h" +#include "core/region_request.h" +#include "core/region_result.h" +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "core/result.h" +#include "core/row.h" +#include "core/server_request.h" +#include "connection/rpc-client.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +#include + +using folly::Promise; +using folly::Future; +using hbase::Action; +using hbase::Get; +using hbase::LocationCache; +using hbase::MultiAction; +using hbase::RequestConverter; +using hbase::Result; +using hbase::RpcClient; +using hbase::Get; +using hbase::ServerRequest; +using hbase::pb::ServerName; +using hbase::pb::TableName; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +using ActionsByServer = std::map, std::shared_ptr>; +using ActionsByRegion = ServerRequest::ActionsByRegion; + +template +using BatchCallable = + std::function(std::shared_ptr, + std::shared_ptr, std::shared_ptr)>; +template +class AsyncRpcRetryingBatchCaller { + public: + AsyncRpcRetryingBatchCaller(std::shared_ptr retry_timer, + std::shared_ptr conn, std::shared_ptr table_name, + std::shared_ptr>> actions, + int64_t pause_ns, int32_t max_attempts, + int64_t operation_timeout_nanos, int64_t rpc_timeout_nanos, + int32_t start_log_errors_count) + : retry_timer_(retry_timer), + conn_(conn), + table_name_(table_name), + pause_ns_(pause_ns), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count) { + max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts); + uint32_t index = 0; + for (auto row : *actions) { + std::shared_ptr action = std::make_shared(row, index); + actions_->push_back(action); + index += 1; + } + promises_.reserve(actions_->size()); + futures_.reserve(actions_->size()); + } + + virtual ~AsyncRpcRetryingBatchCaller() = default; + + std::shared_ptr> Call() { + GroupAndSend(*actions_.get(), 1); + auto results = std::make_shared>(); + return results; + } + + private: + long ElapsedMs() { + std::chrono::duration_cast(nanoseconds(GetNowNanos() - start_ns_)).count(); + } + + void CompleteExceptionally() { + // TODO Commented for now + // this->promise_->setException(RetriesExhaustedException(max_attempts_ - 1, exceptions_)); + } + + int64_t GetNowNanos() { + auto duration = std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + int64_t RemainingTimeNs() { return operation_timeout_nanos_ - (GetNowNanos() - start_ns_); } + + std::shared_ptr> RemoveErrors(Action &action) { + // TODO Remove the action + return nullptr; + } + + // TODO Java has Supplier for RegionRequest + void LogException(int32_t tries, std::vector> ®ion_requests, + std::shared_ptr &error, + std::shared_ptr server_name) { + if (tries > start_log_errors_count_) { + std::string regions; + std::for_each( + std::begin(region_requests), std::end(region_requests), + [®ions]( + const std::vector>::value_type ®ion_request) { + regions += region_request->HRegionLocation()->region_info()->GetPrintName() + ", "; + }); + LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" + << table_name_->qualifier() << " from " << server_name->host_name() + << " failed, tries=" << tries << error->what(); + } + } + + std::string GetExtraContextForError(std::shared_ptr server_name) { + return server_name->host_name(); + } + + void AddError(const std::shared_ptr &action, std::shared_ptr error, + std::shared_ptr server_name) { + std::vector action2errors; + ThrowableWithExtraContext extra_error(error, GetNowNanos(), ""); + action2errors.push_back(extra_error); + } + + void AddError(const std::vector> &actions, + std::shared_ptr error, std::shared_ptr server_name) { + for (const auto action : actions) { + AddError(action, error, server_name); + } + } + + void FailOne(const std::shared_ptr &action, int32_t tries, + std::shared_ptr error, int64_t current_time, std::string &extras) { + // TODO auto itr = action2errors.find(123); action2errors.erase(itr); + std::vector> errors; + errors.push_back(std::make_shared(error, current_time, extras)); + // TODO + // Call CompleteExceptionally + } + + void FailAll(const std::vector> &actions, int32_t tries, + std::shared_ptr error, std::shared_ptr server_name) { + int64_t current_time = 0L; + std::string extras = GetExtraContextForError(server_name); + for (const auto action : actions) { + FailOne(action, tries, error, time(0), extras); + } + } + + void FailAll(const std::vector> &actions, int32_t tries) { + for (const auto action : actions) { + // TODO + } + } + + void OnError(const ActionsByRegion &actions_by_region, int32_t tries, + std::shared_ptr exc, std::shared_ptr server_name) { + std::vector> copied_actions; + std::vector> region_requests; + region_requests.reserve(actions_by_region.size()); + std::for_each( + std::begin(actions_by_region), std::end(actions_by_region), + [®ion_requests, &copied_actions](const ActionsByRegion::value_type &action_by_region) { + region_requests.push_back(action_by_region.second); + for (const auto &action : *action_by_region.second->Actions().get()) { + copied_actions.push_back(action); + } + }); + // TODO Java API passes Throwable error to logException + // Throwable error = translateException(t); + LogException(tries, region_requests, exc, server_name); + if (auto error = + std::dynamic_pointer_cast(exc) || tries >= max_attempts_) { + FailAll(copied_actions, tries, exc, server_name); + } + TryResubmit(copied_actions, tries); + } + + void TryResubmit(std::vector> actions, int32_t tries) { + long delay_nanos; + if (operation_timeout_nanos_ > 0) { + long max_delay_nanos = RemainingTimeNs() - ConnectionUtils::SLEEP_DELTA_NS; + if (max_delay_nanos <= 0) { + FailAll(actions, tries); + return; + } + delay_nanos = std::min(max_delay_nanos, ConnectionUtils::GetPauseTime(pause_ns_, tries)); + } else { + delay_nanos = ConnectionUtils::GetPauseTime(pause_ns_, tries); + } + // TODO retry_timer_ reset + // retry_timer_->newTimer(delay_nanos, TimeUtil::GetNowNanos()); + GroupAndSend(actions, tries + 1); + } + + void GroupAndSend(std::vector> &actions, int32_t tries) { + int64_t locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + FailAll(*actions_.get(), tries); + return; + } + } else { + locate_timeout_ns = -1L; + } + + ActionsByServer actions_by_server; + std::vector> locate_failed; + + for (const auto action : actions) { + try { + /* + * loc should be coming from conn_->get_locator()->GetRegionLocation(*table_name_, row, + * hbase::RegionLocateType::kCurrent, locate_timeout_ns); + * Ref HBASE-17465:- core/async-rpc-retrying-caller.h + */ + // TODO loc = conn_->get_locator()....; + std::shared_ptr loc; + bool found = false; + for (auto itr = actions_by_server.begin(); itr != actions_by_server.end(); ++itr) { + if (loc->server_name().host_name() == itr->first->host_name()) { + found = true; + itr->second->AddAction(loc, action); + break; + } + } + + if (!found) { + // Create new key + actions_by_server[std::make_shared(loc->server_name())] + ->AddAction(loc, action); + } + } + catch (const std::exception &exc) { + auto pexc = std::make_shared(exc); + if (auto error = std::dynamic_pointer_cast(pexc)) { + int64_t current_time = 0; + std::string extra = ""; + FailOne(action, tries, nullptr, time(0), extra); + return; + } + AddError(action, pexc, nullptr); + locate_failed.push_back(action); + } + } + if (!actions_by_server.empty()) { + Send(actions_by_server, tries); + } + + if (!locate_failed.empty()) { + TryResubmit(locate_failed, tries); + } + return; + } + + void Send(const ActionsByServer &actions_by_server, int32_t tries) { + int64_t remaining_ns; + if (operation_timeout_nanos_ > 0) { + remaining_ns = RemainingTimeNs(); + if (remaining_ns <= 0) { + // TODO + std::vector> failed_actions; + std::for_each(std::begin(actions_by_server), std::end(actions_by_server), + [&failed_actions](const ActionsByServer::value_type &action_by_server) { + for (const auto &value : action_by_server.second->Actions()) { + for (const auto &failed_action : *value.second->Actions().get()) { + failed_actions.push_back(failed_action); + } + } + }); + FailAll(failed_actions, tries); + return; + } + } else { + remaining_ns = std::numeric_limits::max(); + } + + for (auto action : actions_by_server) { + for (auto itr = action.second->Actions().begin(); itr != action.second->Actions().end(); + ++itr) { + auto loc = itr->second->HRegionLocation(); + auto region_name = itr->second->HRegionLocation()->region_info()->RegionName(); + auto actions_by_region = itr->second->Actions(); + + /* + * This should come from try { conn_->GetRegionServerStub(loc->server_name()); ... } + * catch(...){...} + * Ref HBASE-17465:- core/async-rpc-retrying-caller.h + */ + std::shared_ptr stub; + auto req = RequestConverter::ToMultiRequest(region_name, *actions_by_region.get()); + ConnectionUtils::ResetController(controller_, remaining_ns); + auto shared_req = std::shared_ptr(req.release()); +/* + * TODO wip + * Ref:- core/async-rpc-retrying-caller.h + */ +#if 0 + auto results = std::vector>{}; + auto col = uint64_t{0}; + for (; col < num_puts; col++) { + results.push_back( + folly::makeFuture(col) + .via(cpu_pool.get()) + .then([loc](uint64_t col) { return MakeRequest(col, loc->region_name()); }) + .then([connection](std::unique_ptr req) { + return (*connection)(std::move(req)); + })); + } + auto allf = folly::collect(results).get(); + for (const auto f : allf) { + LOG(INFO) << "Resp call id is:- " << f.call_id(); + LOG(INFO) << "IOBuf is:- " << f.cell_block_meta()->data(); + } +#endif + callable_(controller_, loc, stub) + .then([this, action, tries, shared_req](const RESP &resp) { + auto promise = std::make_shared>(); + promise->setValue(resp); + // promises_.push_back(promise); + auto f = promise->getFuture(); + auto hb_resp = f.get(); + // hbase::Response hb_resp; + try { + // This will return pb_resp to hbase::MultiResponse + auto multi_results = ResponseConverter::GetResults(shared_req, hb_resp); + OnComplete(action.second->Actions(), tries, action.first, multi_results); + } + catch (const std::exception &exc) { + OnError(action.second->Actions(), tries, std::make_shared(exc), + action.first); + return; + } + }) + .onError([&, this](const std::exception &exc) { + // TODO check controller.getFailed() else call below OnError + OnError(action.second->Actions(), tries, std::make_shared(exc), + action.first); + }); + } + } + return; + } + + void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries, + std::shared_ptr server_name, + const std::shared_ptr &multi_results) { + std::vector> failed_actions; + for (const auto &action_by_region : actions_by_region) { + auto region_name = action_by_region.second->HRegionLocation()->region_info()->RegionName(); + auto region_result = multi_results->Results().find(region_name); + // If not found create + if (region_result != multi_results->Results().end()) { + for (const auto &action : *action_by_region.second->Actions().get()) { + OnComplete(action, action_by_region.second, tries, server_name, region_result->second, + failed_actions); + } + } else { + // get region exception + std::shared_ptr exc; + if (exc == nullptr) { + LOG(ERROR) << "Server sent us neither results nor exceptions for "; //+ + // Bytes.toStringBinary(rn)); + // error = new RuntimeException("Invalid response"); + } else { + // TODO // Translate exception + // error = translateException(t); + // TODO + // LogExceptions + // TODO Update cached locations + // conn.getLocator().updateCachedLocation + for (const auto &action : *action_by_region.second->Actions().get()) { + failed_actions.push_back(action); + } + if (auto error = + std::dynamic_pointer_cast(exc) || tries >= max_attempts_) { + FailAll(failed_actions, tries, exc, server_name); + return; + } + } + AddError(*action_by_region.second->Actions().get(), exc, server_name); + } + } + if (!failed_actions.empty()) { + TryResubmit(failed_actions, tries); + } + } + + void OnComplete(const std::shared_ptr &action, + const std::shared_ptr ®ion_request, int32_t tries, + std::shared_ptr server_name, + const std::shared_ptr ®ion_result, + std::vector> &failed_actions) { + + auto result_or_exc = region_result->Result(action->OriginalIndex()); + bool no_result_or_exc_from_server = false; + std::string err_msg; + if (result_or_exc == nullptr) { + no_result_or_exc_from_server = true; + err_msg = "Invalid response"; + } else { + auto result = std::get<0>(*result_or_exc.get()); + auto exc = std::get<1>(*result_or_exc.get()); + if (exc != nullptr) { + // TODO translateException + // Throwable error = translateException((Throwable) result); + // TODO LogException + // LogException(tries, region_request, error, server_name); + if (auto error = + std::dynamic_pointer_cast(exc) || tries >= max_attempts_) { + auto extra_error = GetExtraContextForError(server_name); + FailOne(action, tries, exc, time(0), extra_error); + + } else { + failed_actions.push_back(action); + } + } else if (result != nullptr) { + // TODO + // action2Future.get(action).complete((T) result); + } else { + no_result_or_exc_from_server = true; + err_msg = "Unable to set either result or exception for response received from server."; + } + } + + if (no_result_or_exc_from_server) { + std::string row = ""; + if (const auto &pget = std::dynamic_pointer_cast(action->GetAction())) { + row = pget->Row(); + } + LOG(ERROR) << "Server " << server_name->ShortDebugString() + << " sent us neither result nor exception for row '" << row << "' of " + << region_request->HRegionLocation()->region_info()->RegionName(); + AddError(action, std::make_shared("Invalid response"), server_name); + failed_actions.push_back(action); + } + return; + } + + private: + std::shared_ptr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::shared_ptr>> actions_ = + std::make_shared>>(); + int64_t pause_ns_; + int32_t max_attempts_; + int64_t operation_timeout_nanos_; + int64_t rpc_timeout_nanos_; + int32_t start_log_errors_count_; + + std::shared_ptr controller_ = std::shared_ptr(); + int64_t start_ns_ = TimeUtil::GetNowNanos(); + int32_t tries_ = 1; + std::shared_ptr> exceptions_ = + std::make_shared>(); + BatchCallable callable_; + std::map> action2errors_; + std::vector>> promises_; + std::vector> futures_; +}; +} +/* namespace hbase */ + +// LOG(INFO) << "[" << operation_timeout_nanos_ << " - (" << GetNowNanos() << " - " << start_ns_ +// << ")]=" << (operation_timeout_nanos_ - (GetNowNanos() - start_ns_)); diff --git a/hbase-native-client/core/get.h b/hbase-native-client/core/get.h index f79c633..3358e00 100644 --- a/hbase-native-client/core/get.h +++ b/hbase-native-client/core/get.h @@ -24,6 +24,7 @@ #include #include #include +#include "core/row.h" #include "core/time_range.h" #include "if/Client.pb.h" @@ -33,16 +34,16 @@ namespace hbase { * @brief Map consisting of column families and qualifiers to be used for Get * operation */ -using FamilyMap = std::map>; +using FamilyMap = std::map >; -class Get { +class Get : public Row { public: /** * Constructors */ - explicit Get(const std::string& row); - Get(const Get& cget); - Get& operator=(const Get& cget); + explicit Get(const std::string &row); + Get(const Get &cget); + Get &operator=(const Get &cget); ~Get(); @@ -56,7 +57,7 @@ class Get { * is 1. * @param max_versions max_versons to set */ - Get& SetMaxVersions(int32_t max_versions = 1); + Get &SetMaxVersions(int32_t max_versions = 1); /** * @brief Returns whether blocks should be cached for this Get operation. @@ -67,18 +68,18 @@ class Get { * @brief Set whether blocks should be cached for this Get operation. * @param cache_blocks to set */ - Get& SetCacheBlocks(bool cache_blocks); + Get &SetCacheBlocks(bool cache_blocks); /** * @brief Returns the Get family map (FamilyMap) for this Get operation. Used * for constructing Scan object with an already constructed Get */ - const FamilyMap& Family() const; + const FamilyMap &Family() const; /** * @brief Returns the timerange for this Get */ - const TimeRange& Timerange() const; + const TimeRange &Timerange() const; /** * @brief Get versions of columns only within the specified timestamp range, @@ -86,19 +87,19 @@ class Get { * @param minStamp the minimum timestamp, inclusive * @param maxStamp the maximum timestamp, exclusive */ - Get& SetTimeRange(int64_t min_timestamp, int64_t max_timestamp); + Get &SetTimeRange(int64_t min_timestamp, int64_t max_timestamp); /** * @brief Get versions of columns with the specified timestamp. * @param The timestamp to be set */ - Get& SetTimeStamp(int64_t timestamp); + Get &SetTimeStamp(int64_t timestamp); /** * @brief Get all columns from the specified family. * @param family to be retrieved */ - Get& AddFamily(const std::string& family); + Get &AddFamily(const std::string &family); /** * @brief Get the column from the specific family with the specified @@ -106,12 +107,13 @@ class Get { * @param family to be retrieved * @param qualifier to be retrieved */ - Get& AddColumn(const std::string& family, const std::string& qualifier); + Get &AddColumn(const std::string &family, const std::string &qualifier); /** * @brief Returns the row for this Get operation */ - const std::string& Row() const; + const std::string &Row() const; + const std::string &row() const { return this->Row(); } /** * @brief Returns true if family map (FamilyMap) is non empty false otherwise @@ -127,7 +129,7 @@ class Get { * @brief Sets the consistency level for this Get operation * @param Consistency to be set */ - Get& SetConsistency(hbase::pb::Consistency consistency); + Get &SetConsistency(hbase::pb::Consistency consistency); private: std::string row_ = ""; @@ -144,7 +146,7 @@ class Get { * @throws std::runtime_error if row is empty or greater than * MAX_ROW_LENGTH(i.e. std::numeric_limits::max()) */ - void CheckRow(const std::string& row); + void CheckRow(const std::string &row); }; } // namespace hbase diff --git a/hbase-native-client/core/hregion-info.h b/hbase-native-client/core/hregion-info.h new file mode 100644 index 0000000..8aabeb6 --- /dev/null +++ b/hbase-native-client/core/hregion-info.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "if/HBase.pb.h" +#include + +namespace hbase { +class HRegionInfo { + public: + HRegionInfo(const long& region_id, const hbase::pb::TableName& table_name, const int& replica_id) + : region_id_(region_id), table_name_(table_name), replica_id_(replica_id) {} + + std::string GetPrintName() const { + return folly::sformat("{0}-{1}.{2}-{3}", table_name_.namespace_(), table_name_.qualifier(), + region_id_, replica_id_); + } + + const std::string& RegionName() const { return encoded_name; } + long RegionId() const { return region_id_; } + int ReplicaId() const { return replica_id_; } + const hbase::pb::TableName& TableName() const { return table_name_; } + + private: + std::string encoded_name; + long region_id_; + hbase::pb::TableName table_name_; + int replica_id_; +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/hregion-location.h b/hbase-native-client/core/hregion-location.h new file mode 100644 index 0000000..69792a6 --- /dev/null +++ b/hbase-native-client/core/hregion-location.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "hregion-info.h" +#include "if/HBase.pb.h" +#include + +namespace hbase { + +enum RegionLocateType { + kBefore, + kCurrent, + kAfter +}; + +class HRegionLocation { + public: + HRegionLocation(const long& seq_num, const hbase::pb::ServerName& server_name, + std::shared_ptr region_info) + : seq_num_(seq_num), server_name_(server_name), region_info_(region_info) {} + virtual ~HRegionLocation(); + + long set_num() { return seq_num_; } + + hbase::pb::ServerName server_name() const { return server_name_; } + + std::shared_ptr region_info() const { return region_info_; } + + std::size_t HashCode() { + std::size_t hash = 0; + boost::hash_combine(hash, seq_num_); + boost::hash_combine(hash, server_name_.host_name()); + boost::hash_combine(hash, server_name_.port()); + boost::hash_combine(hash, server_name_.start_code()); + boost::hash_combine(hash, region_info_->RegionId()); + boost::hash_combine(hash, region_info_->RegionName()); + boost::hash_combine(hash, region_info_->ReplicaId()); + boost::hash_combine(hash, region_info_->TableName().namespace_()); + boost::hash_combine(hash, region_info_->TableName().qualifier()); + return hash; + } + + private: + long seq_num_; + hbase::pb::ServerName server_name_; + std::shared_ptr region_info_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi_response.cc b/hbase-native-client/core/multi_response.cc new file mode 100644 index 0000000..b96e365 --- /dev/null +++ b/hbase-native-client/core/multi_response.cc @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/multi_response.h" + +namespace hbase { + +MultiResponse::MultiResponse() {} + +int MultiResponse::Size() const { + int size = 0; + for (const auto& result : results_) { + size += result.second->Size(); + } + return size; +} + +void MultiResponse::Add(const std::string& region_name, int32_t original_index, + std::shared_ptr result, + std::shared_ptr exc) { + Result(region_name)->AddResult(original_index, result, exc); +} + +void MultiResponse::AddException(const std::string& region_name, + std::shared_ptr exception) { + exceptions_[region_name] = exception; +} + +const std::shared_ptr& MultiResponse::Exception(const std::string& region_name) + const { + try { + auto find = exceptions_.at(region_name); + return find; + } + catch (const std::out_of_range& oor) { + return nullptr; + } +} + +const std::map >& MultiResponse::Exceptions() const { + return exceptions_; +} + +void MultiResponse::AddStatistic(const std::string& region_name, + std::shared_ptr stat) { + Result(region_name)->SetStat(stat); +} + +const std::map >& MultiResponse::Results() const { + return results_; +} + +std::shared_ptr& MultiResponse::Result(const std::string& region_name) { + auto found = results_.find(region_name); + // If not found create + if (found != results_.end()) { + results_[region_name] = std::make_shared(); + } + return results_[region_name]; +} + +AbstractResponse::ResponseType MultiResponse::Type() const { return ResponseType::MULTI; } + +MultiResponse::~MultiResponse() { + // TODO Auto-generated destructor stub +} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/multi_response.h b/hbase-native-client/core/multi_response.h new file mode 100644 index 0000000..5e62a72 --- /dev/null +++ b/hbase-native-client/core/multi_response.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include "core/abstract_response.h" +#include "core/region_result.h" +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::AbstractResponse; +using hbase::RegionResult; +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class MultiResponse : public AbstractResponse { + public: + MultiResponse(); + + /** + * @brief Returns Number of pairs in this container + */ + int Size() const; + + /** + * Add the pair to the container, grouped by the regionName + * + * @param regionName + * @param originalIndex the original index of the Action (request). + * @param resOrEx the result or error; will be empty for successful Put and Delete actions. + */ + void Add(const std::string& region_name, int32_t original_index, + std::shared_ptr result, std::shared_ptr exc); + + void AddException(const std::string& region_name, std::shared_ptr exception); + + /** + * @return the exception for the region, if any. Null otherwise. + */ + const std::shared_ptr& Exception(const std::string& region_name) const; + + const std::map>& Exceptions() const; + + void AddStatistic(const std::string& region_name, std::shared_ptr stat); + + const std::map>& Results() const; + + AbstractResponse::ResponseType Type() const; + + virtual ~MultiResponse(); + + private: + std::shared_ptr& Result(const std::string& region_name); + + // map of regionName to map of Results by the original index for that Result + std::map> results_; + /** + * The server can send us a failure for the region itself, instead of individual failure. + * It's a part of the protobuf definition. + */ + std::map> exceptions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region_request.h b/hbase-native-client/core/region_request.h new file mode 100644 index 0000000..c72cf94 --- /dev/null +++ b/hbase-native-client/core/region_request.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include +#include +#include +#include "core/hregion-location.h" +#include "core/action.h" + +using hbase::Action; +using hbase::HRegionLocation; +namespace hbase { + +class RegionRequest { + public: + RegionRequest(const std::shared_ptr &hregion_loc) + : hregion_loc_(hregion_loc) {} + ~RegionRequest() {} + void AddAction(std::shared_ptr action) { actions_->push_back(action); } + std::shared_ptr HRegionLocation() { return hregion_loc_; } + std::shared_ptr>> Actions() { return actions_; } + + private: + std::shared_ptr hregion_loc_; + std::shared_ptr>> actions_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/region_result.h b/hbase-native-client/core/region_result.h new file mode 100644 index 0000000..cf18fd0 --- /dev/null +++ b/hbase-native-client/core/region_result.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class RegionResult { + using ResultOrException = + std::tuple, std::shared_ptr>; + + public: + RegionResult() {} + void AddResult(int32_t index, std::shared_ptr result, + std::shared_ptr exc) { + auto res_or_ex = std::make_tuple(result, exc); + auto ptr = std::make_shared(res_or_ex); + result_[index] = ptr; + } + + void SetStat(std::shared_ptr stat) { stat_ = stat; } + + int Size() { return result_.size(); } + + const std::shared_ptr Result(int32_t index) const { + auto found = result_.find(index); + // If not found return nullptr + if (found != result_.end()) { + return found->second; + } + return nullptr; + } + + const std::shared_ptr& Stat() { return stat_; } + + ~RegionResult() {} + + private: + std::map> result_; + std::shared_ptr stat_; +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/request_converter.cc b/hbase-native-client/core/request_converter.cc index eba07df..dc8dd54 100644 --- a/hbase-native-client/core/request_converter.cc +++ b/hbase-native-client/core/request_converter.cc @@ -20,9 +20,10 @@ #include "core/request_converter.h" #include #include "if/Client.pb.h" +#include -using hbase::Request; using hbase::pb::GetRequest; +using hbase::pb::MultiRequest; using hbase::pb::RegionSpecifier; using hbase::pb::RegionSpecifier_RegionSpecifierType; using hbase::pb::ScanRequest; @@ -40,14 +41,8 @@ void RequestConverter::SetRegion(const std::string ®ion_name, region_specifier->set_value(region_name); } -std::unique_ptr RequestConverter::ToGetRequest(const Get &get, - const std::string ®ion_name) { - auto pb_req = Request::get(); - - auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - auto pb_get = pb_msg->mutable_get(); +std::unique_ptr RequestConverter::ToGet(const Get &get) { + auto pb_get = std::make_unique(); pb_get->set_max_versions(get.MaxVersions()); pb_get->set_cache_blocks(get.CacheBlocks()); pb_get->set_consistency(get.Consistency()); @@ -67,7 +62,38 @@ std::unique_ptr RequestConverter::ToGetRequest(const Get &get, } } } + return pb_get; +} + +std::unique_ptr RequestConverter::ToGetRequest(const Get &get, + const std::string ®ion_name) { + auto pb_req = Request::get(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + auto pb_get = RequestConverter::ToGet(get); + pb_msg->set_allocated_get(pb_get.release()); + return pb_req; +} +std::unique_ptr RequestConverter::ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions) { + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + auto region_specifier = new hbase::pb::RegionSpecifier(); + RequestConverter::SetRegion(region_name, region_specifier); + auto pb_region_action = pb_msg->add_regionaction(); + pb_region_action->set_allocated_region(region_specifier); + int action_num = 0; + for (const auto ®ion_action : region_actions) { + auto pb_action = pb_region_action->add_action(); + auto action = region_action->GetAction(); + if (auto pget = std::dynamic_pointer_cast(action)) { + auto pb_get = RequestConverter::ToGet(*pget); + pb_action->set_allocated_get(pb_get.release()); + pb_action->set_index(action_num); + } + action_num += 1; + } return pb_req; } diff --git a/hbase-native-client/core/request_converter.h b/hbase-native-client/core/request_converter.h index 57f08cc..4b1f829 100644 --- a/hbase-native-client/core/request_converter.h +++ b/hbase-native-client/core/request_converter.h @@ -22,11 +22,20 @@ #include #include #include "connection/request.h" + #include "core/get.h" +#include "core/multi_action.h" #include "core/scan.h" +#include "core/region_request.h" #include "if/HBase.pb.h" +using hbase::Get; +using hbase::MultiAction; +using hbase::RegionRequest; +using hbase::Request; +using hbase::Scan; using hbase::pb::RegionSpecifier; +using hbase::pb::ServerName; namespace hbase { /** @@ -53,6 +62,9 @@ class RequestConverter { */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name); + static std::unique_ptr ToMultiRequest( + const std::string ®ion_name, const std::vector> ®ion_actions); + private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); @@ -64,6 +76,8 @@ class RequestConverter { * Request. */ static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); -}; -} /* namespace hbase */ + static std::unique_ptr ToGet(const Get &get); +}; +} +/* namespace hbase */ diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc index 19a3554..f4328f0 100644 --- a/hbase-native-client/core/response_converter.cc +++ b/hbase-native-client/core/response_converter.cc @@ -19,12 +19,24 @@ #include "core/response_converter.h" +#include +#include #include #include "core/cell.h" +#include "core/multi_response.h" +#include "if/Client.pb.h" +using hbase::Result; using hbase::pb::GetResponse; -using hbase::pb::ScanResponse; +using hbase::pb::MultiRegionLoadStats; +using hbase::pb::MultiRequest; +using hbase::pb::MultiResponse; +using hbase::pb::RegionAction; +using hbase::pb::RegionActionResult; +using hbase::pb::RegionLoadStats; +using hbase::pb::ResultOrException; +using hbase::pb::RegionSpecifier; namespace hbase { @@ -35,62 +47,127 @@ ResponseConverter::~ResponseConverter() {} std::unique_ptr ResponseConverter::FromGetResponse(const Response& resp) { auto get_resp = std::static_pointer_cast(resp.resp_msg()); - return ToResult(get_resp->result(), resp.cell_scanner()); -} - -std::unique_ptr ResponseConverter::ToResult( - const hbase::pb::Result& result, const std::unique_ptr& cell_scanner) { std::vector> vcells; - for (auto cell : result.cell()) { + bool exists = false; + bool stale = false; + bool partial = false; + + // Parse Results + if (get_resp) { + for (auto cell : get_resp->result().cell()) { std::shared_ptr pcell = std::make_shared(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), cell.value(), static_cast(cell.cell_type())); vcells.push_back(pcell); } - - // iterate over the cells coming from rpc codec - if (cell_scanner != nullptr) { - while (cell_scanner->Advance()) { - vcells.push_back(cell_scanner->Current()); - } - // TODO: check associated cell count? + exists = get_resp->result().exists(); + stale = get_resp->result().stale(); + partial = get_resp->result().partial(); } - return std::make_unique(vcells, result.exists(), result.stale(), result.partial()); + +// TODO Get Data from Cell Block + + return std::make_unique(vcells, exists, stale, partial); } -std::vector> ResponseConverter::FromScanResponse(const Response& resp) { - auto scan_resp = std::static_pointer_cast(resp.resp_msg()); - VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString(); - int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() - : scan_resp->results_size(); +std::shared_ptr ResponseConverter::GetResults( + const std::shared_ptr& request, const Response& resp) { - std::vector> results{static_cast(num_results)}; - for (int i = 0; i < num_results; i++) { - if (resp.cell_scanner() != nullptr) { - // Cells are out in cellblocks. Group them up again as Results. How many to read at a - // time will be found in getCellsLength -- length here is how many Cells in the i'th Result - int num_cells = scan_resp->cells_per_result(i); + auto pb_req = Request::multi(); + auto multi_req = std::static_pointer_cast(pb_req->req_msg()); + auto multi_resp = std::static_pointer_cast(resp.resp_msg()); - std::vector> vcells; - while (resp.cell_scanner()->Advance()) { - vcells.push_back(resp.cell_scanner()->Current()); + int req_region_action_count = multi_req->regionaction_size(); + int res_region_action_count = multi_resp->regionactionresult_size(); + if (req_region_action_count != res_region_action_count) { + throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + + " does not match response mutation result count=" + + std::to_string(res_region_action_count)); + } + + auto multi_results = std::make_shared(); + for (int32_t num = 0; num < res_region_action_count; num++) { + RegionAction actions = multi_req->regionaction(num); + RegionActionResult action_result = multi_resp->regionactionresult(num); + RegionSpecifier rs = actions.region(); + if (rs.has_type() && rs.type() != RegionSpecifier::REGION_NAME) { + throw std::runtime_error("We support only encoded types for protobuf multi response."); + } + auto region_name = rs.value(); + + if (action_result.has_exception()) { + std::shared_ptr exc; + std::shared_ptr rexc; + ([action_result, exc, rexc]() { + hbase::pb::NameBytesPair nb_pair = action_result.exception(); + if (nb_pair.has_value()) { + // rexc = std::make_shared(nb_pair.name() + " " + nb_pair.value()); + // exc = std::move(rexc); } - // TODO: check associated cell count? - - if (vcells.size() != num_cells) { - std::string msg = "Results sent from server=" + std::to_string(num_results) + - ". But only got " + std::to_string(i) + - " results completely at client. Resetting the scanner to scan again."; - LOG(ERROR) << msg; - throw std::runtime_error(msg); + }); + multi_results->AddException(region_name, exc); + continue; } - // TODO: handle partial results per Result by checking partial_flag_per_result - results[i] = std::make_unique(vcells, false, scan_resp->stale(), false); + + if (actions.action_size() != action_result.resultorexception_size()) { + throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + + ", action_result.resultorexception_size=" + + std::to_string(action_result.resultorexception_size()) + + " for region " + actions.region().value()); + } + + for (ResultOrException roe : action_result.resultorexception()) { + std::shared_ptr result; + std::shared_ptr exc; + if (roe.has_exception()) { + std::shared_ptr rexc; + ([action_result, exc, rexc]() { + hbase::pb::NameBytesPair nb_pair = action_result.exception(); + if (nb_pair.has_value()) { + // rexc = std::make_shared(nb_pair.name() + " " + nb_pair.value()); + // exc = std::move(rexc); + } + }); + } else if (roe.has_result()) { + ([roe, &result]() { + bool exists = false; + bool stale = false; + bool partial = false; + // Parse Results + std::vector> vcells; + for (auto cell : roe.result().cell()) { + std::shared_ptr pcell = std::make_shared( + cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), cell.value(), + static_cast(cell.cell_type())); + vcells.push_back(pcell); + } + exists = roe.result().exists(); + stale = roe.result().stale(); + partial = roe.result().partial(); + result = std::make_shared(vcells, exists, stale, partial); + }); + } else if (roe.has_service_result()) { + // TODO Not processing Coprocessor Service Result; } else { - results[i] = ToResult(scan_resp->results(i), resp.cell_scanner()); + // Sometimes, the response is just "it was processed". Generally, this occurs for things + // like mutateRows where either we get back 'processed' (or not) and optionally some + // statistics about the regions we touched. + std::vector> empty_cells; + result = std::make_shared(empty_cells, multi_resp->processed() ? true : false, + false, false); + } + multi_results->Add(region_name, roe.index(), result, exc); } } - return results; + if (multi_resp->has_regionstatistics()) { + MultiRegionLoadStats stats = multi_resp->regionstatistics(); + for (int i = 0; i < stats.region_size(); i++) { + multi_results->AddStatistic(stats.region(i).value(), + std::make_shared(stats.stat(i))); + } + } + return multi_results; } + } /* namespace hbase */ diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h index 859644b..39f6ee45 100644 --- a/hbase-native-client/core/response_converter.h +++ b/hbase-native-client/core/response_converter.h @@ -20,10 +20,15 @@ #pragma once #include +#include +#include "connection/request.h" #include "connection/response.h" +#include "core/multi_response.h" #include "core/result.h" #include "if/Client.pb.h" -#include "serde/cell-scanner.h" + +using hbase::Request; +using hbase::Response; namespace hbase { @@ -35,16 +40,14 @@ class ResponseConverter { public: ~ResponseConverter(); - static std::unique_ptr ToResult(const hbase::pb::Result& result, - const std::unique_ptr& cell_scanner); - /** * @brief Returns a Result object created by PB Message in passed Response object. * @param resp - Response object having the PB message. */ - static std::unique_ptr FromGetResponse(const Response& resp); + static std::unique_ptr FromGetResponse(const Response& resp); - static std::vector> FromScanResponse(const Response& resp); + static std::shared_ptr GetResults(const std::shared_ptr& request, + const Response& resp); private: // Constructor not required. We have all static methods to extract response from PB messages. diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h new file mode 100644 index 0000000..364acda --- /dev/null +++ b/hbase-native-client/core/row.h @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +namespace hbase { + +class Row { + public: + virtual ~Row() {} +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/server_request.h b/hbase-native-client/core/server_request.h new file mode 100644 index 0000000..158be23 --- /dev/null +++ b/hbase-native-client/core/server_request.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "core/action.h" +#include "core/hregion-location.h" +#include "core/region_request.h" + +using hbase::Action; +using hbase::HRegionLocation; +using hbase::RegionRequest; + +namespace hbase { + +class ServerRequest { + public: + // this should be hash to Region request + using ActionsByRegion = std::unordered_map>; + + ServerRequest() {} + + ~ServerRequest() {} + + void AddAction(std::shared_ptr hregion_location, + std::shared_ptr action) { + actions_by_region_[hregion_location->HashCode()]->AddAction(action); + } + + ActionsByRegion &Actions() { return actions_by_region_; } + + private: + ActionsByRegion actions_by_region_; +}; +} /* namespace hbase */ -- 1.8.3.1