diff --git hbase-native-client/connection/client-handler.cc hbase-native-client/connection/client-handler.cc index 113ebd0..cfe82ac 100644 --- hbase-native-client/connection/client-handler.cc +++ hbase-native-client/connection/client-handler.cc @@ -91,7 +91,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { if (cell_block_length > 0) { auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length); - received->set_cell_scanner(std::move(cell_scanner)); + received->set_cell_scanner(std::shared_ptr{cell_scanner.release()}); } received->set_resp_msg(resp_msg); diff --git hbase-native-client/connection/response.h hbase-native-client/connection/response.h index c5472b0..a3e89c8 100644 --- hbase-native-client/connection/response.h +++ hbase-native-client/connection/response.h @@ -22,6 +22,7 @@ #include #include +#include #include #include "serde/cell-scanner.h" @@ -66,20 +67,26 @@ class Response { resp_msg_ = std::move(response); } - void set_cell_scanner(std::unique_ptr cell_scanner) { - cell_scanner_ = std::move(cell_scanner); - } + void set_cell_scanner(std::shared_ptr cell_scanner) { cell_scanner_ = cell_scanner; } - const std::unique_ptr& cell_scanner() const { return cell_scanner_; } + const std::shared_ptr cell_scanner() const { return cell_scanner_; } folly::exception_wrapper exception() { return exception_; } void set_exception(folly::exception_wrapper value) { exception_ = value; } + std::string DebugString() const { + std::string s{"call_id:"}; + s += folly::to(call_id_); + s += ", resp_msg:"; + s += resp_msg_->ShortDebugString(); + return s; + } + private: uint32_t call_id_; std::shared_ptr resp_msg_; - std::unique_ptr cell_scanner_; + std::shared_ptr cell_scanner_; folly::exception_wrapper exception_; }; } // namespace hbase diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index e9fc716..39968ec 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -19,10 +19,12 @@ cxx_library( name="core", exported_headers=[ + "async-client-scanner.h", "async-connection.h", "async-region-locator.h", "async-rpc-retrying-caller-factory.h", "async-rpc-retrying-caller.h", + "async-table-result-scanner.h", "client.h", "cell.h", "hbase-macros.h", @@ -40,10 +42,14 @@ cxx_library( "put.h", "scan.h", "result.h", + "result-scanner.h", "request-converter.h", "response-converter.h", "table.h", + "async-scan-rpc-retrying-caller.h", "raw-async-table.h", + "raw-scan-result-consumer.h", + "scan-result-cache.h", "hbase-rpc-controller.h", "time-range.h", "zk-util.h", @@ -56,9 +62,12 @@ cxx_library( "async-batch-rpc-retrying-caller.h", ], srcs=[ + "async-client-scanner.cc", "async-connection.cc", "async-rpc-retrying-caller-factory.cc", "async-rpc-retrying-caller.cc", + "async-scan-rpc-retrying-caller.cc", + "async-table-result-scanner.cc", "cell.cc", "client.cc", "hbase-rpc-controller.cc", @@ -69,6 +78,7 @@ cxx_library( "mutation.cc", "put.cc", "scan.cc", + "scan-result-cache.cc", "raw-async-table.cc", "result.cc", "request-converter.cc", @@ -103,7 +113,7 @@ cxx_library( "configuration.cc", "hbase-configuration-loader.cc", ], - deps=["//third-party:folly"], + deps=["//utils:utils", "//third-party:folly"], compiler_flags=['-Weffc++', '-ggdb'], visibility=['PUBLIC',],) cxx_test( @@ -193,6 +203,21 @@ cxx_test( ], run_test_separately=True,) cxx_test( + name="scan-result-cache-test", + srcs=["scan-result-cache-test.cc",], + deps=[":core",], + run_test_separately=True,) +cxx_test( + name="scanner-test", + srcs=["scanner-test.cc",], + deps=[ + ":core", + "//if:if", + "//serde:serde", + "//test-util:test-util", + ], + run_test_separately=True,) +cxx_test( name="zk-util-test", srcs=["zk-util-test.cc",], deps=[":core",], diff --git hbase-native-client/core/async-client-scanner.cc hbase-native-client/core/async-client-scanner.cc new file mode 100644 index 0000000..07cd0f0 --- /dev/null +++ hbase-native-client/core/async-client-scanner.cc @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-client-scanner.h" + +#include +#include +#include +#include + +namespace hbase { + +AsyncClientScanner::AsyncClientScanner( + std::shared_ptr conn, std::shared_ptr scan, + std::shared_ptr table_name, std::shared_ptr consumer, + nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) + : conn_(conn), + scan_(scan), + table_name_(table_name), + consumer_(consumer), + pause_(pause), + max_retries_(max_retries), + scan_timeout_nanos_(scan_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count) { + results_cache_ = std::make_shared(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); +} + +void AsyncClientScanner::Start() { OpenScanner(); } + +folly::Future> AsyncClientScanner::CallOpenScanner( + std::shared_ptr rpc_client, + std::shared_ptr controller, + std::shared_ptr loc) { + open_scanner_tries_++; + + auto preq = RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false); + + auto self(shared_from_this()); + VLOG(5) << "Calling RPC Client to open the scanner"; + return rpc_client + ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), + User::defaultUser(), "ClientService") + .then([self, loc, controller, rpc_client](const std::unique_ptr& presp) { + VLOG(5) << "Scan Response:" << presp->DebugString(); + return std::make_shared(rpc_client, presp, loc, controller); + }); +} + +void AsyncClientScanner::OpenScanner() { + auto self(shared_from_this()); + open_scanner_tries_ = 1; + + auto caller = conn_->caller_factory() + ->Single>() + ->table(table_name_) + ->row(scan_->StartRow()) + ->locate_type(GetLocateType(*scan_)) + ->rpc_timeout(rpc_timeout_nanos_) + ->operation_timeout(scan_timeout_nanos_) + ->pause(pause_) + ->max_retries(max_retries_) + ->start_log_errors_count(start_log_errors_count_) + ->action([&](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) + -> folly::Future> { + return CallOpenScanner(rpc_client, controller, loc); + }) + ->Build(); + + caller->Call() + .then([this, self](std::shared_ptr resp) { + VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id() << ", starting scan"; + StartScan(resp); + }) + .onError([this, self](const folly::exception_wrapper& e) { + VLOG(3) << "Scanner received error:" << e.what(); + consumer_->OnError(e); + }) + .then([caller, self](const auto r) { return r; }); +} + +void AsyncClientScanner::StartScan(std::shared_ptr resp) { + auto self(shared_from_this()); + auto caller = conn_->caller_factory() + ->Scan() + ->scanner_id(resp->scan_resp_->scanner_id()) + ->region_location(resp->region_location_) + ->scanner_lease_timeout(TimeUtil::MillisToNanos(resp->scan_resp_->ttl())) + ->scan(scan_) + ->rpc_client(resp->rpc_client_) + ->consumer(consumer_) + ->results_cache(results_cache_) + ->rpc_timeout(rpc_timeout_nanos_) + ->scan_timeout(scan_timeout_nanos_) + ->pause(pause_) + ->max_retries(max_retries_) + ->start_log_errors_count(start_log_errors_count_) + ->Build(); + + caller->Start(resp->controller_, resp->scan_resp_, resp->cell_scanner_) + .then([caller, self](const bool has_more) { + if (has_more) { + // open the next scanner on the next region. + self->OpenScanner(); + } else { + self->consumer_->OnComplete(); + } + }) + .onError([caller, self](const folly::exception_wrapper& e) { self->consumer_->OnError(e); }) + .then([caller, self](const auto r) { return r; }); +} + +RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) { + // TODO: In C++, there is no Scan::IncludeStartRow() and Scan::IncludeStopRow(). + // When added, this method should be modified to return other RegionLocateTypes + // (see ConnectionUtils.java #getLocateType()) + // TODO: When reversed scans are implemented, return other RegionLocateTypes + return RegionLocateType::kCurrent; +} + +} // namespace hbase diff --git hbase-native-client/core/async-client-scanner.h hbase-native-client/core/async-client-scanner.h new file mode 100644 index 0000000..a43a970 --- /dev/null +++ hbase-native-client/core/async-client-scanner.h @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/async-connection.h" +#include "core/async-rpc-retrying-caller-factory.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/hbase-rpc-controller.h" +#include "core/raw-scan-result-consumer.h" +#include "core/region-location.h" +#include "core/request-converter.h" +#include "core/response-converter.h" +#include "core/result.h" +#include "core/scan.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "if/HBase.pb.h" +#include "scan-result-cache.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +class OpenScannerResponse { + public: + OpenScannerResponse(std::shared_ptr rpc_client, + const std::unique_ptr& resp, + std::shared_ptr region_location, + std::shared_ptr controller) + : rpc_client_(rpc_client), region_location_(region_location), controller_(controller) { + scan_resp_ = std::static_pointer_cast(resp->resp_msg()); + cell_scanner_ = resp->cell_scanner(); + } + std::shared_ptr rpc_client_; + std::shared_ptr scan_resp_; + std::shared_ptr region_location_; + std::shared_ptr controller_; + std::shared_ptr cell_scanner_; +}; + +class AsyncClientScanner : public std::enable_shared_from_this { + public: + template + static std::shared_ptr Create(T&&... all) { + return std::shared_ptr(new AsyncClientScanner(std::forward(all)...)); + } + + void Start(); + + private: + // methods + AsyncClientScanner(std::shared_ptr conn, std::shared_ptr scan, + std::shared_ptr table_name, + std::shared_ptr consumer, nanoseconds pause, + uint32_t max_retries, nanoseconds scan_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count); + + folly::Future> CallOpenScanner( + std::shared_ptr rpc_client, + std::shared_ptr controller, + std::shared_ptr loc); + + void OpenScanner(); + + void StartScan(std::shared_ptr resp); + + RegionLocateType GetLocateType(const Scan& scan); + + private: + // data + std::shared_ptr conn_; + std::shared_ptr scan_; + std::shared_ptr table_name_; + std::shared_ptr results_cache_; + std::shared_ptr consumer_; + nanoseconds pause_; + uint32_t max_retries_; + nanoseconds scan_timeout_nanos_; + nanoseconds rpc_timeout_nanos_; + uint32_t start_log_errors_count_; + uint32_t max_attempts_; + uint32_t open_scanner_tries_ = 0; +}; +} // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.h hbase-native-client/core/async-rpc-retrying-caller-factory.h index f1ffdac..1024fc3 100644 --- hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -28,7 +28,13 @@ #include "connection/rpc-client.h" #include "core/async-batch-rpc-retrying-caller.h" #include "core/async-rpc-retrying-caller.h" +#include "core/async-scan-rpc-retrying-caller.h" +#include "core/raw-scan-result-consumer.h" +#include "core/region-location.h" #include "core/row.h" +#include "core/scan-result-cache.h" +#include "core/scan.h" + #include "if/Client.pb.h" #include "if/HBase.pb.h" @@ -198,6 +204,114 @@ class BatchCallerBuilder : public std::enable_shared_from_this { + public: + explicit ScanCallerBuilder(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), + retry_timer_(retry_timer), + rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), + pause_(conn->connection_conf()->pause()), + scan_timeout_nanos_(conn->connection_conf()->scan_timeout()), + max_retries_(conn->connection_conf()->max_retries()), + start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), + scanner_id_(-1) {} + + virtual ~ScanCallerBuilder() = default; + + typedef ScanCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr rpc_client(std::shared_ptr rpc_client) { + rpc_client_ = rpc_client; + return shared_this(); + } + + SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr scan_timeout(nanoseconds scan_timeout_nanos) { + scan_timeout_nanos_ = scan_timeout_nanos; + return shared_this(); + } + + SharedThisPtr scanner_lease_timeout(nanoseconds scanner_lease_timeout_nanos) { + scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos; + return shared_this(); + } + + SharedThisPtr pause(nanoseconds pause) { + pause_ = pause; + return shared_this(); + } + + SharedThisPtr max_retries(uint32_t max_retries) { + max_retries_ = max_retries; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + + SharedThisPtr region_location(std::shared_ptr region_location) { + region_location_ = region_location; + return shared_this(); + } + + SharedThisPtr scanner_id(int64_t scanner_id) { + scanner_id_ = scanner_id; + return shared_this(); + } + + SharedThisPtr scan(std::shared_ptr scan) { + scan_ = scan; + return shared_this(); + } + + SharedThisPtr results_cache(std::shared_ptr results_cache) { + results_cache_ = results_cache; + return shared_this(); + } + + SharedThisPtr consumer(std::shared_ptr consumer) { + consumer_ = consumer; + return shared_this(); + } + + std::shared_ptr Build() { + return std::make_shared( + conn_, retry_timer_, rpc_client_, scan_, scanner_id_, results_cache_, consumer_, + region_location_, scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_, + rpc_timeout_nanos_, start_log_errors_count_); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + std::shared_ptr rpc_client_; + std::shared_ptr scan_; + nanoseconds rpc_timeout_nanos_; + nanoseconds scan_timeout_nanos_; + nanoseconds scanner_lease_timeout_nanos_; + nanoseconds pause_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; + std::shared_ptr region_location_; + int64_t scanner_id_; + std::shared_ptr consumer_; + std::shared_ptr results_cache_; +}; // end of ScanCallerBuilder + class AsyncRpcRetryingCallerFactory { private: std::shared_ptr conn_; @@ -218,6 +332,10 @@ class AsyncRpcRetryingCallerFactory { std::shared_ptr Batch() { return std::make_shared(conn_, retry_timer_); } + + std::shared_ptr Scan() { + return std::make_shared(conn_, retry_timer_); + } }; } // namespace hbase diff --git hbase-native-client/core/async-rpc-retrying-caller.cc hbase-native-client/core/async-rpc-retrying-caller.cc index f8b237b..d4ecdb6 100644 --- hbase-native-client/core/async-rpc-retrying-caller.cc +++ hbase-native-client/core/async-rpc-retrying-caller.cc @@ -220,6 +220,8 @@ void AsyncSingleRequestRpcRetryingCaller::ResetController( // explicit instantiations for the linker. Otherwise, you have to #include the .cc file for the // templetized // class definitions. +class OpenScannerResponse; template class AsyncSingleRequestRpcRetryingCaller>; template class AsyncSingleRequestRpcRetryingCaller; +template class AsyncSingleRequestRpcRetryingCaller>; } /* namespace hbase */ diff --git hbase-native-client/core/async-scan-rpc-retrying-caller.cc hbase-native-client/core/async-scan-rpc-retrying-caller.cc new file mode 100644 index 0000000..d81c974 --- /dev/null +++ hbase-native-client/core/async-scan-rpc-retrying-caller.cc @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-scan-rpc-retrying-caller.h" + +namespace hbase { + +ScanResumerImpl::ScanResumerImpl(std::shared_ptr caller) + : caller_(caller), mutex_() {} + +void ScanResumerImpl::Resume() { + // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we + // just return at the first if condition without loading the resp and numValidResuls field. If + // resume is called after suspend, then it is also safe to just reference resp and + // numValidResults after the synchronized block as no one will change it anymore. + std::shared_ptr local_resp; + int64_t local_num_complete_rows; + + { + std::unique_lock mlock{mutex_}; + if (state_ == ScanResumerState::kInitialized) { + // user calls this method before we call prepare, so just set the state to + // RESUMED, the implementation will just go on. + state_ = ScanResumerState::kResumed; + return; + } + if (state_ == ScanResumerState::kResumed) { + // already resumed, give up. + return; + } + state_ = ScanResumerState::kResumed; + local_resp = resp_; + local_num_complete_rows = num_complete_rows_; + } + + caller_->CompleteOrNext(local_resp); +} + +bool ScanResumerImpl::Prepare(std::shared_ptr resp, int num_complete_rows) { + std::unique_lock mlock(mutex_); + if (state_ == ScanResumerState::kResumed) { + // user calls resume before we actually suspend the scan, just continue; + return false; + } + state_ = ScanResumerState::kSuspended; + resp_ = resp; + num_complete_rows_ = num_complete_rows; + + return true; +} + +ScanControllerImpl::ScanControllerImpl(std::shared_ptr caller) + : caller_(caller) {} + +std::shared_ptr ScanControllerImpl::Suspend() { + PreCheck(); + state_ = ScanControllerState::kSuspended; + resumer_ = std::make_shared(caller_); + return resumer_; +} + +void ScanControllerImpl::Terminate() { + PreCheck(); + state_ = ScanControllerState::kTerminated; +} + +// return the current state, and set the state to DESTROYED. +ScanControllerState ScanControllerImpl::Destroy() { + ScanControllerState state = state_; + state_ = ScanControllerState::kDestroyed; + return state; +} + +void ScanControllerImpl::PreCheck() { + CHECK(std::this_thread::get_id() == caller_thread_id_) + << "The current thread is" << std::this_thread::get_id() << ", expected thread is " + << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat"; + + CHECK(state_ == ScanControllerState::kInitialized) << "Invalid Stopper state " + << DebugString(state_); +} + +std::string ScanControllerImpl::DebugString(ScanControllerState state) { + switch (state) { + case ScanControllerState::kInitialized: + return "kInitialized"; + case ScanControllerState::kSuspended: + return "kSuspended"; + case ScanControllerState::kTerminated: + return "kTerminated"; + case ScanControllerState::kDestroyed: + return "kDestroyed"; + default: + return "UNKNOWN"; + } +} + +std::string ScanControllerImpl::DebugString(ScanResumerState state) { + switch (state) { + case ScanResumerState::kInitialized: + return "kInitialized"; + case ScanResumerState::kSuspended: + return "kSuspended"; + case ScanResumerState::kResumed: + return "kResumed"; + default: + return "UNKNOWN"; + } +} + +AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller( + std::shared_ptr conn, std::shared_ptr retry_timer, + std::shared_ptr rpc_client, std::shared_ptr scan, int64_t scanner_id, + std::shared_ptr results_cache, std::shared_ptr consumer, + std::shared_ptr region_location, nanoseconds scanner_lease_timeout_nanos, + nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) + : conn_(conn), + retry_timer_(retry_timer), + rpc_client_(rpc_client), + scan_(scan), + scanner_id_(scanner_id), + results_cache_(results_cache), + consumer_(consumer), + region_location_(region_location), + scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos), + pause_(pause), + max_retries_(max_retries), + scan_timeout_nanos_(scan_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + controller_ = conn_->CreateRpcController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared>(); +} + +folly::Future AsyncScanRpcRetryingCaller::Start( + std::shared_ptr controller, + std::shared_ptr open_scan_resp, + const std::shared_ptr cell_scanner) { + OnComplete(controller, open_scan_resp, cell_scanner); + return promise_->getFuture(); +} + +void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr controller, + std::shared_ptr resp, + const std::shared_ptr cell_scanner) { + VLOG(5) << "Scan: OnComplete"; + + // TODO + // if (controller.failed()) { + // onError(controller.getFailed()); + // return; + // } + + bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message(); + + int64_t num_complete_rows_before = results_cache_->num_complete_rows(); + try { + auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner); + + auto results = results_cache_->AddAndGet(raw_results, is_heartbeat); + + auto scan_controller = std::make_shared(shared_from_this()); + + if (results.size() > 0) { + UpdateNextStartRowWhenError(*results[results.size() - 1]); + VLOG(5) << "Calling consumer->OnNext()"; + consumer_->OnNext(results, scan_controller); + } else if (is_heartbeat) { + consumer_->OnHeartbeat(scan_controller); + } + + ScanControllerState state = scan_controller->Destroy(); + if (state == ScanControllerState::kTerminated) { + if (resp->has_more_results_in_region() && !resp->more_results_in_region()) { + // we have more results in region but user request to stop the scan, so we need to close the + // scanner explicitly. + CloseScanner(); + } + CompleteNoMoreResults(); + return; + } + + int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before; + if (state == ScanControllerState::kSuspended) { + if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) { + return; + } + } + + } catch (const std::runtime_error& e) { + // We can not retry here. The server has responded normally and the call sequence has been + // increased so a new scan with the same call sequence will cause an + // OutOfOrderScannerNextException. Let the upper layer open a new scanner. + LOG(WARNING) << "Received exception in reading the scan response:" << e.what(); + CompleteWhenError(true); + return; + } + + CompleteOrNext(resp); +} + +void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr resp) { + VLOG(5) << "Scan: CompleteOrNext, response:" << resp->ShortDebugString(); + + if (resp->has_more_results() && !resp->more_results()) { + // RS tells us there is no more data for the whole scan + CompleteNoMoreResults(); + return; + } + // TODO: Implement Scan::limit(), and check the limit here + + if (resp->has_more_results_in_region() && !resp->more_results_in_region()) { + // TODO: check whether Scan is reversed here + CompleteWhenNoMoreResultsInRegion(); + return; + } + Next(); +} + +void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) { + VLOG(5) << "Scan: CompleteExceptionally"; + results_cache_->Clear(); + if (close_scanner) { + CloseScanner(); + } + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); +} + +void AsyncScanRpcRetryingCaller::CompleteNoMoreResults() { + // In master code, scanners auto-close if we have exhausted the region. It may not be the case + // in branch-1 code. If this is backported, make sure that the scanner is closed. + VLOG(5) << "Scan: CompleteNoMoreResults"; + promise_->setValue(false); +} + +void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() { + VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion"; + // In master code, scanners auto-close if we have exhausted the region. It may not be the case + // in branch-1 code. If this is backported, make sure that the scanner is closed. + if (NoMoreResultsForScan(*scan_, region_location_->region_info())) { + CompleteNoMoreResults(); + } else { + CompleteWithNextStartRow(region_location_->region_info().end_key(), true); + } +} + +void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) { + VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region"; + scan_->SetStartRow(row); + // TODO: set inclusive if it is reverse scans + promise_->setValue(true); +} + +void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) { + next_start_row_when_error_ = optional(result.Row()); + include_next_start_row_when_error_ = result.Partial(); +} + +void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) { + VLOG(5) << "Scan: CompleteWhenError"; + results_cache_->Clear(); + if (close_scanner) { + CloseScanner(); + } + if (next_start_row_when_error_) { + // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement + // those options in Scan , we can start using that here. + scan_->SetStartRow(include_next_start_row_when_error_ + ? *next_start_row_when_error_ + : BytesUtil::CreateClosestRowAfter(*next_start_row_when_error_)); + } + promise_->setValue(true); +} + +bool AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan, + const pb::RegionInfo& info) { + if (BytesUtil::IsEmptyStopRow(info.end_key())) { + return true; + } + if (BytesUtil::IsEmptyStopRow(scan.StopRow())) { + return false; + } + int32_t c = BytesUtil::CompareTo(info.end_key(), scan.StopRow()); + // 1. if our stop row is less than the endKey of the region + // 2. if our stop row is equal to the endKey of the region and we do not include the stop row + // for scan. + return c > 0 || + (c == 0 /* && !scan.IncludeStopRow()*/); // TODO: Scans always exclude StopRow for now. +} + +void AsyncScanRpcRetryingCaller::Next() { + VLOG(5) << "Scan: Next"; + next_call_seq_++; + tries_ = 1; + exceptions_->clear(); + start_ns_ = TimeUtil::GetNowNanos(); + Call(); +} + +void AsyncScanRpcRetryingCaller::Call() { + VLOG(5) << "Scan: Call"; + auto self(shared_from_this()); + // As we have a call sequence for scan, it is useless to have a different rpc timeout which is + // less than the scan timeout. If the server does not respond in time(usually this will not + // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when + // resending the next request and the only way to fix this is to close the scanner and open a + // new one. + int64_t call_timeout_nanos; + if (scan_timeout_nanos_.count() > 0) { + int64_t remaining_nanos = scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); + if (remaining_nanos <= 0) { + CompleteExceptionally(true); + return; + } + call_timeout_nanos = remaining_nanos; + } else { + call_timeout_nanos = 0L; + } + + ResetController(controller_, call_timeout_nanos); + + auto req = + RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false); + + // TODO: create rpc controller + // do the RPC call + rpc_client_ + ->AsyncCall(region_location_->server_name().host_name(), + region_location_->server_name().port(), std::move(req), User::defaultUser(), + "ClientService") + .then([self, this](const std::unique_ptr& resp) { + auto scan_resp = std::static_pointer_cast(resp->resp_msg()); + return OnComplete(nullptr, scan_resp, resp->cell_scanner()); + }) + .onError( + [self, this](const folly::exception_wrapper& e) { /* TODO Handle errors OnError() */ }); +} + +void AsyncScanRpcRetryingCaller::CloseScanner() { + auto self(shared_from_this()); + ResetController(controller_, rpc_timeout_nanos_.count()); + + VLOG(5) << "Closing scanner with id:" << folly::to(scanner_id_); + + // Do a close scanner RPC. Fire and forget. + auto req = RequestConverter::ToScanRequest(scanner_id_, 0, true); + rpc_client_ + ->AsyncCall(region_location_->server_name().host_name(), + region_location_->server_name().port(), std::move(req), User::defaultUser(), + "ClientService") + .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr { + LOG(WARNING) << "Call to " + region_location_->server_name().ShortDebugString() + + " for closing scanner id = " + folly::to(scanner_id_) + + " for " + region_location_->region_info().ShortDebugString() + + " failed, ignore, probably already closed. Exception:" + + e.what().toStdString(); + return nullptr; + }); +} + +void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr controller, + const int64_t& timeout_nanos) { + controller->Reset(); + if (timeout_nanos >= 0) { + controller->set_call_timeout( + milliseconds(std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_nanos)))); + } +} + +} // namespace hbase diff --git hbase-native-client/core/async-scan-rpc-retrying-caller.h hbase-native-client/core/async-scan-rpc-retrying-caller.h new file mode 100644 index 0000000..dfe45b2 --- /dev/null +++ hbase-native-client/core/async-scan-rpc-retrying-caller.h @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/async-connection.h" +#include "core/hbase-rpc-controller.h" +#include "core/raw-scan-result-consumer.h" +#include "core/region-location.h" +#include "core/request-converter.h" +#include "core/response-converter.h" +#include "core/result.h" +#include "core/scan.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "if/HBase.pb.h" +#include "scan-result-cache.h" +#include "utils/bytes-util.h" +#include "utils/connection-util.h" +#include "utils/optional.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +class AsyncScanRpcRetryingCaller; + +// The resume method is allowed to be called in another thread so here we also use the +// ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back +// from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, +// and when user calls resume method, we will change the state to RESUMED. But the resume method +// could be called in other thread, and in fact, user could just do this: +// controller.suspend().resume() +// This is strange but valid. This means the scan could be resumed before we call the prepare +// method to do the actual suspend work. So in the resume method, we will check if the state is +// INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare +// method, if the state is RESUMED already, we will just return an let the scan go on. +// Notice that, the public methods of this class is supposed to be called by upper layer only, and +// package private methods can only be called within the implementation of +// AsyncScanSingleRegionRpcRetryingCaller. +// TODO: Unlike the Java counter part, we do not do scan lease renewals in a background thread. +// Since there is also no async scan API exposed to the users, only ScanResultConsumer is the +// AsyncTableResultScanner which will only pause the scanner if the result cache is maxed. The +// application is expected to consumer the scan results before the scanner lease timeout. +class ScanResumerImpl : public ScanResumer { + public: + ScanResumerImpl(std::shared_ptr caller); + + virtual ~ScanResumerImpl() = default; + + /** + * Resume the scan. You are free to call it multiple time but only the first call will take + * effect. + */ + void Resume() override; + + // return false if the scan has already been resumed. See the comment above for ScanResumerImpl + // for more details. + bool Prepare(std::shared_ptr resp, int num_complete_rows); + + private: + // INITIALIZED -> SUSPENDED -> RESUMED + // INITIALIZED -> RESUMED + ScanResumerState state_ = ScanResumerState::kInitialized; + std::mutex mutex_; + std::shared_ptr resp_ = nullptr; + int64_t num_complete_rows_ = 0; + std::shared_ptr caller_; +}; + +class ScanControllerImpl : public ScanController { + public: + virtual ~ScanControllerImpl() = default; + + ScanControllerImpl(std::shared_ptr caller); + + /** + * Suspend the scan. + *

