From 8dde5ed5121800a4006a4a7373c2ea3397b87927 Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Wed, 18 Jan 2017 13:43:38 -0800 Subject: [PATCH] HBASE-17465. [C++] implement request retry mechanism over RPC --- hbase-native-client/core/async-connection.cc | 25 ++ hbase-native-client/core/async-connection.h | 35 +++ .../core/async-rpc-retrying-caller-factory.cc | 24 ++ .../core/async-rpc-retrying-caller-factory.h | 153 +++++++++++++ .../core/async-rpc-retrying-caller.cc | 24 ++ .../core/async-rpc-retrying-caller.h | 254 +++++++++++++++++++++ .../core/async-rpc-retrying-test.cc | 221 ++++++++++++++++++ hbase-native-client/core/client-test.cc | 67 +----- hbase-native-client/core/client-test.h | 88 +++++++ hbase-native-client/core/hbase-rpc-controller.cc | 24 ++ hbase-native-client/core/hbase-rpc-controller.h | 39 ++++ hbase-native-client/core/hconstants.h | 41 ++++ hbase-native-client/core/region-location.h | 6 + hbase-native-client/exceptions/exception.h | 96 ++++++++ hbase-native-client/if/Client.proto | 1 + hbase-native-client/utils/connection-util.h | 71 ++++++ hbase-native-client/utils/sys-util.h | 39 ++++ hbase-native-client/utils/time-util.h | 45 ++++ 18 files changed, 1187 insertions(+), 66 deletions(-) create mode 100644 hbase-native-client/core/async-connection.cc create mode 100644 hbase-native-client/core/async-connection.h create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.h create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.h create mode 100644 hbase-native-client/core/async-rpc-retrying-test.cc create mode 100644 hbase-native-client/core/client-test.h create mode 100644 hbase-native-client/core/hbase-rpc-controller.cc create mode 100644 hbase-native-client/core/hbase-rpc-controller.h create mode 100644 hbase-native-client/core/hconstants.h create mode 100644 hbase-native-client/exceptions/exception.h create mode 100644 hbase-native-client/utils/connection-util.h create mode 100644 hbase-native-client/utils/sys-util.h create mode 100644 hbase-native-client/utils/time-util.h diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc new file mode 100644 index 0000000..085e81f --- /dev/null +++ b/hbase-native-client/core/async-connection.cc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-connection.h" + +namespace hbase { + +} // namespace hbase + diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h new file mode 100644 index 0000000..1f7d3c6 --- /dev/null +++ b/hbase-native-client/core/async-connection.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +namespace hbase { + +class AsyncConnection { +public: + AsyncConnection() {} + virtual ~AsyncConnection() = default; +}; + +class AsyncConnectionImpl { +public: + AsyncConnectionImpl() {} + virtual ~AsyncConnectionImpl() = default; +}; +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc new file mode 100644 index 0000000..248f26b --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller-factory.h" + +namespace hbase { + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..0c8d25c --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include "if/HBase.pb.h" +#include "if/Client.pb.h" +#include "async-connection.h" +#include "async-rpc-retrying-caller.h" +#include + + +using namespace folly; +using hbase::pb::TableName; + +namespace hbase { + +template +class SingleRequestCallerBuilder; + + +template +class AsyncRpcRetryingCallerFactory { +public: + AsyncRpcRetryingCallerFactory(std::shared_ptr conn) : + conn_(conn) { + retry_timer = std::make_shared(event_base.timer()); + } + + /* used for test */ + AsyncRpcRetryingCallerFactory( + std::shared_ptr conn, + std::shared_ptr retry_timer) : + conn_(conn), retry_timer_(retry_timer) { + } + + virtual ~AsyncRpcRetryingCallerFactory() = default; + + template + std::shared_ptr> Single() { + return std::make_shared>(); + } + +private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + folly::EventBase event_base; + +public: // SingleRequestCallerBuilder + template + class SingleRequestCallerBuilder : public std::enable_shared_from_this< + SingleRequestCallerBuilder> { + public: + SingleRequestCallerBuilder() : + table_name_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0), + locate_type_(RegionLocateType::kCurrent) {} + + virtual ~SingleRequestCallerBuilder() = default; + + typedef SingleRequestCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr table( + std::shared_ptr table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr rpc_timeout( + long rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr operation_timeout( + long operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr row(const std::string& row) { + row_ = row; + return shared_this(); + } + + SharedThisPtr locate_type(RegionLocateType locate_type) { + locate_type_ = locate_type; + return shared_this(); + } + + SharedThisPtr action(Callable callable) { + callable_ = callable; + return shared_this(); + } + + folly::Future Call() + { + return Build()->Call(); + } + + std::shared_ptr> Build() { + return std::make_shared>( + retry_timer_, + conn_, + table_name_, + row_, + locate_type_, + callable_, + conn_->get_conn_conf()->GetPauseNs(), + conn_->get_conn_conf()->GetMaxRetries(), + operation_timeout_nanos_, + rpc_timeout_nanos_, + conn_->get_conn_conf()->GetStartLogErrorsCount()); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr table_name_; + long rpc_timeout_nanos_; + long operation_timeout_nanos_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; + }; +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc new file mode 100644 index 0000000..80a8f34 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller.h" + +namespace hbase { + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h new file mode 100644 index 0000000..d2b9d8c --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "hbase-rpc-controller.h" +#include "region-location.h" +#include +#include +#include "if/HBase.pb.h" +#include +#include +#include "exceptions/exception.h" +#include +#include +#include "utils/sys-util.h" +#include "utils/connection-util.h" +#include "utils/time-util.h" +#include + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +template +using Supplier = std::function; + +template +using Consumer = std::function; + +template +using Converter = std::function; + +template +using RpcCallback = std::function; + +template +using RpcCall = std::function, + std::shared_ptr, + REQ, + RpcCallback)>; + +template +using Callable = std::function(std::shared_ptr, + std::shared_ptr, + std::shared_ptr)>; + +template +class AsyncSingleRequestRpcRetryingCaller { +public: + AsyncSingleRequestRpcRetryingCaller( + std::shared_ptr retry_timer, + std::shared_ptr conn, + std::shared_ptr table_name, + const std::string& row, + RegionLocateType locate_type, + Callable callable, + long pause_ns, + int max_retries, + long operation_timeout_nanos, + long rpc_timeout_nanos, + int start_log_errors_count) : + retry_timer_(retry_timer), + conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_ns_(pause_ns), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + + controller_ = conn_->get_rpc_controller_factory()->NewController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared< + std::vector>(); + } + + virtual ~AsyncSingleRequestRpcRetryingCaller() = default; + + folly::Future Call() { + LocateThenCall(); + return promise_->getFuture(); + } + +private: + void LocateThenCall() { + long locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->get_locator()->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](RegionLocation &loc) { + Call(loc);}) + .onError( + [this] (const std::exception& e) { + OnError(e, + [this]() -> std::string { + return "Locate '" + + row_ + "' in " + table_name_ + + " failed, tries = " + tries_ + ", maxAttempts = " + max_attempts_ + ", timeout = " + + TimeUtil::ToMillis(operation_timeout_nanos_) + + " ms, time elapsed = " + + TimeUtil::ElapsedMs(this->start_ns_) + " ms";}, + [](const std::exception& error) {}); + }); + } + + void OnError(const std::exception& error, Supplier err_msg, + Consumer update_cached_location) { + ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos(), ""); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf(error) + || tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + long delay_ns; + if (operation_timeout_nanos_ > 0) { + long max_delay_ns = RemainingTimeNs() - ConnectionUtils::SLEEP_DELTA_NS; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = std::min(max_delay_ns, + ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1); + } + update_cached_location(error); + tries_++; + retry_timer_->scheduleTimeoutFn([this]() {LocateThenCall();}, + nanoseconds(delay_ns)); + } + + void Call(RegionLocation& loc) { + long call_timeout_ns; + if (operation_timeout_nanos_ > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_); + } else { + call_timeout_ns = rpc_timeout_nanos_; + } + + std::shared_ptr stub; + try { + stub = conn_->GetRegionServerStub(loc.server_name()); + } catch (const IOException& e) { + OnError(e, + [&, this]() -> std::string { + return "Get async stub to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), loc.server_name().port()) + + " for '" + row_ + + "' in " + loc.DebugString() + " of " + table_name_ + + " failed, tries = " + tries_ + ", maxAttempts = " + max_attempts_ + ", timeout = " + + TimeUtil::ToMillis(this->operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMs(this->start_ns_) + " ms";}, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error);}); + return; + } + + ConnectionUtils::ResetController(controller_, call_timeout_ns); + + callable_(controller_, std::make_shared(loc), stub).then( + [this](const RESP& resp) { + this->promise_->setValue(resp); + }).onError( + [&, this] (const std::exception& e) { + OnError(e, + [&, this]() -> std::string { + return "Call to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), loc.server_name().port()) + + " for '" + row_ + "' in " + + loc.DebugString() + " of " + table_name_ + " failed, tries = " + + tries_ + ", maxAttempts = " + max_attempts_ + ", timeout = " + + TimeUtil::ToMillis(this->operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMs(this->start_ns_) + " ms";}, + [&, this](const std::exception& error) { + conn_->get_locator().UpdateCachedLocation(loc, error);}); + return; + }); + } + + void CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); + } + + long RemainingTimeNs() { + return operation_timeout_nanos_ - (TimeUtil::GetNowNanos() - start_ns_); + } + +private: + std::shared_ptr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::string row_; + RegionLocateType locate_type_; + Callable callable_; + long pause_ns_; + int max_retries_; + long operation_timeout_nanos_; + long rpc_timeout_nanos_; + int start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_; + long start_ns_; + int tries_; + std::shared_ptr> exceptions_; + int max_attempts_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc new file mode 100644 index 0000000..228a1ba --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-test.cc @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include "hbase-rpc-controller.h" +#include "if/HBase.pb.h" + +#include "region-location.h" +#include "if/Client.pb.h" +#include "connection/request.h" +#include "test-util/test-util.h" +#include "client-test.h" +#include "core/result.h" +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "core/response_converter.h" +#include "connection/request.h" +#include "connection/response.h" +#include "async-rpc-retrying-caller.h" +#include "exceptions/exception.h" + +using namespace hbase; +using namespace hbase::pb; + + +class MockHBaseRpcController : public HBaseRpcController { +public: + MOCK_METHOD1(set_call_timeout, void(const long&)); +}; + + +class MockRpcControllerFactory { +public: + MOCK_METHOD0(NewController, std::shared_ptr()); +}; + +class MockAsyncConnectionConfiguration { +public: + MOCK_METHOD0(GetPauseNs, long()); + MOCK_METHOD0(GetMaxRetries, int()); + MOCK_METHOD0(GetStartLogErrorsCount, int()); + MOCK_METHOD0(GetReadRpcTimeoutNs, long()); + MOCK_METHOD0(GetOperationTimeoutNs, long()); +}; + + +class MockAsyncRegionLocator { +public: + MOCK_METHOD4(GetRegionLocation, + folly::Future( + std::shared_ptr, const std::string&, + RegionLocateType,long)); + + MOCK_METHOD2(UpdateCachedLocation, void(const RegionLocation&, const std::exception&)); + +}; + +class MockAsyncConnection { +public: + MOCK_METHOD0(get_conn_conf, std::shared_ptr()); + MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr()); + MOCK_METHOD0(get_locator, std::shared_ptr()); + /* implement mock through ClientService_Stub::ClientService_Stub(RpcClient::CreateRpcChannel), + * where ClientService_Stub : ClientService */ + MOCK_METHOD1(GetRegionServerStub, std::shared_ptr(const ServerName&)); +}; + + +template +class MockRawAsyncTableImpl { +public: + MockRawAsyncTableImpl(std::shared_ptr conn) : conn_(conn) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + // folly::Future Get(const hbase::Get&); + + /* in real RawAsyncTableImpl, this should be private. */ + folly::Future GetCall( + std::shared_ptr stub, + std::shared_ptr controller, + std::shared_ptr loc, + const hbase::Get& get) { + return Call( + stub, + controller, + loc, + get, + &hbase::RequestConverter::ToGetRequest, + [] (auto& stub, auto& controller, auto& preq, auto& done) { + stub->Get(controller.get(), &preq, &done);}, + &hbase::ResponseConverter::FromGetResponse); + } + + /* in real RawAsyncTableImpl, this should be private. */ + template + folly::Future Call( + std::shared_ptr stub, + std::shared_ptr controller, + std::shared_ptr loc, + const REQ& req, + const Converter& req_converter, + const hbase::RpcCall& rpc_call, + const Converter>& resp_converter) { + + std::shared_ptr> promise; + + rpc_call(stub, controller, req_converter(req, loc->region_name()), [&](auto& presp) { + if (controller->failed()) { + promise->setException(hbase::IOException(controller->errorText())); + } else { + try { + hbase::Response resp; + resp.set_resp_msg(std::make_shared(presp)); + promise->setValue(resp_converter(resp)); + } catch (const std::exception& e) { + promise->setException(e); + } + } + }); + + return promise->getFuture(); + } +private: + std::shared_ptr conn_; +}; + + +TEST(AsyncRpcRetryTest, TestGetBasic) { + + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to < hbase::pb::TableName > ("t"); + auto row = "test2"; + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf.value()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + hbase::Get get(row); + + /* create hbase client connection */ + auto conn = std::make_shared(); + + /* create retry caller factory */ + auto tableImpl = std::make_shared>(conn); + AsyncRpcRetryingCallerFactory caller_factory(conn); + + /* create request caller builder */ + auto p = caller_factory.Single(); + + /* call with retry to get result */ + hbase::Result result = + p->table(std::make_shared(tn)) + ->row(row) + ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs()) + ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs()) + ->action( + [=, &get](auto& controller, auto& loc, auto& stub) -> folly::Future { + return tableImpl->GetCall(stub, controller, loc, get);}) + ->Build()->Call().get(); + + /* verify result */ + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + + /* clean */ + delete test_util; + table->Close(); + client.Close(); +} + + +TEST(AsyncRpcRetryTest, TestGetWithRetry) { + // TODO: +} + + + + + + diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index 28eec6f..ae73c58 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -17,72 +17,7 @@ * */ -#include "core/client.h" -#include -#include "core/configuration.h" -#include "core/get.h" -#include "core/hbase_configuration_loader.h" -#include "core/result.h" -#include "core/table.h" -#include "serde/table-name.h" -#include "test-util/test-util.h" - -class ClientTest { - public: - const static std::string kDefHBaseConfPath; - - const static std::string kHBaseDefaultXml; - const static std::string kHBaseSiteXml; - - const static std::string kHBaseXmlData; - - static void WriteDataToFile(const std::string &file, const std::string &xml_data) { - std::ofstream hbase_conf; - hbase_conf.open(file.c_str()); - hbase_conf << xml_data; - hbase_conf.close(); - } - - static void CreateHBaseConf(const std::string &dir, const std::string &file, - const std::string xml_data) { - // Directory will be created if not present - if (!boost::filesystem::exists(dir)) { - boost::filesystem::create_directories(dir); - } - // Remove temp file always - boost::filesystem::remove((dir + file).c_str()); - WriteDataToFile((dir + file), xml_data); - } - - static void CreateHBaseConfWithEnv() { - // Creating Empty Config Files so that we dont get a Configuration exception @Client - CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData); - CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData); - setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1); - } -}; - -const std::string ClientTest::kDefHBaseConfPath("./build/test-data/client-test/conf/"); - -const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml"); -const std::string ClientTest::kHBaseSiteXml("hbase-site.xml"); - -const std::string ClientTest::kHBaseXmlData( - "\n\n\n\n\n"); +#include "client-test.h" TEST(Client, EmptyConfigurationPassedToClient) { ASSERT_ANY_THROW(hbase::Client client); } diff --git a/hbase-native-client/core/client-test.h b/hbase-native-client/core/client-test.h new file mode 100644 index 0000000..b8ec691 --- /dev/null +++ b/hbase-native-client/core/client-test.h @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "core/client.h" +#include +#include "core/configuration.h" +#include "core/get.h" +#include "core/hbase_configuration_loader.h" +#include "core/result.h" +#include "core/table.h" +#include "serde/table-name.h" +#include "test-util/test-util.h" + +class ClientTest { + public: + const static std::string kDefHBaseConfPath; + + const static std::string kHBaseDefaultXml; + const static std::string kHBaseSiteXml; + + const static std::string kHBaseXmlData; + + static void WriteDataToFile(const std::string &file, const std::string &xml_data) { + std::ofstream hbase_conf; + hbase_conf.open(file.c_str()); + hbase_conf << xml_data; + hbase_conf.close(); + } + + static void CreateHBaseConf(const std::string &dir, const std::string &file, + const std::string xml_data) { + // Directory will be created if not present + if (!boost::filesystem::exists(dir)) { + boost::filesystem::create_directories(dir); + } + // Remove temp file always + boost::filesystem::remove((dir + file).c_str()); + WriteDataToFile((dir + file), xml_data); + } + + static void CreateHBaseConfWithEnv() { + // Creating Empty Config Files so that we dont get a Configuration exception @Client + CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData); + CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData); + setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1); + } +}; + +const std::string ClientTest::kDefHBaseConfPath("./build/test-data/client-test/conf/"); + +const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml"); +const std::string ClientTest::kHBaseSiteXml("hbase-site.xml"); + +const std::string ClientTest::kHBaseXmlData( + "\n\n\n\n\n"); + diff --git a/hbase-native-client/core/hbase-rpc-controller.cc b/hbase-native-client/core/hbase-rpc-controller.cc new file mode 100644 index 0000000..f5b5963 --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase-rpc-controller.h" + +namespace hbase { + +} /* namespace hbase */ diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h new file mode 100644 index 0000000..65fb2c9 --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +using google::protobuf::RpcController; + +namespace hbase { + +class HBaseRpcController: public RpcController { +public: + HBaseRpcController() {} + virtual ~HBaseRpcController() = default; + + virtual void set_call_timeout(const long& call_timeout) { + // TODO: + } +}; + +} /* namespace hbase */ + + diff --git a/hbase-native-client/core/hconstants.h b/hbase-native-client/core/hconstants.h new file mode 100644 index 0000000..ce41e74 --- /dev/null +++ b/hbase-native-client/core/hconstants.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +namespace hbase { + +class HConstants { +public: + /** + * Retrying we multiply hbase.client.pause setting by what we have in this array until we + * run out of array items. Retries beyond this use the last number in the array. So, for + * example, if hbase.client.pause is 1 second, and maximum retries count + * hbase.client.retries.number is 10, we will retry at the following intervals: + * 1, 2, 3, 5, 10, 20, 40, 100, 100, 100. + * With 100ms, a back-off of 200 means 20s + */ + static const std::vector RETRY_BACKOFF; +}; + +const std::vector HConstants::RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, + 100, 100, 100, 100, 200, 200 }; +} /* namespace hbase */ + diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index b0411cb..5840e1e 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -26,6 +26,12 @@ namespace hbase { +enum RegionLocateType { + kBefore, + kCurrent, + kAfter +}; + /** * @brief class to hold where a region is located. * diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h new file mode 100644 index 0000000..a95ba68 --- /dev/null +++ b/hbase-native-client/exceptions/exception.h @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +namespace hbase { + +class ThrowableWithExtraContext { +public: + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when, const std::string& extras) : + cause_(cause), when_(when), extras_(extras) { + } + + std::string ToString() { + // TODO: + // return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); + return extras_ + ", " + cause_->what(); + } + + std::shared_ptr cause() { + return cause_; + } +private: + std::shared_ptr cause_; + long when_; + std::string extras_; +}; + +class IOException: public std::logic_error { +public: + IOException( + const std::string& what, + std::shared_ptr cause) : + logic_error(what), cause_(cause) {} + virtual ~IOException() = default; + + std::shared_ptr cause() { + return cause_; + } +private: + const std::shared_ptr cause_; +}; + +class RetriesExhaustedException: public IOException { +public: + RetriesExhaustedException( + const int& num_retries, + std::shared_ptr> exceptions) : + IOException( + GetMessage(num_retries, exceptions), + exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){ + } + virtual ~RetriesExhaustedException() = default; + +private: + std::string GetMessage( + const int& num_retries, + std::shared_ptr> exceptions) { + std::string buffer("Failed after attempts="); + buffer.append(std::to_string(num_retries + 1)); + buffer.append(", exceptions:\n"); + for (auto it = exceptions->begin(); it != exceptions->end(); it++) { + buffer.append(it->ToString()); + buffer.append("\n"); + } + return buffer; + } +}; + +class HBaseIOException : public IOException { +}; + +class DoNotRetryIOException : public HBaseIOException { +}; +} // namespace hbase diff --git a/hbase-native-client/if/Client.proto b/hbase-native-client/if/Client.proto index 8a4d459..301d52a 100644 --- a/hbase-native-client/if/Client.proto +++ b/hbase-native-client/if/Client.proto @@ -22,6 +22,7 @@ package hbase.pb; option java_package = "org.apache.hadoop.hbase.protobuf.generated"; option java_outer_classname = "ClientProtos"; option java_generic_services = true; +option cc_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; diff --git a/hbase-native-client/utils/connection-util.h b/hbase-native-client/utils/connection-util.h new file mode 100644 index 0000000..4e1cf4e --- /dev/null +++ b/hbase-native-client/utils/connection-util.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include ; +#include "core/hconstants.h" +#include +#include "core/hbase-rpc-controller.h" +#include "utils/time-util.h" + +namespace hbase { +class ConnectionUtils { +public: + static int Retries2Attempts(const int& retries) { + return std::max(1, retries == INT_MAX ? INT_MAX : retries + 1); + } + + /* Add a delta to avoid timeout immediately after a retry sleeping. */ + static const long SLEEP_DELTA_NS = 1000000; + + /** + * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}. + * @param pause time to pause + * @param tries amount of tries + * @return How long to wait after tries retries + */ + static long GetPauseTime(const long& pause, const int& tries) { + int ntries = tries; + if (ntries >= HConstants::RETRY_BACKOFF.size()) { + ntries = HConstants::RETRY_BACKOFF.size() - 1; + } + if (ntries < 0) { + ntries = 0; + } + + long normal_pause = pause * HConstants::RETRY_BACKOFF[ntries]; + // 1% possible jitter + float r = static_cast(std::rand()) / static_cast(RAND_MAX); + long jitter = (long) (normal_pause * r * 0.01f); + return normal_pause + jitter; + } + + static void ResetController( + std::shared_ptr controller, + const long& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->set_call_timeout( + std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_ns))); + } + } +}; +} /* namespace hbase */ diff --git a/hbase-native-client/utils/sys-util.h b/hbase-native-client/utils/sys-util.h new file mode 100644 index 0000000..759be24 --- /dev/null +++ b/hbase-native-client/utils/sys-util.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include + +namespace hbase { + +class SysUtil { +public: + template + static constexpr bool InstanceOf(const DERIVED& object) { + return !dynamic_cast(object); + } + + template + static constexpr bool InstanceOf() { + return std::is_base_of(); + } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h new file mode 100644 index 0000000..dfd51b4 --- /dev/null +++ b/hbase-native-client/utils/time-util.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +class TimeUtil { +public: + static long ToMillis(const long& nanos) { + std::chrono::duration_cast(nanoseconds(nanos)).count(); + } + + static long GetNowNanos() { + auto duration = + std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + static long ElapsedMs(const long& start_ns) { + std::chrono::duration_cast( + nanoseconds(GetNowNanos() - start_ns)).count(); + } +}; +} /* namespace hbase */ + -- 2.10.1 (Apple Git-78)