commit 7d2fcb0ed479aecd020e05bb60f9fd6a7a09a9cf Author: Enis Soztutar Date: Tue Apr 11 16:02:30 2017 +0300 Scan caller builder diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index 412ee3b..0466cbe 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -43,7 +43,10 @@ cxx_library( "request-converter.h", "response-converter.h", "table.h", + "async-scan-rpc-retrying-caller.h", "raw-async-table.h", + "raw-scan-result-consumer.h", + "scan-results-cache.h", "hbase-rpc-controller.h", "time-range.h", "zk-util.h", @@ -67,7 +70,9 @@ cxx_library( "get.cc", "mutation.cc", "put.cc", + "raw-scan-result-consumer.cc", "scan.cc", + "scan-results-cache.cc", "raw-async-table.cc", "result.cc", "request-converter.cc", diff --git hbase-native-client/core/async-connection.cc hbase-native-client/core/async-connection.cc index b945e38..0e50e9d 100644 --- hbase-native-client/core/async-connection.cc +++ hbase-native-client/core/async-connection.cc @@ -41,7 +41,9 @@ void AsyncConnectionImpl::Init() { std::make_shared(io_executor_, codec, connection_conf_->connect_timeout()); location_cache_ = std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); - caller_factory_ = std::make_shared(shared_from_this()); + retry_timer_ = folly::HHWheelTimer::newTimer(io_executor_->getEventBase()); + caller_factory_ = + std::make_shared(shared_from_this(), retry_timer_); } // We can't have the threads continue running after everything is done diff --git hbase-native-client/core/async-connection.h hbase-native-client/core/async-connection.h index ff11577..5f031f9 100644 --- hbase-native-client/core/async-connection.h +++ hbase-native-client/core/async-connection.h @@ -98,6 +98,7 @@ class AsyncConnectionImpl : public AsyncConnection, std::shared_ptr conf_; std::shared_ptr connection_conf_; std::shared_ptr caller_factory_; + std::shared_ptr retry_timer_; std::shared_ptr cpu_executor_; std::shared_ptr io_executor_; std::shared_ptr location_cache_; diff --git hbase-native-client/core/async-rpc-retrying-caller-factory.h hbase-native-client/core/async-rpc-retrying-caller-factory.h index 5bcad6c..844a93c 100644 --- hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -25,8 +25,13 @@ #include #include +#include "async-scan-rpc-retrying-caller.h" #include "connection/rpc-client.h" #include "core/async-rpc-retrying-caller.h" +#include "core/raw-scan-result-consumer.h" +#include "core/region-location.h" +#include "core/scan-results-cache.h" +#include "core/scan.h" #include "if/Client.pb.h" #include "if/HBase.pb.h" @@ -41,8 +46,10 @@ template class SingleRequestCallerBuilder : public std::enable_shared_from_this> { public: - explicit SingleRequestCallerBuilder(std::shared_ptr conn) + explicit SingleRequestCallerBuilder(std::shared_ptr conn, + std::shared_ptr retry_timer) : conn_(conn), + retry_timer_(retry_timer), table_name_(nullptr), rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), pause_(conn->connection_conf()->pause()), @@ -105,7 +112,7 @@ class SingleRequestCallerBuilder std::shared_ptr> Build() { return std::make_shared>( - conn_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, + conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); } @@ -116,6 +123,7 @@ class SingleRequestCallerBuilder private: std::shared_ptr conn_; + std::shared_ptr retry_timer_; std::shared_ptr table_name_; nanoseconds rpc_timeout_nanos_; nanoseconds operation_timeout_nanos_; @@ -127,18 +135,126 @@ class SingleRequestCallerBuilder Callable callable_; }; // end of SingleRequestCallerBuilder +class ScanCallerBuilder : public std::enable_shared_from_this { + public: + explicit ScanCallerBuilder(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), + retry_timer_(retry_timer), + rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), + pause_(conn->connection_conf()->pause()), + scan_timeout_nanos_(conn->connection_conf()->scan_timeout()), + max_retries_(conn->connection_conf()->max_retries()), + start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), + scanner_id_(-1) {} + + virtual ~ScanCallerBuilder() = default; + + typedef ScanCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr scan_timeout(nanoseconds scan_timeout_nanos) { + scan_timeout_nanos_ = scan_timeout_nanos; + return shared_this(); + } + + SharedThisPtr scanner_lease_timeout(nanoseconds scanner_lease_timeout_nanos) { + scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos; + return shared_this(); + } + + SharedThisPtr pause(nanoseconds pause) { + pause_ = pause; + return shared_this(); + } + + SharedThisPtr max_retries(uint32_t max_retries) { + max_retries_ = max_retries; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + + SharedThisPtr region_location(std::shared_ptr region_location) { + region_location_ = region_location; + return shared_this(); + } + + SharedThisPtr scanner_id(int64_t scanner_id) { + scanner_id_ = scanner_id; + return shared_this(); + } + + SharedThisPtr scan(std::shared_ptr scan) { + scan_ = scan; + return shared_this(); + } + + SharedThisPtr results_cache(std::shared_ptr results_cache) { + results_cache_ = results_cache; + return shared_this(); + } + + SharedThisPtr consumer(std::shared_ptr consumer) { + consumer_ = consumer; + return shared_this(); + } + + std::shared_ptr Build() { + return std::make_shared( + conn_, retry_timer_, scan_, scanner_id_, results_cache_, consumer_, region_location_, + scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_, rpc_timeout_nanos_, + start_log_errors_count_); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + std::shared_ptr scan_; + nanoseconds rpc_timeout_nanos_; + nanoseconds scan_timeout_nanos_; + nanoseconds scanner_lease_timeout_nanos_; + nanoseconds pause_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; + std::shared_ptr region_location_; + int64_t scanner_id_; + std::shared_ptr consumer_; + std::shared_ptr results_cache_; +}; // end of ScanCallerBuilder + class AsyncRpcRetryingCallerFactory { private: std::shared_ptr conn_; + std::shared_ptr retry_timer_; public: - explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : conn_(conn) {} + explicit AsyncRpcRetryingCallerFactory(std::shared_ptr conn, + std::shared_ptr retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} virtual ~AsyncRpcRetryingCallerFactory() = default; template std::shared_ptr> Single() { - return std::make_shared>(conn_); + return std::make_shared>(conn_, retry_timer_); + } + + std::shared_ptr Scan() { + return std::make_shared(conn_, retry_timer_); } }; diff --git hbase-native-client/core/async-rpc-retrying-caller.cc hbase-native-client/core/async-rpc-retrying-caller.cc index 965a44b..02c67c7 100644 --- hbase-native-client/core/async-rpc-retrying-caller.cc +++ hbase-native-client/core/async-rpc-retrying-caller.cc @@ -38,11 +38,12 @@ namespace hbase { template AsyncSingleRequestRpcRetryingCaller::AsyncSingleRequestRpcRetryingCaller( - std::shared_ptr conn, std::shared_ptr table_name, + std::shared_ptr conn, std::shared_ptr retry_timer, std::shared_ptr table_name, const std::string& row, RegionLocateType locate_type, Callable callable, nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) : conn_(conn), + retry_timer_(retry_timer), table_name_(table_name), row_(row), locate_type_(locate_type), @@ -58,7 +59,6 @@ AsyncSingleRequestRpcRetryingCaller::AsyncSingleRequestRpcRetryingCaller( start_ns_ = TimeUtil::GetNowNanos(); max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); exceptions_ = std::make_shared>(); - retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); } template diff --git hbase-native-client/core/async-rpc-retrying-caller.h hbase-native-client/core/async-rpc-retrying-caller.h index 6006388..64ac837 100644 --- hbase-native-client/core/async-rpc-retrying-caller.h +++ hbase-native-client/core/async-rpc-retrying-caller.h @@ -70,6 +70,7 @@ template class AsyncSingleRequestRpcRetryingCaller { public: AsyncSingleRequestRpcRetryingCaller(std::shared_ptr conn, + std::shared_ptr retry_timer, std::shared_ptr table_name, const std::string& row, RegionLocateType locate_type, Callable callable, nanoseconds pause, @@ -97,8 +98,8 @@ class AsyncSingleRequestRpcRetryingCaller { const int64_t& timeout_ns); private: - folly::HHWheelTimer::UniquePtr retry_timer_; std::shared_ptr conn_; + std::shared_ptr retry_timer_; std::shared_ptr table_name_; std::string row_; RegionLocateType locate_type_; @@ -114,6 +115,5 @@ class AsyncSingleRequestRpcRetryingCaller { uint32_t tries_; std::shared_ptr> exceptions_; uint32_t max_attempts_; - folly::EventBase event_base_; }; } /* namespace hbase */ diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc index 4956972..d89e390 100644 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -98,7 +98,7 @@ class MockAsyncConnection : public AsyncConnection, : conn_conf_(conn_conf), rpc_client_(rpc_client), region_locator_(region_locator) {} ~MockAsyncConnection() {} void Init() { - caller_factory_ = std::make_shared(shared_from_this()); + caller_factory_ = std::make_shared(shared_from_this(), nullptr); } std::shared_ptr conf() override { return nullptr; } diff --git hbase-native-client/core/async-scan-rpc-retrying-caller.h hbase-native-client/core/async-scan-rpc-retrying-caller.h new file mode 100644 index 0000000..f715984 --- /dev/null +++ hbase-native-client/core/async-scan-rpc-retrying-caller.h @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/async-connection.h" +#include "core/hbase-rpc-controller.h" +#include "core/raw-scan-result-consumer.h" +#include "core/region-location.h" +#include "core/result.h" +#include "core/scan-results-cache.h" +#include "core/scan.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +class AsyncScanRpcRetryingCaller { + public: + AsyncScanRpcRetryingCaller(std::shared_ptr conn, + std::shared_ptr retry_timer, + std::shared_ptr scan, int64_t scanner_id, + std::shared_ptr results_cache, + std::shared_ptr consumer, + std::shared_ptr region_location, + nanoseconds scanner_lease_timeout_nanos, nanoseconds pause, + uint32_t max_retries, nanoseconds scan_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) + : conn_(conn), + retry_timer_(retry_timer), + scan_(scan), + scanner_id_(scanner_id), + results_cache_(results_cache), + consumer_(consumer), + region_location_(region_location), + scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos), + pause_(pause), + max_retries_(max_retries), + scan_timeout_nanos_(scan_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + controller_ = conn_->CreateRpcController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared>(); + } + + private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + std::shared_ptr scan_; + int64_t scanner_id_; + std::shared_ptr results_cache_; + std::shared_ptr consumer_; + std::shared_ptr region_location_; + nanoseconds scanner_lease_timeout_nanos_; + nanoseconds pause_; + uint32_t max_retries_; + nanoseconds scan_timeout_nanos_; + nanoseconds rpc_timeout_nanos_; + uint32_t start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_; + uint64_t start_ns_; + uint32_t tries_; + std::shared_ptr> exceptions_; + uint32_t max_attempts_; +}; +} // namespace hbase diff --git hbase-native-client/core/raw-scan-result-consumer.cc hbase-native-client/core/raw-scan-result-consumer.cc new file mode 100644 index 0000000..530c946 --- /dev/null +++ hbase-native-client/core/raw-scan-result-consumer.cc @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/raw-scan-result-consumer.h" + +#include +#include +#include +#include + +namespace hbase { + +} // namespace hbase diff --git hbase-native-client/core/raw-scan-result-consumer.h hbase-native-client/core/raw-scan-result-consumer.h new file mode 100644 index 0000000..5d2d80e --- /dev/null +++ hbase-native-client/core/raw-scan-result-consumer.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +class RawScanResultConsumer {}; +} // namespace hbase diff --git hbase-native-client/core/scan-results-cache.cc hbase-native-client/core/scan-results-cache.cc new file mode 100644 index 0000000..5e1210c --- /dev/null +++ hbase-native-client/core/scan-results-cache.cc @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/scan-results-cache.h" + +#include +#include +#include +#include + +namespace hbase { + +} // namespace hbase diff --git hbase-native-client/core/scan-results-cache.h hbase-native-client/core/scan-results-cache.h new file mode 100644 index 0000000..bc8346b --- /dev/null +++ hbase-native-client/core/scan-results-cache.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +class ScanResultsCache {}; +} // namespace hbase