+ * This means we will stop fetching data in background, i.e., will not call onNext any more + * before you resume the scan. + * @return A resumer used to resume the scan later. + */ + std::shared_ptr Suspend(); + + /** + * Terminate the scan. + *

+ * This is useful when you have got enough results and want to stop the scan in onNext method, + * or you want to stop the scan in onHeartbeat method because it has spent too many time. + */ + void Terminate(); + + // return the current state, and set the state to DESTROYED. + ScanControllerState Destroy(); + + std::shared_ptr resumer() { return resumer_; } + + private: + void PreCheck(); + + std::string DebugString(ScanControllerState state); + + std::string DebugString(ScanResumerState state); + + private: + // Make sure the methods are only called in this thread. + std::thread::id caller_thread_id_ = std::this_thread::get_id(); + // INITIALIZED -> SUSPENDED -> DESTROYED + // INITIALIZED -> TERMINATED -> DESTROYED + // INITIALIZED -> DESTROYED + // If the state is incorrect we will throw IllegalStateException. + ScanControllerState state_ = ScanControllerState::kInitialized; + std::shared_ptr resumer_ = nullptr; + std::shared_ptr caller_; +}; + +class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this { + public: + AsyncScanRpcRetryingCaller(std::shared_ptr conn, + std::shared_ptr retry_timer, + std::shared_ptr rpc_client, + std::shared_ptr scan, int64_t scanner_id, + std::shared_ptr results_cache, + std::shared_ptr consumer, + std::shared_ptr region_location, + nanoseconds scanner_lease_timeout_nanos, nanoseconds pause, + uint32_t max_retries, nanoseconds scan_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count); + + folly::Future Start(std::shared_ptr controller, + std::shared_ptr open_scan_resp, + const std::shared_ptr cell_scanner); + + private: + void OnComplete(std::shared_ptr controller, + std::shared_ptr resp, + const std::shared_ptr cell_scanner); + + void CompleteOrNext(std::shared_ptr resp); + + void CompleteExceptionally(bool close_scanner); + + void CompleteNoMoreResults(); + + void CompleteWhenNoMoreResultsInRegion(); + + void CompleteWithNextStartRow(std::string row, bool inclusive); + + void UpdateNextStartRowWhenError(const Result& result); + + void CompleteWhenError(bool close_scanner); + + bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info); + + void Next(); + + void Call(); + + void CloseScanner(); + + void ResetController(std::shared_ptr controller, + const int64_t& timeout_nanos); + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + std::shared_ptr rpc_client_; + std::shared_ptr scan_; + int64_t scanner_id_; + std::shared_ptr results_cache_; + std::shared_ptr consumer_; + std::shared_ptr region_location_; + nanoseconds scanner_lease_timeout_nanos_; + nanoseconds pause_; + uint32_t max_retries_; + nanoseconds scan_timeout_nanos_; + nanoseconds rpc_timeout_nanos_; + uint32_t start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_; + optional next_start_row_when_error_ = optional(); + bool include_next_start_row_when_error_ = true; + uint64_t start_ns_; + uint32_t tries_; + std::shared_ptr> exceptions_; + uint32_t max_attempts_; + int64_t next_call_seq_ = -1L; + + friend class ScanResumerImpl; + friend class ScanControllerImpl; +}; + +} // namespace hbase diff --git hbase-native-client/core/async-table-result-scaner.cc hbase-native-client/core/async-table-result-scaner.cc new file mode 100644 index 0000000..432901d --- /dev/null +++ hbase-native-client/core/async-table-result-scaner.cc @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-table-result-scanner.h" + +namespace hbase { +AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size) + : max_cache_size_(max_cache_size) { + closed_ = false; + cache_size_ = 0; +} + +AsyncTableResultScanner::~AsyncTableResultScanner() { Close(); } + +void AsyncTableResultScanner::Close() { + std::unique_lock mlock(mutex_); + closed_ = true; + while (!queue_.empty()) { + queue_.pop(); + } + cache_size_ = 0; + if (resumer_ != nullptr) { + resumer_->Resume(); + } + // TODO: reset queue, resume prefetch + cond_.notify_all(); +} + +std::shared_ptr AsyncTableResultScanner::Next() { + VLOG(5) << "AsyncTableResultScanner: Next() before mutex"; + + std::shared_ptr result = nullptr; + std::shared_ptr local_resumer = nullptr; + { + std::unique_lock mlock(mutex_); + VLOG(5) << "AsyncTableResultScanner: Next() after mutex"; + while (queue_.empty()) { + if (closed_) { + return nullptr; + } + if (error_) { + throw error_; + } + VLOG(5) << "AsyncTableResultScanner: Next() waiting for queue"; + cond_.wait(mlock); + } + VLOG(5) << "AsyncTableResultScanner: Next() wait finished"; + result = queue_.front(); + queue_.pop(); + + cache_size_ -= EstimatedSizeWithSharedPtr(result); + if (resumer_ != nullptr && cache_size_ <= max_cache_size_ / 2) { + VLOG(1) << std::this_thread::get_id() << " resume scan prefetching"; + local_resumer = resumer_; + resumer_ = nullptr; + } + } + + // Need to call ScanResumer::Resume() outside of the scope of the mutex. The reason is that + // folly/wangle event loop might end up running the attached logic(.then()) at the Scan RPC + // in the same event thread before returning from the previous call. This seems like the + // wrong thing to do(™), but we cannot fix that now. Since the call back can end up calling + // this::OnNext(), we should unlock the mutex. + if (local_resumer != nullptr) { + local_resumer->Resume(); + } + VLOG(5) << "AsyncTableResultScanner: Next() - Returning"; + return result; +} + +void AsyncTableResultScanner::AddToCache(const std::vector> &results) { + VLOG(5) << "AsyncTableResultScanner: AddToCache()"; + for (const auto r : results) { + queue_.push(r); + cache_size_ += EstimatedSizeWithSharedPtr(r); + } +} + +template +inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr t) { + return t->EstimatedSize() + sizeof(std::shared_ptr); +} + +void AsyncTableResultScanner::OnNext(const std::vector> &results, + std::shared_ptr controller) { + VLOG(5) << "AsyncTableResultScanner: OnNext(), waiting for mutex"; + { + std::unique_lock mlock(mutex_); + VLOG(5) << "AsyncTableResultScanner: OnNext(), acquired mutex"; + if (closed_) { + controller->Terminate(); + return; + } + AddToCache(results); + + if (cache_size_ >= max_cache_size_) { + StopPrefetch(controller); + } + } + cond_.notify_all(); +} + +void AsyncTableResultScanner::StopPrefetch(std::shared_ptr controller) { + VLOG(1) << std::this_thread::get_id() + << ": stop prefetching when scanning as the cache size " + + folly::to(cache_size_) + " is greater than the max_cache_size " + + folly::to(max_cache_size_); + + resumer_ = controller->Suspend(); +} + +/** + * Indicate that there is an heartbeat message but we have not cumulated enough cells to call + * onNext. + *

+ * This method give you a chance to terminate a slow scan operation. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. + */ +void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr controller) { + std::unique_lock mlock(mutex_); + if (closed_) { + controller->Terminate(); + } +} + +/** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + *

+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ +void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) { + std::unique_lock mlock(mutex_); + error_ = error; + cond_.notify_all(); +} + +/** + * Indicate that the scan operation is completed normally. + */ +void AsyncTableResultScanner::OnComplete() { + std::unique_lock mlock(mutex_); + closed_ = true; + cond_.notify_all(); +} +} // namespace hbase diff --git hbase-native-client/core/async-table-result-scanner.cc hbase-native-client/core/async-table-result-scanner.cc new file mode 100644 index 0000000..432901d --- /dev/null +++ hbase-native-client/core/async-table-result-scanner.cc @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-table-result-scanner.h" + +namespace hbase { +AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size) + : max_cache_size_(max_cache_size) { + closed_ = false; + cache_size_ = 0; +} + +AsyncTableResultScanner::~AsyncTableResultScanner() { Close(); } + +void AsyncTableResultScanner::Close() { + std::unique_lock mlock(mutex_); + closed_ = true; + while (!queue_.empty()) { + queue_.pop(); + } + cache_size_ = 0; + if (resumer_ != nullptr) { + resumer_->Resume(); + } + // TODO: reset queue, resume prefetch + cond_.notify_all(); +} + +std::shared_ptr AsyncTableResultScanner::Next() { + VLOG(5) << "AsyncTableResultScanner: Next() before mutex"; + + std::shared_ptr result = nullptr; + std::shared_ptr local_resumer = nullptr; + { + std::unique_lock mlock(mutex_); + VLOG(5) << "AsyncTableResultScanner: Next() after mutex"; + while (queue_.empty()) { + if (closed_) { + return nullptr; + } + if (error_) { + throw error_; + } + VLOG(5) << "AsyncTableResultScanner: Next() waiting for queue"; + cond_.wait(mlock); + } + VLOG(5) << "AsyncTableResultScanner: Next() wait finished"; + result = queue_.front(); + queue_.pop(); + + cache_size_ -= EstimatedSizeWithSharedPtr(result); + if (resumer_ != nullptr && cache_size_ <= max_cache_size_ / 2) { + VLOG(1) << std::this_thread::get_id() << " resume scan prefetching"; + local_resumer = resumer_; + resumer_ = nullptr; + } + } + + // Need to call ScanResumer::Resume() outside of the scope of the mutex. The reason is that + // folly/wangle event loop might end up running the attached logic(.then()) at the Scan RPC + // in the same event thread before returning from the previous call. This seems like the + // wrong thing to do(™), but we cannot fix that now. Since the call back can end up calling + // this::OnNext(), we should unlock the mutex. + if (local_resumer != nullptr) { + local_resumer->Resume(); + } + VLOG(5) << "AsyncTableResultScanner: Next() - Returning"; + return result; +} + +void AsyncTableResultScanner::AddToCache(const std::vector> &results) { + VLOG(5) << "AsyncTableResultScanner: AddToCache()"; + for (const auto r : results) { + queue_.push(r); + cache_size_ += EstimatedSizeWithSharedPtr(r); + } +} + +template +inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr t) { + return t->EstimatedSize() + sizeof(std::shared_ptr); +} + +void AsyncTableResultScanner::OnNext(const std::vector> &results, + std::shared_ptr controller) { + VLOG(5) << "AsyncTableResultScanner: OnNext(), waiting for mutex"; + { + std::unique_lock mlock(mutex_); + VLOG(5) << "AsyncTableResultScanner: OnNext(), acquired mutex"; + if (closed_) { + controller->Terminate(); + return; + } + AddToCache(results); + + if (cache_size_ >= max_cache_size_) { + StopPrefetch(controller); + } + } + cond_.notify_all(); +} + +void AsyncTableResultScanner::StopPrefetch(std::shared_ptr controller) { + VLOG(1) << std::this_thread::get_id() + << ": stop prefetching when scanning as the cache size " + + folly::to(cache_size_) + " is greater than the max_cache_size " + + folly::to(max_cache_size_); + + resumer_ = controller->Suspend(); +} + +/** + * Indicate that there is an heartbeat message but we have not cumulated enough cells to call + * onNext. + *

+ * This method give you a chance to terminate a slow scan operation. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. + */ +void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr controller) { + std::unique_lock mlock(mutex_); + if (closed_) { + controller->Terminate(); + } +} + +/** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + *

+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ +void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) { + std::unique_lock mlock(mutex_); + error_ = error; + cond_.notify_all(); +} + +/** + * Indicate that the scan operation is completed normally. + */ +void AsyncTableResultScanner::OnComplete() { + std::unique_lock mlock(mutex_); + closed_ = true; + cond_.notify_all(); +} +} // namespace hbase diff --git hbase-native-client/core/async-table-result-scanner.h hbase-native-client/core/async-table-result-scanner.h new file mode 100644 index 0000000..a8a359d --- /dev/null +++ hbase-native-client/core/async-table-result-scanner.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "core/raw-scan-result-consumer.h" +#include "core/result-scanner.h" +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +class AsyncTableResultScanner : public ResultScanner, public RawScanResultConsumer { + public: + AsyncTableResultScanner(int64_t max_cache_size); + + virtual ~AsyncTableResultScanner(); + + void Close() override; + + std::shared_ptr Next() override; + + void OnNext(const std::vector> &results, + std::shared_ptr controller) override; + + /** + * Indicate that there is an heartbeat message but we have not cumulated enough cells to call + * onNext. + *

+ * This method give you a chance to terminate a slow scan operation. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. + */ + void OnHeartbeat(std::shared_ptr controller) override; + + /** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + *

+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + void OnError(const folly::exception_wrapper &error) override; + + /** + * Indicate that the scan operation is completed normally. + */ + void OnComplete() override; + + private: + void AddToCache(const std::vector> &results); + + template + inline size_t EstimatedSizeWithSharedPtr(std::shared_ptr t); + + void StopPrefetch(std::shared_ptr controller); + + private: + std::queue> queue_; + std::mutex mutex_; + std::condition_variable cond_; + folly::exception_wrapper error_; + int64_t cache_size_; + int64_t max_cache_size_; + bool closed_; + std::shared_ptr resumer_ = nullptr; +}; +} // namespace hbase diff --git hbase-native-client/core/cell-test.cc hbase-native-client/core/cell-test.cc index efb835d..4611473 100644 --- hbase-native-client/core/cell-test.cc +++ hbase-native-client/core/cell-test.cc @@ -169,3 +169,27 @@ TEST(CellTest, CellDebugString) { LOG(INFO) << cell2.DebugString(); EXPECT_EQ("row/column/42/DELETE/vlen=5/seqid=0", cell2.DebugString()); } + +TEST(CellTest, CellEstimatedSize) { + CellType cell_type = CellType::PUT; + int64_t timestamp = std::numeric_limits::max(); + + Cell empty{"a", "a", "", timestamp, "", cell_type}; + Cell cell1{"aa", "a", "", timestamp, "", cell_type}; + Cell cell2{"a", "aa", "", timestamp, "", cell_type}; + Cell cell3{"a", "a", "a", timestamp, "", cell_type}; + Cell cell4{"a", "a", "", timestamp, "a", cell_type}; + Cell cell5{"a", "a", "", timestamp, "a", CellType::DELETE}; + Cell cell6{"aaaaaa", "a", "", timestamp, "a", cell_type}; + + LOG(INFO) << empty.EstimatedSize(); + LOG(INFO) << cell1.EstimatedSize(); + + EXPECT_TRUE(empty.EstimatedSize() > sizeof(Cell)); + EXPECT_TRUE(cell1.EstimatedSize() > empty.EstimatedSize()); + EXPECT_EQ(cell1.EstimatedSize(), cell2.EstimatedSize()); + EXPECT_EQ(cell2.EstimatedSize(), cell3.EstimatedSize()); + EXPECT_EQ(cell3.EstimatedSize(), cell4.EstimatedSize()); + EXPECT_EQ(cell4.EstimatedSize(), cell5.EstimatedSize()); + EXPECT_TRUE(cell6.EstimatedSize() > cell1.EstimatedSize()); +} diff --git hbase-native-client/core/cell.cc hbase-native-client/core/cell.cc index 24788ab..e475d49 100644 --- hbase-native-client/core/cell.cc +++ hbase-native-client/core/cell.cc @@ -94,21 +94,30 @@ std::string Cell::DebugString() const { const char *Cell::TypeToString(CellType type) { switch (type) { - case MINIMUM: + case CellType::MINIMUM: return "MINIMUM"; - case PUT: + case CellType::PUT: return "PUT"; - case DELETE: + case CellType::DELETE: return "DELETE"; - case DELETE_COLUMN: + case CellType::DELETE_COLUMN: return "DELETE_COLUMN"; - case DELETE_FAMILY: + case CellType::DELETE_FAMILY: return "DELETE_FAMILY"; - case MAXIMUM: + case CellType::MAXIMUM: return "MAXIMUM"; default: return "UNKNOWN"; } } +size_t Cell::EstimatedSize() const { + size_t s = sizeof(Cell); + s += row_.capacity(); + s += family_.capacity(); + s += qualifier_.capacity(); + s += value_.capacity(); + return s; +} + } /* namespace hbase */ diff --git hbase-native-client/core/cell.h hbase-native-client/core/cell.h index acedd96..7a62a9b 100644 --- hbase-native-client/core/cell.h +++ hbase-native-client/core/cell.h @@ -24,7 +24,7 @@ namespace hbase { -enum CellType { +enum class CellType { MINIMUM = 0, PUT = 4, DELETE = 8, @@ -49,6 +49,9 @@ class Cell { CellType Type() const; int64_t SequenceId() const; std::string DebugString() const; + /** Returns estimated size of the Cell object including deep heap space usage + * of its data. Notice that this is a very rough estimate. */ + size_t EstimatedSize() const; private: std::string row_; diff --git hbase-native-client/core/hbase-configuration-loader.h hbase-native-client/core/hbase-configuration-loader.h index a1c1d3f..95b2541 100644 --- hbase-native-client/core/hbase-configuration-loader.h +++ hbase-native-client/core/hbase-configuration-loader.h @@ -24,15 +24,12 @@ #include #include -#include #include "core/configuration.h" +#include "utils/optional.h" namespace hbase { -template -using optional = std::experimental::optional; - class HBaseConfigurationLoader { public: HBaseConfigurationLoader(); diff --git hbase-native-client/core/meta-utils.cc hbase-native-client/core/meta-utils.cc index 119520f..36f52f1 100644 --- hbase-native-client/core/meta-utils.cc +++ hbase-native-client/core/meta-utils.cc @@ -94,19 +94,12 @@ std::shared_ptr MetaUtil::CreateLocation(const Response &resp) { throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" + std::to_string(results.size())); } - auto result = *results[0]; - // VLOG(1) << "Creating RegionLocation from received Response " << *result; TODO - - std::shared_ptr region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN); - std::shared_ptr server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN); - if (region_info_str == nullptr) { - throw std::runtime_error("regioninfo column null for location"); - } - if (server_str == nullptr) { - throw std::runtime_error("server column null for location"); - } + auto region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN); + auto server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN); + CHECK(region_info_str); + CHECK(server_str); auto row = result.Row(); auto region_info = folly::to(*region_info_str); diff --git hbase-native-client/core/raw-async-table.cc hbase-native-client/core/raw-async-table.cc index 9e0d4a3..8a7059e 100644 --- hbase-native-client/core/raw-async-table.cc +++ hbase-native-client/core/raw-async-table.cc @@ -111,4 +111,24 @@ Future>>> RawAsyncTable::Batch( return caller->Call().then([caller](auto r) { return r; }); } + +void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr consumer) { + auto scanner = AsyncClientScanner::Create( + connection_, SetDefaultScanConfig(scan), table_name_, consumer, connection_conf_->pause(), + connection_conf_->max_retries(), connection_conf_->scan_timeout(), + connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count()); + scanner->Start(); +} + +std::shared_ptr RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) { + // always create a new scan object as we may reset the start row later. + auto new_scan = std::make_shared(scan); + if (new_scan->Caching() <= 0) { + new_scan->SetCaching(default_scanner_caching_); + } + if (new_scan->MaxResultSize() <= 0) { + new_scan->SetMaxResultSize(default_scanner_max_result_size_); + } + return new_scan; +} } // namespace hbase diff --git hbase-native-client/core/raw-async-table.h hbase-native-client/core/raw-async-table.h index e26d46e..2e14755 100644 --- hbase-native-client/core/raw-async-table.h +++ hbase-native-client/core/raw-async-table.h @@ -24,7 +24,9 @@ #include #include #include + #include "core/async-batch-rpc-retrying-caller.h" +#include "core/async-client-scanner.h" #include "core/async-connection.h" #include "core/async-rpc-retrying-caller-factory.h" #include "core/async-rpc-retrying-caller.h" @@ -32,6 +34,7 @@ #include "core/get.h" #include "core/put.h" #include "core/result.h" +#include "core/scan.h" using folly::Future; using folly::Try; @@ -52,12 +55,17 @@ class RawAsyncTable { : connection_(connection), connection_conf_(connection->connection_conf()), table_name_(table_name), - rpc_client_(connection->rpc_client()) {} + rpc_client_(connection->rpc_client()) { + default_scanner_caching_ = connection_conf_->scanner_caching(); + default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size(); + } virtual ~RawAsyncTable() = default; Future> Get(const hbase::Get& get); Future Put(const hbase::Put& put); + + void Scan(const hbase::Scan& scan, std::shared_ptr consumer); void Close() {} Future>>> Get(const std::vector& gets); @@ -69,6 +77,8 @@ class RawAsyncTable { std::shared_ptr connection_conf_; std::shared_ptr table_name_; std::shared_ptr rpc_client_; + int32_t default_scanner_caching_; + int64_t default_scanner_max_result_size_; /* Methods */ template @@ -81,5 +91,7 @@ class RawAsyncTable { template std::shared_ptr> CreateCallerBuilder(std::string row, nanoseconds rpc_timeout); + + std::shared_ptr SetDefaultScanConfig(const hbase::Scan& scan); }; } // namespace hbase diff --git hbase-native-client/core/raw-scan-result-consumer.h hbase-native-client/core/raw-scan-result-consumer.h new file mode 100644 index 0000000..01fd625 --- /dev/null +++ hbase-native-client/core/raw-scan-result-consumer.h @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed }; + +enum class ScanResumerState { kInitialized, kSuspended, kResumed }; + +/** + * Used to resume a scan. + */ +class ScanResumer { + public: + virtual ~ScanResumer() = default; + + /** + * Resume the scan. You are free to call it multiple time but only the first call will take + * effect. + */ + virtual void Resume() = 0; +}; + +/** + * Used to suspend or stop a scan. + *

+ * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A + * IllegalStateException will be thrown if you call them at other places. + *

+ * You can only call one of the methods below, i.e., call suspend or terminate(of course you are + * free to not call them both), and the methods are not reentrant. A IllegalStateException will be + * thrown if you have already called one of the methods. + */ +class ScanController { + public: + virtual ~ScanController() = default; + + /** + * Suspend the scan. + *

+ * This means we will stop fetching data in background, i.e., will not call onNext any more + * before you resume the scan. + * @return A resumer used to resume the scan later. + */ + virtual std::shared_ptr Suspend() = 0; + + /** + * Terminate the scan. + *

+ * This is useful when you have got enough results and want to stop the scan in onNext method, + * or you want to stop the scan in onHeartbeat method because it has spent too many time. + */ + virtual void Terminate() = 0; +}; + +/** + * Receives {@link Result} for an asynchronous scan. + *

+ * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread + * which we send request to HBase service. So if you want the asynchronous scanner fetch data from + * HBase in background while you process the returned data, you need to move the processing work to + * another thread to make the {@code onNext} call return immediately. And please do NOT do any time + * consuming tasks in all methods below unless you know what you are doing. + */ +class RawScanResultConsumer { + public: + virtual ~RawScanResultConsumer() = default; + + /** + * Indicate that we have receive some data. + * @param results the data fetched from HBase service. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within scope of onNext method. You can only call its method in + * onNext, do NOT store it and call it later outside onNext. + */ + virtual void OnNext(const std::vector> &results, + std::shared_ptr controller) {} + + /** + * Indicate that there is an heartbeat message but we have not cumulated enough cells to call + * onNext. + *

+ * This method give you a chance to terminate a slow scan operation. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. + */ + virtual void OnHeartbeat(std::shared_ptr controller) {} + + /** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + *

+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + virtual void OnError(const folly::exception_wrapper &error) {} + + /** + * Indicate that the scan operation is completed normally. + */ + virtual void OnComplete() {} +}; +} // namespace hbase diff --git hbase-native-client/core/region-location.h hbase-native-client/core/region-location.h index 4087d94..d5d9d67 100644 --- hbase-native-client/core/region-location.h +++ hbase-native-client/core/region-location.h @@ -26,7 +26,7 @@ namespace hbase { -enum RegionLocateType { kBefore, kCurrent, kAfter }; +enum class RegionLocateType { kBefore, kCurrent, kAfter }; /** * @brief class to hold where a region is located. diff --git hbase-native-client/core/request-converter.cc hbase-native-client/core/request-converter.cc index c90e1ab..0ec3039 100644 --- hbase-native-client/core/request-converter.cc +++ hbase-native-client/core/request-converter.cc @@ -53,15 +53,8 @@ std::unique_ptr RequestConverter::ToGetRequest(const Get &get, return pb_req; } -std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, - const std::string ®ion_name) { - auto pb_req = Request::scan(); - - auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); - - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - auto pb_scan = pb_msg->mutable_scan(); +std::unique_ptr RequestConverter::ToScan(const Scan &scan) { + auto pb_scan = std::make_unique(); pb_scan->set_max_versions(scan.MaxVersions()); pb_scan->set_cache_blocks(scan.CacheBlocks()); pb_scan->set_reversed(scan.IsReversed()); @@ -94,11 +87,85 @@ std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release()); } + return std::move(pb_scan); +} + +std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, + const std::string ®ion_name) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_scan(ToScan(scan).release()); + + // TODO We will change this later. + pb_msg->set_client_handles_partials(false); + pb_msg->set_client_handles_heartbeats(false); + pb_msg->set_track_scan_metrics(false); + + return pb_req; +} + +std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, + const std::string ®ion_name, + int32_t num_rows, bool close_scanner) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_scan(ToScan(scan).release()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + + // TODO We will change this later. + pb_msg->set_client_handles_partials(false); + pb_msg->set_client_handles_heartbeats(false); + pb_msg->set_track_scan_metrics(false); + + return pb_req; +} + +std::unique_ptr RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + pb_msg->set_scanner_id(scanner_id); + // TODO We will change this later. pb_msg->set_client_handles_partials(false); pb_msg->set_client_handles_heartbeats(false); pb_msg->set_track_scan_metrics(false); + // TODO: set scan limit + + return pb_req; +} + +std::unique_ptr RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner, + int64_t next_call_seq_id, bool renew) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + pb_msg->set_scanner_id(scanner_id); + pb_msg->set_next_call_seq(next_call_seq_id); + + // TODO We will change this later. + pb_msg->set_client_handles_partials(false); + pb_msg->set_client_handles_heartbeats(false); + pb_msg->set_track_scan_metrics(false); + pb_msg->set_renew(renew); + + // TODO: set scan limit + return pb_req; } @@ -190,13 +257,13 @@ std::unique_ptr RequestConverter::ToMutation(const MutationType t DeleteType RequestConverter::ToDeleteType(const CellType type) { switch (type) { - case DELETE: + case CellType::DELETE: return pb::MutationProto_DeleteType_DELETE_ONE_VERSION; - case DELETE_COLUMN: + case CellType::DELETE_COLUMN: return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS; - case DELETE_FAMILY: + case CellType::DELETE_FAMILY: return pb::MutationProto_DeleteType_DELETE_FAMILY; - case DELETE_FAMILY_VERSION: + case CellType::DELETE_FAMILY_VERSION: return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION; default: throw std::runtime_error("Unknown delete type: " + folly::to(type)); diff --git hbase-native-client/core/request-converter.h hbase-native-client/core/request-converter.h index 6861604..d64c5e7 100644 --- hbase-native-client/core/request-converter.h +++ hbase-native-client/core/request-converter.h @@ -69,6 +69,16 @@ class RequestConverter { */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name); + static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name, + int32_t num_rows, bool close_scanner); + + static std::unique_ptr ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner); + + static std::unique_ptr ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner, int64_t next_call_seq_id, + bool renew); + static std::unique_ptr ToMultiRequest(const ActionsByRegion ®ion_requests); static std::unique_ptr ToMutateRequest(const Put &put, const std::string ®ion_name); @@ -88,6 +98,7 @@ class RequestConverter { */ static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); static std::unique_ptr ToGet(const Get &get); + static std::unique_ptr ToScan(const Scan &scan); static DeleteType ToDeleteType(const CellType type); static bool IsDelete(const CellType type); }; diff --git hbase-native-client/core/response-converter.cc hbase-native-client/core/response-converter.cc index b29a819..533c2b2 100644 --- hbase-native-client/core/response-converter.cc +++ hbase-native-client/core/response-converter.cc @@ -47,7 +47,7 @@ std::shared_ptr ResponseConverter::FromGetResponse(const Response& resp) } std::shared_ptr ResponseConverter::ToResult( - const hbase::pb::Result& result, const std::unique_ptr& cell_scanner) { + const hbase::pb::Result& result, const std::shared_ptr cell_scanner) { std::vector> vcells; for (auto cell : result.cell()) { std::shared_ptr pcell = @@ -75,34 +75,38 @@ std::shared_ptr ResponseConverter::ToResult( std::vector> ResponseConverter::FromScanResponse(const Response& resp) { auto scan_resp = std::static_pointer_cast(resp.resp_msg()); - VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString(); - int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() - : scan_resp->results_size(); + return FromScanResponse(scan_resp, resp.cell_scanner()); +} + +std::vector> ResponseConverter::FromScanResponse( + const std::shared_ptr scan_resp, std::shared_ptr cell_scanner) { + VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString() + << " cell_scanner:" << (cell_scanner == nullptr); + int num_results = + cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size(); std::vector> results{static_cast(num_results)}; for (int i = 0; i < num_results; i++) { - if (resp.cell_scanner() != nullptr) { + if (cell_scanner != nullptr) { // Cells are out in cellblocks. Group them up again as Results. How many to read at a // time will be found in getCellsLength -- length here is how many Cells in the i'th Result int num_cells = scan_resp->cells_per_result(i); std::vector> vcells; - while (resp.cell_scanner()->Advance()) { - vcells.push_back(resp.cell_scanner()->Current()); - } - // TODO: check associated cell count? - - if (vcells.size() != num_cells) { - std::string msg = "Results sent from server=" + std::to_string(num_results) + - ". But only got " + std::to_string(i) + - " results completely at client. Resetting the scanner to scan again."; - LOG(ERROR) << msg; - throw std::runtime_error(msg); + for (int j = 0; j < num_cells; j++) { + if (!cell_scanner->Advance()) { + std::string msg = "Results sent from server=" + std::to_string(num_results) + + ". But only got " + std::to_string(i) + + " results completely at client. Resetting the scanner to scan again."; + LOG(ERROR) << msg; + throw std::runtime_error(msg); + } + vcells.push_back(cell_scanner->Current()); } // TODO: handle partial results per Result by checking partial_flag_per_result results[i] = std::make_shared(vcells, false, scan_resp->stale(), false); } else { - results[i] = ToResult(scan_resp->results(i), resp.cell_scanner()); + results[i] = ToResult(scan_resp->results(i), cell_scanner); } } diff --git hbase-native-client/core/response-converter.h hbase-native-client/core/response-converter.h index a5095fd..4c37ff5 100644 --- hbase-native-client/core/response-converter.h +++ hbase-native-client/core/response-converter.h @@ -41,7 +41,7 @@ class ResponseConverter { ~ResponseConverter(); static std::shared_ptr ToResult(const hbase::pb::Result& result, - const std::unique_ptr& cell_scanner); + const std::shared_ptr cell_scanner); /** * @brief Returns a Result object created by PB Message in passed Response object. @@ -51,6 +51,9 @@ class ResponseConverter { static std::vector> FromScanResponse(const Response& resp); + static std::vector> FromScanResponse( + const std::shared_ptr resp, std::shared_ptr cell_scanner); + static std::unique_ptr GetResults(std::shared_ptr req, const Response& resp); diff --git hbase-native-client/core/result-scanner.h hbase-native-client/core/result-scanner.h new file mode 100644 index 0000000..9460521 --- /dev/null +++ hbase-native-client/core/result-scanner.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "core/cell.h" +#include "core/result.h" + +namespace hbase { + +/** + * Interface for client-side scanning. Use Table to obtain instances. + */ +class ResultScanner { + // TODO: should we implement forward iterators? + + public: + virtual ~ResultScanner() {} + + virtual void Close() = 0; + + virtual std::shared_ptr Next() = 0; +}; +} /* namespace hbase */ diff --git hbase-native-client/core/result-test.cc hbase-native-client/core/result-test.cc index 520f4b9..8ed7a0d 100644 --- hbase-native-client/core/result-test.cc +++ hbase-native-client/core/result-test.cc @@ -17,6 +17,7 @@ * */ +#include #include #include #include @@ -297,3 +298,24 @@ TEST(Result, FilledResult) { EXPECT_EQ("value-9", qual_val_map.second); } } + +TEST(Result, ResultEstimatedSize) { + CellType cell_type = CellType::PUT; + int64_t timestamp = std::numeric_limits::max(); + std::vector > cells; + Result empty(cells, true, false, false); + + EXPECT_EQ(empty.EstimatedSize(), sizeof(Result)); + + cells.push_back(std::make_shared("a", "a", "", timestamp, "", cell_type)); + Result result1(cells, true, false, false); + EXPECT_TRUE(result1.EstimatedSize() > empty.EstimatedSize()); + + cells.push_back(std::make_shared("a", "a", "", timestamp, "", cell_type)); + Result result2(cells, true, false, false); + EXPECT_TRUE(result2.EstimatedSize() > result1.EstimatedSize()); + + LOG(INFO) << empty.EstimatedSize(); + LOG(INFO) << result1.EstimatedSize(); + LOG(INFO) << result2.EstimatedSize(); +} diff --git hbase-native-client/core/result.cc hbase-native-client/core/result.cc index 9d9ddb3..44b4c86 100644 --- hbase-native-client/core/result.cc +++ hbase-native-client/core/result.cc @@ -25,13 +25,7 @@ Result::~Result() {} Result::Result(const std::vector > &cells, bool exists, bool stale, bool partial) - : exists_(exists), stale_(stale), partial_(partial) { - for (const auto &cell : cells) { - cells_.push_back(cell); - // We create the map when cells are added. unlike java where map is created - // when result.getMap() is called - result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); - } + : exists_(exists), stale_(stale), partial_(partial), cells_(cells) { row_ = (cells_.size() == 0 ? "" : cells_[0]->Row()); } @@ -43,10 +37,10 @@ Result::Result(const Result &result) { if (!result.cells_.empty()) { for (const auto &cell : result.cells_) { cells_.push_back(cell); - result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); } } } + const std::vector > &Result::Cells() const { return cells_; } std::vector > Result::ColumnCells(const std::string &family, @@ -74,13 +68,12 @@ const std::shared_ptr Result::ColumnLatestCell(const std::string &family, return nullptr; } -std::shared_ptr Result::Value(const std::string &family, - const std::string &qualifier) const { +optional Result::Value(const std::string &family, const std::string &qualifier) const { std::shared_ptr latest_cell(ColumnLatestCell(family, qualifier)); if (latest_cell.get()) { - return std::make_shared(latest_cell->Value()); + return optional(latest_cell->Value()); } - return nullptr; + return optional(); } bool Result::IsEmpty() const { return cells_.empty(); } @@ -89,24 +82,33 @@ const std::string &Result::Row() const { return row_; } int Result::Size() const { return cells_.size(); } -const ResultMap &Result::Map() const { return result_map_; } +ResultMap Result::Map() const { + ResultMap result_map; + for (const auto &cell : cells_) { + result_map[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); + } + return result_map; +} -const std::map Result::FamilyMap(const std::string &family) const { +std::map Result::FamilyMap(const std::string &family) const { std::map family_map; if (!IsEmpty()) { - for (auto itr = result_map_.begin(); itr != result_map_.end(); ++itr) { - if (family == itr->first) { - for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) { - for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) { - // We break after inserting the first value. Result.java takes only - // the first value - family_map[qitr->first] = vitr->second; - break; - } - } + auto result_map = Map(); + auto itr = result_map.find(family); + if (itr == result_map.end()) { + return family_map; + } + + for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) { + for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) { + // We break after inserting the first value. Result.java takes only + // the first value + family_map[qitr->first] = vitr->second; + break; } } } + return family_map; } @@ -131,4 +133,14 @@ std::string Result::DebugString() const { return ret; } +size_t Result::EstimatedSize() const { + size_t s = sizeof(Result); + s += row_.capacity(); + for (const auto c : cells_) { + s += sizeof(std::shared_ptr); + s + c->EstimatedSize(); + } + return s; +} + } /* namespace hbase */ diff --git hbase-native-client/core/result.h hbase-native-client/core/result.h index 627d161..f18071b 100644 --- hbase-native-client/core/result.h +++ hbase-native-client/core/result.h @@ -26,6 +26,7 @@ #include #include "core/cell.h" +#include "utils/optional.h" namespace hbase { @@ -79,7 +80,7 @@ class Result { * @param family - column family * @param qualifier - column qualifier */ - std::shared_ptr Value(const std::string &family, const std::string &qualifier) const; + optional Value(const std::string &family, const std::string &qualifier) const; /** * @brief Returns if the underlying Cell vector is empty or not @@ -104,23 +105,32 @@ class Result { * All other map returning methods make use of this map internally * The Map is created when the Result instance is created */ - const ResultMap &Map() const; + ResultMap Map() const; /** * @brief Map of qualifiers to values. * Returns a Map of the form: Map * @param family - column family to get */ - const std::map FamilyMap(const std::string &family) const; + std::map FamilyMap(const std::string &family) const; std::string DebugString() const; + bool Exists() const { return exists_; } + + bool Stale() const { return stale_; } + + bool Partial() const { return partial_; } + + /** Returns estimated size of the Result object including deep heap space usage + * of its Cells and data. Notice that this is a very rough estimate. */ + size_t EstimatedSize() const; + private: bool exists_ = false; bool stale_ = false; bool partial_ = false; std::string row_ = ""; std::vector > cells_; - ResultMap result_map_; }; } /* namespace hbase */ diff --git hbase-native-client/core/scan-result-cache-test.cc hbase-native-client/core/scan-result-cache-test.cc new file mode 100644 index 0000000..d27ab27 --- /dev/null +++ hbase-native-client/core/scan-result-cache-test.cc @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#include + +#include "core/cell.h" +#include "core/result.h" +#include "core/scan-result-cache.h" + +using hbase::ScanResultCache; +using hbase::Result; +using hbase::Cell; +using hbase::CellType; + +using ResultVector = std::vector>; + +std::shared_ptr CreateCell(const int32_t &key, const std::string &family, + const std::string &column) { + auto row = folly::to(key); + return std::make_shared(row, family, column, std::numeric_limits::max(), row, + CellType::PUT); +} + +std::shared_ptr CreateResult(std::shared_ptr cell, bool partial) { + return std::make_shared(std::vector>{cell}, false, false, partial); +} + +TEST(ScanResultCacheTest, NoPartial) { + ScanResultCache cache; + ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false)); + ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true)); + int32_t count = 10; + ResultVector results{}; + for (int32_t i = 0; i < count; i++) { + results.push_back(CreateResult(CreateCell(i, "cf", "cq1"), false)); + } + ASSERT_EQ(results, cache.AddAndGet(results, false)); +} + +TEST(ScanResultCacheTest, Combine1) { + ScanResultCache cache; + auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true); + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); + auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false); + ASSERT_EQ(1L, results.size()); + ASSERT_EQ(prev_result, results[0]); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{}, true).size()); + + results = cache.AddAndGet(ResultVector{}, false); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to(results[0]->Row())); + ASSERT_EQ(3, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq3"))); +} + +TEST(ScanResultCacheTest, Combine2) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); + + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); + auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); + + auto results = cache.AddAndGet(ResultVector{next_result1}, false); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to(results[0]->Row())); + ASSERT_EQ(3, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq3"))); + + results = cache.AddAndGet(ResultVector{next_to_next_result1}, false); + ASSERT_EQ(2, results.size()); + ASSERT_EQ(2, folly::to(results[0]->Row())); + ASSERT_EQ(1, results[0]->Cells().size()); + ASSERT_EQ(2, folly::to(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(3, folly::to(results[1]->Row())); + ASSERT_EQ(1, results[1]->Cells().size()); + ASSERT_EQ(3, folly::to(*results[1]->Value("cf", "cq2"))); +} + +TEST(ScanResultCacheTest, Combine3) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false); + auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + + auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false); + + ASSERT_EQ(2, results.size()); + ASSERT_EQ(1, folly::to(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(2, folly::to(results[1]->Row())); + ASSERT_EQ(1, results[1]->Cells().size()); + ASSERT_EQ(2, folly::to(*results[1]->Value("cf", "cq1"))); + + results = cache.AddAndGet(ResultVector{}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(3, folly::to(results[0]->Row())); + ASSERT_EQ(1, results[0]->Cells().size()); + ASSERT_EQ(3, folly::to(*results[0]->Value("cf", "cq1"))); +} + +TEST(ScanResultCacheTest, Combine4) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false); + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); + auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + + auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to(*results[0]->Value("cf", "cq2"))); + + results = cache.AddAndGet(ResultVector{next_result2}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(2, folly::to(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(2, folly::to(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(2, folly::to(*results[0]->Value("cf", "cq2"))); +} + +TEST(ScanResultCacheTest, SizeOf) { + std::string e{""}; + std::string f{"f"}; + std::string foo{"foo"}; + + LOG(INFO) << sizeof(e) << " " << e.capacity(); + LOG(INFO) << sizeof(f) << " " << f.capacity(); + LOG(INFO) << sizeof(foo) << " " << foo.capacity(); +} diff --git hbase-native-client/core/scan-result-cache.cc hbase-native-client/core/scan-result-cache.cc new file mode 100644 index 0000000..ab7ba8b --- /dev/null +++ hbase-native-client/core/scan-result-cache.cc @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "scan-result-cache.h" +#include +#include +#include +#include + +namespace hbase { +/** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ +std::vector> ScanResultCache::AddAndGet( + const std::vector> &results, bool is_hearthbeat) { + // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive + if (results.empty()) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partial_results_.empty() && !is_hearthbeat) { + return UpdateNumberOfCompleteResultsAndReturn( + std::vector>{Combine()}); + } + return std::vector>{}; + } + // In every RPC response there should be at most a single partial result. Furthermore, if + // there is a partial result, it is guaranteed to be in the last position of the array. + auto last = results[results.size() - 1]; + if (last->Partial()) { + if (partial_results_.empty()) { + partial_results_.push_back(last); + std::vector> new_results; + std::copy_n(results.begin(), results.size() - 1, std::back_inserter(new_results)); + return UpdateNumberOfCompleteResultsAndReturn(new_results); + } + // We have only one result and it is partial + if (results.size() == 1) { + // check if there is a row change + if (partial_results_.at(0)->Row() == last->Row()) { + partial_results_.push_back(last); + return std::vector>{}; + } + auto complete_result = Combine(); + partial_results_.push_back(last); + return UpdateNumberOfCompleteResultsAndReturn(complete_result); + } + // We have some complete results + auto results_to_return = PrependCombined(results, results.size() - 1); + partial_results_.push_back(last); + return UpdateNumberOfCompleteResultsAndReturn(results_to_return); + } + if (!partial_results_.empty()) { + return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size())); + } + return UpdateNumberOfCompleteResultsAndReturn(results); +} + +void ScanResultCache::Clear() { partial_results_.clear(); } + +std::shared_ptr ScanResultCache::CreateCompleteResult( + const std::vector> &partial_results) { + if (partial_results.empty()) { + return std::make_shared(std::vector>{}, false, false, false); + } + std::vector> cells{}; + bool stale = false; + std::string prev_row = ""; + std::string current_row = ""; + size_t i = 0; + for (const auto &r : partial_results) { + current_row = r->Row(); + if (i != 0 && prev_row != current_row) { + throw new std::runtime_error( + "Cannot form complete result. Rows of partial results do not match."); + } + // Ensure that all Results except the last one are marked as partials. The last result + // may not be marked as a partial because Results are only marked as partials when + // the scan on the server side must be stopped due to reaching the maxResultSize. + // Visualizing it makes it easier to understand: + // maxResultSize: 2 cells + // (-x-) represents cell number x in a row + // Example: row1: -1- -2- -3- -4- -5- (5 cells total) + // How row1 will be returned by the server as partial Results: + // Result1: -1- -2- (2 cells, size limit reached, mark as partial) + // Result2: -3- -4- (2 cells, size limit reached, mark as partial) + // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) + if (i != partial_results.size() - 1 && !r->Partial()) { + throw new std::runtime_error("Cannot form complete result. Result is missing partial flag."); + } + prev_row = current_row; + stale = stale || r->Stale(); + for (const auto &c : r->Cells()) { + cells.push_back(c); + } + i++; + } + + return std::make_shared(cells, false, stale, false); +} + +std::shared_ptr ScanResultCache::Combine() { + auto result = CreateCompleteResult(partial_results_); + partial_results_.clear(); + return result; +} + +std::vector> ScanResultCache::PrependCombined( + const std::vector> &results, int length) { + if (length == 0) { + return std::vector>{Combine()}; + } + // the last part of a partial result may not be marked as partial so here we need to check if + // there is a row change. + size_t start; + if (partial_results_[0]->Row() == results[0]->Row()) { + partial_results_.push_back(results[0]); + start = 1; + length--; + } else { + start = 0; + } + std::vector> prepend_results{}; + prepend_results.push_back(Combine()); + std::copy_n(results.begin() + start, length, std::back_inserter(prepend_results)); + return prepend_results; +} + +std::vector> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( + const std::shared_ptr &result) { + return UpdateNumberOfCompleteResultsAndReturn(std::vector>{result}); +} + +std::vector> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( + const std::vector> &results) { + num_complete_rows_ += results.size(); + return results; +} +} // namespace hbase diff --git hbase-native-client/core/scan-result-cache.h hbase-native-client/core/scan-result-cache.h new file mode 100644 index 0000000..5d3d0ab --- /dev/null +++ hbase-native-client/core/scan-result-cache.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +class ScanResultCache { + // In Java, there are 3 different implementations for this. We are not doing partial results, + // or scan batching in native code for now, so this version is simpler and + // only deals with giving back complete rows as Result. It is more or less implementation + // of CompleteScanResultCache.java + + public: + /** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ + std::vector> AddAndGet( + const std::vector> &results, bool is_hearthbeat); + + void Clear(); + + int64_t num_complete_rows() const { return num_complete_rows_; } + + private: + /** + * Forms a single result from the partial results in the partialResults list. This method is + * useful for reconstructing partial results on the client side. + * @param partial_results list of partial results + * @return The complete result that is formed by combining all of the partial results together + */ + static std::shared_ptr CreateCompleteResult( + const std::vector> &partial_results); + + std::shared_ptr Combine(); + + std::vector> PrependCombined( + const std::vector> &results, int length); + + std::vector> UpdateNumberOfCompleteResultsAndReturn( + const std::shared_ptr &result); + + std::vector> UpdateNumberOfCompleteResultsAndReturn( + const std::vector> &results); + + private: + std::vector> partial_results_; + int64_t num_complete_rows_ = 0; +}; +} // namespace hbase diff --git hbase-native-client/core/scan-results-cache.cc hbase-native-client/core/scan-results-cache.cc new file mode 100644 index 0000000..78c3ec7 --- /dev/null +++ hbase-native-client/core/scan-results-cache.cc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/scan-results-cache.h" + +#include +#include +#include +#include + +namespace hbase {} // namespace hbase diff --git hbase-native-client/core/scan-results-cache.h hbase-native-client/core/scan-results-cache.h new file mode 100644 index 0000000..f08bf36 --- /dev/null +++ hbase-native-client/core/scan-results-cache.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +class ScanResultsCache { + // In Java, there are 3 different implementations for this. We are not doing + // partial results in native code for now, so this version is simpler and + // only deals with giving back complete rows as Result. + + /** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param isHeartbeatMessage indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ + std::vector> AddAndGet(std::vector> results, + bool is_hearthbeat) { + return nullptr; + } + + public: + void Clear() {} +}; +} // namespace hbase diff --git hbase-native-client/core/scan.h hbase-native-client/core/scan.h index fb302b7..0ddf79c 100644 --- hbase-native-client/core/scan.h +++ hbase-native-client/core/scan.h @@ -258,7 +258,7 @@ class Scan : public Query { std::string start_row_ = ""; std::string stop_row_ = ""; uint32_t max_versions_ = 1; - int caching_ = -1; + int32_t caching_ = -1; int64_t max_result_size_ = -1; bool cache_blocks_ = true; bool load_column_families_on_demand_ = false; diff --git hbase-native-client/core/scanner-test.cc hbase-native-client/core/scanner-test.cc new file mode 100644 index 0000000..77dad10 --- /dev/null +++ hbase-native-client/core/scanner-test.cc @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include + +#include "core/async-client-scanner.h" +#include "core/cell.h" +#include "core/client.h" +#include "core/configuration.h" +#include "core/get.h" +#include "core/hbase-configuration-loader.h" +#include "core/put.h" +#include "core/result.h" +#include "core/row.h" +#include "core/table.h" +#include "serde/table-name.h" +#include "test-util/test-util.h" +#include "utils/time-util.h" + +using hbase::Cell; +using hbase::Configuration; +using hbase::Get; +using hbase::Put; +using hbase::Scan; +using hbase::Table; +using hbase::TestUtil; +using hbase::TimeUtil; +using hbase::AsyncClientScanner; +using namespace std::chrono_literals; + +class ScannerTest : public ::testing::Test { + public: + static std::unique_ptr test_util; + + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util = std::make_unique(); + test_util->StartMiniCluster(2); + } +}; +std::unique_ptr ScannerTest::test_util = nullptr; + +std::unique_ptr MakePut(const std::string &row) { + auto put = std::make_unique(row); + put->AddColumn("f", "q", row); + return std::move(put); +} + +std::string Row(const std::string &prefix, uint64_t i) { + std::ostringstream s; + s.fill('0'); + s.width(3); + s << i; + return prefix + s.str(); +} + +void CheckResult(const Result &r, uint64_t i) { + LOG(INFO) << r.DebugString(); + auto row = r.Row(); + ASSERT_EQ(row, Row("row", i)); + ASSERT_EQ(r.Cells().size(), 1); + ASSERT_EQ(*r.Value("f", "q"), row); +} + +std::unique_ptr CreateTableAndWriteData(std::string table_name, uint64_t num_rows, + int32_t num_regions) { + if (num_regions <= 1) { + ScannerTest::test_util->CreateTable(table_name, "f"); + } else { + std::vector keys; + for (int32_t i = 0; i < num_regions - 1; i++) { + keys.push_back(Row("row", i * (num_rows / (num_regions - 1)))); + } + ScannerTest::test_util->CreateTable(table_name, "f", keys); + } + auto tn = folly::to(table_name); + auto client = std::make_unique(*ScannerTest::test_util->conf()); + auto table = client->Table(tn); + + // Perform Puts + for (uint64_t i = 0; i < num_rows; i++) { + table->Put(*MakePut(Row("row", i))); + } + return std::move(client); +} + +TEST_F(ScannerTest, BasicScan) { + uint64_t num_rows = 1000; + auto client = CreateTableAndWriteData("t_basic_scan", num_rows, 1); + auto table = client->Table(folly::to("t_basic_scan")); + + VLOG(1) << "Starting scan for the test"; + Scan scan{}; + scan.SetCaching(100); + table->Scan(scan); + + // TODO remove sleep + std::this_thread::sleep_for(2s); + // TODO: assert results +} + +TEST_F(ScannerTest, MultiRegionScan) { + uint64_t num_rows = 1000; + auto client = CreateTableAndWriteData("t_multi_region_scan", num_rows, 5); + auto table = client->Table(folly::to("t_multi_region_scan")); + + VLOG(1) << "Starting scan for the test"; + Scan scan{}; + scan.SetCaching(100); + auto scanner = table->Scan(scan); + + uint64_t i = 0; + auto r = scanner->Next(); + while (r != nullptr) { + CheckResult(*r, i++); + r = scanner->Next(); + } + ASSERT_EQ(i, num_rows); +} + +TEST_F(ScannerTest, ScanWithPauses) { + ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", 100); + uint64_t num_rows = 1000; + auto client = CreateTableAndWriteData("t_multi_region_scan", num_rows, 5); + auto table = client->Table(folly::to("t_multi_region_scan")); + + VLOG(1) << "Starting scan for the test"; + Scan scan{}; + scan.SetCaching(100); + auto scanner = table->Scan(scan); + + uint64_t i = 0; + auto r = scanner->Next(); + while (r != nullptr) { + CheckResult(*r, i++); + r = scanner->Next(); + std::this_thread::sleep_for(TimeUtil::MillisToNanos(10)); + } + // TODO: make sure that there were pauses. + ASSERT_EQ(i, num_rows); + // TODO: restore scan parameter +} diff --git hbase-native-client/core/simple-client.cc hbase-native-client/core/simple-client.cc index 3a7d62b..f79d848 100644 --- hbase-native-client/core/simple-client.cc +++ hbase-native-client/core/simple-client.cc @@ -30,6 +30,7 @@ #include "core/client.h" #include "core/get.h" #include "core/put.h" +#include "core/scan.h" #include "core/table.h" #include "serde/server-name.h" #include "serde/table-name.h" @@ -38,6 +39,7 @@ using hbase::Client; using hbase::Configuration; using hbase::Get; +using hbase::Scan; using hbase::Put; using hbase::Table; using hbase::pb::TableName; @@ -48,6 +50,10 @@ DEFINE_string(table, "test_table", "What table to do the reads or writes"); DEFINE_string(row, "row_", "row prefix"); DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); DEFINE_uint64(num_rows, 10000, "How many rows to write and read"); +DEFINE_bool(puts, true, "Whether to perform puts"); +DEFINE_bool(gets, true, "Whether to perform gets"); +DEFINE_bool(multigets, true, "Whether to perform multi-gets"); +DEFINE_bool(scans, true, "Whether to perform scans"); DEFINE_bool(display_results, false, "Whether to display the Results from Gets"); DEFINE_int32(threads, 6, "How many cpu threads"); @@ -86,41 +92,72 @@ int main(int argc, char *argv[]) { auto start_ns = TimeUtil::GetNowNanos(); // Do the Put requests - for (uint64_t i = 0; i < num_puts; i++) { - table->Put(*MakePut(Row(FLAGS_row, i))); - } + if (FLAGS_puts) { + LOG(INFO) << "Sending put requests"; + for (uint64_t i = 0; i < num_puts; i++) { + table->Put(*MakePut(Row(FLAGS_row, i))); + } - LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } // Do the Get requests - start_ns = TimeUtil::GetNowNanos(); - for (uint64_t i = 0; i < num_puts; i++) { - auto result = table->Get(Get{Row(FLAGS_row, i)}); - if (FLAGS_display_results) { - LOG(INFO) << result->DebugString(); + if (FLAGS_gets) { + LOG(INFO) << "Sending get requests"; + start_ns = TimeUtil::GetNowNanos(); + for (uint64_t i = 0; i < num_puts; i++) { + auto result = table->Get(Get{Row(FLAGS_row, i)}); + if (FLAGS_display_results) { + LOG(INFO) << result->DebugString(); + } } - } - LOG(INFO) << "Successfully sent " << num_puts << " Get requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + LOG(INFO) << "Successfully sent " << num_puts << " Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } // Do the Multi-Gets - std::vector gets; - for (uint64_t i = 0; i < num_puts; ++i) { - hbase::Get get(Row(FLAGS_row, i)); - gets.push_back(get); - } + if (FLAGS_multigets) { + std::vector gets; + for (uint64_t i = 0; i < num_puts; ++i) { + hbase::Get get(Row(FLAGS_row, i)); + gets.push_back(get); + } + + LOG(INFO) << "Sending multi-get requests"; + start_ns = TimeUtil::GetNowNanos(); + auto results = table->Get(gets); - start_ns = TimeUtil::GetNowNanos(); - auto results = table->Get(gets); + if (FLAGS_display_results) { + for (const auto &result : results) LOG(INFO) << result->DebugString(); + } - if (FLAGS_display_results) { - for (const auto &result : results) LOG(INFO) << result->DebugString(); + LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; } - LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + // Do the Scan + if (FLAGS_scans) { + LOG(INFO) << "Starting scanner"; + start_ns = TimeUtil::GetNowNanos(); + Scan scan{}; + auto scanner = table->Scan(scan); + + uint64_t i = 0; + auto r = scanner->Next(); + while (r != nullptr) { + if (FLAGS_display_results) { + LOG(INFO) << r->DebugString(); + } + r = scanner->Next(); + i++; + } + + LOG(INFO) << "Successfully iterated over " << i << " Scan results in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + scanner->Close(); + } table->Close(); client->Close(); diff --git hbase-native-client/core/table.cc hbase-native-client/core/table.cc index a2f31d9..3c71fed 100644 --- hbase-native-client/core/table.cc +++ hbase-native-client/core/table.cc @@ -25,6 +25,7 @@ #include #include "core/async-connection.h" +#include "core/async-table-result-scanner.h" #include "core/request-converter.h" #include "core/response-converter.h" #include "if/Client.pb.h" @@ -57,6 +58,21 @@ void Table::Put(const hbase::Put &put) { future.get(operation_timeout()); } +std::shared_ptr Table::Scan(const hbase::Scan &scan) { + auto max_cache_size = ResultSize2CacheSize( + scan.MaxResultSize() > 0 ? scan.MaxResultSize() + : async_connection_->connection_conf()->scanner_max_result_size()); + auto scanner = std::make_shared(max_cache_size); + async_table_->Scan(scan, scanner); + return scanner; +} + +int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const { + // * 2 if possible + return max_results_size > (std::numeric_limits::max() / 2) ? max_results_size + : max_results_size * 2; +} + milliseconds Table::operation_timeout() const { return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout()); } diff --git hbase-native-client/core/table.h hbase-native-client/core/table.h index 142baae..9eea5ea 100644 --- hbase-native-client/core/table.h +++ hbase-native-client/core/table.h @@ -32,6 +32,7 @@ #include "core/location-cache.h" #include "core/put.h" #include "core/raw-async-table.h" +#include "core/result-scanner.h" #include "core/result.h" #include "serde/table-name.h" @@ -64,6 +65,8 @@ class Table { // TODO: Batch Puts + std::shared_ptr Scan(const hbase::Scan &scan); + /** * @brief - Close the client connection. */ @@ -82,5 +85,6 @@ class Table { private: milliseconds operation_timeout() const; + int64_t ResultSize2CacheSize(int64_t max_results_size) const; }; } /* namespace hbase */ diff --git hbase-native-client/utils/BUCK hbase-native-client/utils/BUCK index 04e2b67..c9a66d8 100644 --- hbase-native-client/utils/BUCK +++ hbase-native-client/utils/BUCK @@ -20,14 +20,18 @@ cxx_library( exported_headers=[ "bytes-util.h", "connection-util.h", + "optional.h", "sys-util.h", "time-util.h", "user-util.h", "version.h", ], - srcs=["bytes-util.cc", "connection-util.cc", "user-util.cc"], + srcs=[ + "bytes-util.cc", + "connection-util.cc", + "user-util.cc", + ], deps=['//third-party:folly',], - tests=[":user-util-test"], visibility=['PUBLIC',], compiler_flags=['-Weffc++'],) cxx_test( diff --git hbase-native-client/utils/bytes-util-test.cc hbase-native-client/utils/bytes-util-test.cc index d997ee2..b312099 100644 --- hbase-native-client/utils/bytes-util-test.cc +++ hbase-native-client/utils/bytes-util-test.cc @@ -47,3 +47,13 @@ TEST(TestBytesUtil, TestToStringBinary) { EXPECT_EQ("foo_\\x00\\xFF_bar", BytesUtil::ToStringBinary("foo_" + std::string{zero} + std::string{max} + "_bar")); } + +TEST(TestBytesUtil, TestCreateClosestRowAfter) { + std::string empty{""}; + EXPECT_EQ(BytesUtil::CreateClosestRowAfter(empty), std::string{'\0'}); + + std::string foo{"foo"}; + EXPECT_EQ(BytesUtil::CreateClosestRowAfter(foo), std::string{"foo"} + '\0'); + + EXPECT_EQ("f\\x00", BytesUtil::ToStringBinary(BytesUtil::CreateClosestRowAfter("f"))); +} diff --git hbase-native-client/utils/bytes-util.h hbase-native-client/utils/bytes-util.h index 541b2d7..461e984 100644 --- hbase-native-client/utils/bytes-util.h +++ hbase-native-client/utils/bytes-util.h @@ -41,5 +41,24 @@ class BytesUtil { * @return string output */ static std::string ToStringBinary(const std::string& b, size_t off, size_t len); + + static bool IsEmptyStartRow(const std::string& row) { return row == ""; } + + static bool IsEmptyStopRow(const std::string& row) { return row == ""; } + + static int32_t CompareTo(const std::string& a, const std::string& b) { + if (a < b) { + return -1; + } + if (a == b) { + return 0; + } + return 1; + } + + /** + * Create the closest row after the specified row + */ + static std::string CreateClosestRowAfter(std::string row) { return row.append(1, '\0'); } }; } /* namespace hbase */ diff --git hbase-native-client/utils/optional.h hbase-native-client/utils/optional.h new file mode 100644 index 0000000..a05eab5 --- /dev/null +++ hbase-native-client/utils/optional.h @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include + +namespace hbase { + +/** + * An optional value that may or may not be present. + */ +template +using optional = std::experimental::optional; + +} /* namespace hbase */