From 0e5377c34ff3d8d1f342a59e4ca049670fe1a5bc Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Wed, 18 Jan 2017 13:43:38 -0800 Subject: [PATCH] HBASE-17465. [C++] implement request retry mechanism over RPC --- hbase-native-client/core/async-connection.cc | 25 ++ hbase-native-client/core/async-connection.h | 35 +++ .../core/async-rpc-retrying-caller-factory.cc | 24 ++ .../core/async-rpc-retrying-caller-factory.h | 154 ++++++++++++ .../core/async-rpc-retrying-caller.cc | 24 ++ .../core/async-rpc-retrying-caller.h | 272 +++++++++++++++++++++ hbase-native-client/core/hbase-rpc-controller.cc | 24 ++ hbase-native-client/core/hbase-rpc-controller.h | 37 +++ hbase-native-client/core/hconstants.h | 41 ++++ hbase-native-client/core/hregion-info.h | 50 ++++ hbase-native-client/core/hregion-location.cc | 24 ++ hbase-native-client/core/hregion-location.h | 62 +++++ hbase-native-client/exceptions/exception.h | 96 ++++++++ hbase-native-client/if/Client.proto | 1 + hbase-native-client/utils/connection-util.h | 71 ++++++ hbase-native-client/utils/sys-util.h | 39 +++ hbase-native-client/utils/time-util.h | 45 ++++ 17 files changed, 1024 insertions(+) create mode 100644 hbase-native-client/core/async-connection.cc create mode 100644 hbase-native-client/core/async-connection.h create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller-factory.h create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.cc create mode 100644 hbase-native-client/core/async-rpc-retrying-caller.h create mode 100644 hbase-native-client/core/hbase-rpc-controller.cc create mode 100644 hbase-native-client/core/hbase-rpc-controller.h create mode 100644 hbase-native-client/core/hconstants.h create mode 100644 hbase-native-client/core/hregion-info.h create mode 100644 hbase-native-client/core/hregion-location.cc create mode 100644 hbase-native-client/core/hregion-location.h create mode 100644 hbase-native-client/exceptions/exception.h create mode 100644 hbase-native-client/utils/connection-util.h create mode 100644 hbase-native-client/utils/sys-util.h create mode 100644 hbase-native-client/utils/time-util.h diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc new file mode 100644 index 0000000..085e81f --- /dev/null +++ b/hbase-native-client/core/async-connection.cc @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-connection.h" + +namespace hbase { + +} // namespace hbase + diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h new file mode 100644 index 0000000..1f7d3c6 --- /dev/null +++ b/hbase-native-client/core/async-connection.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +namespace hbase { + +class AsyncConnection { +public: + AsyncConnection() {} + virtual ~AsyncConnection() = default; +}; + +class AsyncConnectionImpl { +public: + AsyncConnectionImpl() {} + virtual ~AsyncConnectionImpl() = default; +}; +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc new file mode 100644 index 0000000..248f26b --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller-factory.h" + +namespace hbase { + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..5a26042 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include "if/HBase.pb.h" +#include "if/Client.pb.h" +#include "async-connection.h" +#include "async-rpc-retrying-caller.h" +#include "hregion-location.h" + +using hbase::pb::TableName; + +namespace hbase { + +template +class SingleRequestCallerBuilder; + + +template +class AsyncRpcRetryingCallerFactory { +public: + AsyncRpcRetryingCallerFactory( + std::shared_ptr conn, + std::shared_ptr retry_timer) : + conn_(conn), retry_timer_(retry_timer) { + } + + virtual ~AsyncRpcRetryingCallerFactory() = default; + + template + std::shared_ptr> Single() { + return std::make_shared>(); + } + +private: + std::shared_ptr conn_; + std::shared_ptr retry_timer_; + +public: // SingleRequestCallerBuilder + template + class SingleRequestCallerBuilder : public std::enable_shared_from_this< + SingleRequestCallerBuilder> { + public: + SingleRequestCallerBuilder() : + table_name_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0), + row_(nullptr), + locate_type_(RegionLocateType::kCurrent) {} + + virtual ~SingleRequestCallerBuilder() = default; + + typedef SingleRequestCallerBuilder GenenericThisType; + typedef std::shared_ptr SharedThisPtr; + + SharedThisPtr table( + std::shared_ptr table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr set_rpc_timeout( + long rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr set_operation_timeout( + long operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr row(std::unique_ptr row) { + row_ = std::move(row); + return shared_this(); + } + + SharedThisPtr set_locate_type(RegionLocateType locate_type) { + locate_type_ = locate_type; + return shared_this(); + } + + SharedThisPtr action(Callable callable) { + callable_ = callable; + return shared_this(); + } + + folly::Future Call() + { + return Build()->Call(); + } + + std::shared_ptr> Build() { + return std::make_shared>( + retry_timer_, + conn_, + table_name_, + row_, + locate_type_, + callable_, + conn_->get_conn_conf()->get_pause_ns(), + conn_->get_conn_conf()->get_max_retries(), + operation_timeout_nanos_, + rpc_timeout_nanos_, + conn_->get_conn_conf()->get_start_log_errors_count()); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this::shared_from_this(); + } + + private: + std::shared_ptr table_name_; + long rpc_timeout_nanos_; + long operation_timeout_nanos_; + std::unique_ptr row_; + RegionLocateType locate_type_; + Callable callable_; + }; +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc new file mode 100644 index 0000000..80a8f34 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "async-rpc-retrying-caller.h" + +namespace hbase { + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h new file mode 100644 index 0000000..7ed5c1a --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "if/HBase.pb.h" +#include "hbase-rpc-controller.h" +#include "hregion-location.h" +#include +#include +#include +#include "if/HBase.pb.h" +#include +#include +#include "exceptions/exception.h" +#include +#include +#include "utils/sys-util.h" +#include "utils/connection-util.h" +#include "utils/time-util.h" +#include + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +template +using Supplier = std::function; + +template +using Consumer = std::function; + +template +using Converter = std::function; + +template +using RpcCallback = std::function; + + +template +using RpcCall = std::function, + std::shared_ptr, + REQ, + RpcCallback)>; + +template +using Callable = std::function(std::shared_ptr, + std::shared_ptr, + std::shared_ptr, + REQ, + Converter, REQ>, + RpcCall, + Converter, PRESP>)>; + +template +class AsyncSingleRequestRpcRetryingCaller { +public: + AsyncSingleRequestRpcRetryingCaller( + std::shared_ptr retry_timer, + std::shared_ptr conn, + std::shared_ptr table_name, + std::unique_ptr row, + RegionLocateType locate_type, + Callable callable, + long pause_ns, + int max_retries, + long operation_timeout_nanos, + long rpc_timeout_nanos, + int start_log_errors_count) : + retry_timer_(retry_timer), + conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_ns_(pause_ns), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared>()), + tries_(1) { + + controller_ = conn_->get_rpc_controller_factory()->NewController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared< + std::vector>(); + } + + virtual ~AsyncSingleRequestRpcRetryingCaller() = default; + + folly::Future Call() { + LocateThenCall(); + return promise_->getFuture(); + } + +private: + void LocateThenCall() { + long locate_timeout_ns; + if (operation_timeout_nanos_ > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->get_locator()->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](const HRegionLocation &loc) { + Call(loc);}) + .onError( + [this] (const std::exception& e) { + OnError(e, + [this]() -> std::string { + return "Locate '" + + IOBuf2StringBinary() + "' in " + table_name_ + + " failed, tries = " + tries_ + ", maxAttempts = " + max_attempts_ + ", timeout = " + + TimeUtil::ToMillis(operation_timeout_nanos_) + + " ms, time elapsed = " + + TimeUtil::ElapsedMs(this->start_ns_) + " ms";}, + [](const std::exception& error) {}); + }); + } + + std::string IOBuf2StringBinary() { + return std::string(reinterpret_cast(row_->data()), + row_->length()); + } + + void OnError(const std::exception& error, Supplier err_msg, + Consumer update_cached_location) { + ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos(), ""); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf(error) + || tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + long delay_ns; + if (operation_timeout_nanos_ > 0) { + long max_delay_ns = RemainingTimeNs() - ConnectionUtils::SLEEP_DELTA_NS; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = std::min(max_delay_ns, + ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_ns_, tries_ - 1); + } + update_cached_location(error); + tries_++; + retry_timer_->scheduleTimeoutFn([this]() {LocateThenCall();}, + nanoseconds(delay_ns)); + } + + void Call(const HRegionLocation& loc) { + long call_timeout_ns; + if (operation_timeout_nanos_ > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_); + } else { + call_timeout_ns = rpc_timeout_nanos_; + } + + std::shared_ptr stub; + try { + stub = conn_->GetRegionServerStub(loc.server_name()); + } catch (const IOException& e) { + OnError(e, + [&, this]() -> std::string { + return "Get async stub to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), loc.server_name().port()) + + " for '" + IOBuf2StringBinary() + + "' in " + loc.region_info()->GetPrintName() + " of " + table_name_ + + " failed, tries = " + tries_ + ", maxAttempts = " + max_attempts_ + ", timeout = " + + TimeUtil::ToMillis(this->operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMs(this->start_ns_) + " ms";}, + [&, this](const std::exception& error) { + conn_->get_locator().UpdateCachedLocation(loc, error);}); + return; + } + + ConnectionUtils::ResetController(controller_, call_timeout_ns); + + callable_(controller_, std::make_shared(loc), stub).then( + [this](const RESP& resp) { + this->promise_->setValue(resp); + }).onError( + [&, this] (const std::exception& e) { + OnError(e, + [&, this]() -> std::string { + return "Call to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), loc.server_name().port()) + + " for '" + IOBuf2StringBinary() + "' in " + + loc.region_info()->GetPrintName() + " of " + table_name_ + " failed, tries = " + + tries_ + ", maxAttempts = " + max_attempts_ + ", timeout = " + + TimeUtil::ToMillis(this->operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMs(this->start_ns_) + " ms";}, + [&, this](const std::exception& error) { + conn_->get_locator().UpdateCachedLocation(loc, error);}); + return; + }); + } + + void CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); + } + + long RemainingTimeNs() { + return operation_timeout_nanos_ - (TimeUtil::GetNowNanos() - start_ns_); + } + +private: + std::shared_ptr retry_timer_; + std::shared_ptr conn_; + std::shared_ptr table_name_; + std::unique_ptr row_; + RegionLocateType locate_type_; + Callable callable_; + long pause_ns_; + int max_retries_; + long operation_timeout_nanos_; + long rpc_timeout_nanos_; + int start_log_errors_count_; + std::shared_ptr> promise_; + std::shared_ptr controller_; + long start_ns_; + int tries_; + std::shared_ptr> exceptions_; + int max_attempts_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/hbase-rpc-controller.cc b/hbase-native-client/core/hbase-rpc-controller.cc new file mode 100644 index 0000000..f5b5963 --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase-rpc-controller.h" + +namespace hbase { + +} /* namespace hbase */ diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h new file mode 100644 index 0000000..ca79d2a --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.h @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +using google::protobuf::RpcController; + +namespace hbase { + +class HBaseRpcController: public RpcController { +public: + HBaseRpcController() {} + virtual ~HBaseRpcController() = default; + + void setCallTimeout(const long& call_timeout); +}; + +} /* namespace hbase */ + + diff --git a/hbase-native-client/core/hconstants.h b/hbase-native-client/core/hconstants.h new file mode 100644 index 0000000..ce41e74 --- /dev/null +++ b/hbase-native-client/core/hconstants.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +namespace hbase { + +class HConstants { +public: + /** + * Retrying we multiply hbase.client.pause setting by what we have in this array until we + * run out of array items. Retries beyond this use the last number in the array. So, for + * example, if hbase.client.pause is 1 second, and maximum retries count + * hbase.client.retries.number is 10, we will retry at the following intervals: + * 1, 2, 3, 5, 10, 20, 40, 100, 100, 100. + * With 100ms, a back-off of 200 means 20s + */ + static const std::vector RETRY_BACKOFF; +}; + +const std::vector HConstants::RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, + 100, 100, 100, 100, 200, 200 }; +} /* namespace hbase */ + diff --git a/hbase-native-client/core/hregion-info.h b/hbase-native-client/core/hregion-info.h new file mode 100644 index 0000000..c4d7107 --- /dev/null +++ b/hbase-native-client/core/hregion-info.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "if/HBase.pb.h" +#include + +namespace hbase { +class HRegionInfo { +public: + HRegionInfo( + const long& region_id, + const hbase::pb::TableName& table_name, + const int& replica_id) : + region_id_(region_id), table_name_(table_name), replica_id_(replica_id) { + + } + + std::string GetPrintName() const { + return folly::sformat( + "{0}-{1}.{2}-{3}", + table_name_.namespace_(), + table_name_.qualifier(), region_id_, replica_id_); + } + +private: + std::string encoded_name; + long region_id_; + hbase::pb::TableName table_name_; + int replica_id_; + +}; +} /* namespace hbase */ diff --git a/hbase-native-client/core/hregion-location.cc b/hbase-native-client/core/hregion-location.cc new file mode 100644 index 0000000..97202a4 --- /dev/null +++ b/hbase-native-client/core/hregion-location.cc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hregion-location.h" + +namespace hbase { + +} /* namespace hbase */ diff --git a/hbase-native-client/core/hregion-location.h b/hbase-native-client/core/hregion-location.h new file mode 100644 index 0000000..40ae2d0 --- /dev/null +++ b/hbase-native-client/core/hregion-location.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "hregion-info.h" +#include "if/HBase.pb.h" + +namespace hbase { + +enum RegionLocateType { + kBefore, + kCurrent, + kAfter +}; + +class HRegionLocation { +public: + HRegionLocation( + const long& seq_num, + const hbase::pb::ServerName& server_name, + std::shared_ptr region_info) : + seq_num_(seq_num), server_name_(server_name), region_info_(region_info) {} + virtual ~HRegionLocation(); + + long set_num() { + return seq_num_; + } + + hbase::pb::ServerName server_name() const { + return server_name_; + } + + std::shared_ptr region_info() const { + return region_info_; + } + +private: + long seq_num_; + hbase::pb::ServerName server_name_; + std::shared_ptr region_info_; +}; + +} /* namespace hbase */ + + diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h new file mode 100644 index 0000000..a95ba68 --- /dev/null +++ b/hbase-native-client/exceptions/exception.h @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include +#include + +namespace hbase { + +class ThrowableWithExtraContext { +public: + ThrowableWithExtraContext(std::shared_ptr cause, + const long& when, const std::string& extras) : + cause_(cause), when_(when), extras_(extras) { + } + + std::string ToString() { + // TODO: + // return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); + return extras_ + ", " + cause_->what(); + } + + std::shared_ptr cause() { + return cause_; + } +private: + std::shared_ptr cause_; + long when_; + std::string extras_; +}; + +class IOException: public std::logic_error { +public: + IOException( + const std::string& what, + std::shared_ptr cause) : + logic_error(what), cause_(cause) {} + virtual ~IOException() = default; + + std::shared_ptr cause() { + return cause_; + } +private: + const std::shared_ptr cause_; +}; + +class RetriesExhaustedException: public IOException { +public: + RetriesExhaustedException( + const int& num_retries, + std::shared_ptr> exceptions) : + IOException( + GetMessage(num_retries, exceptions), + exceptions->empty() ? nullptr : (*exceptions)[exceptions->size() - 1].cause()){ + } + virtual ~RetriesExhaustedException() = default; + +private: + std::string GetMessage( + const int& num_retries, + std::shared_ptr> exceptions) { + std::string buffer("Failed after attempts="); + buffer.append(std::to_string(num_retries + 1)); + buffer.append(", exceptions:\n"); + for (auto it = exceptions->begin(); it != exceptions->end(); it++) { + buffer.append(it->ToString()); + buffer.append("\n"); + } + return buffer; + } +}; + +class HBaseIOException : public IOException { +}; + +class DoNotRetryIOException : public HBaseIOException { +}; +} // namespace hbase diff --git a/hbase-native-client/if/Client.proto b/hbase-native-client/if/Client.proto index 8a4d459..301d52a 100644 --- a/hbase-native-client/if/Client.proto +++ b/hbase-native-client/if/Client.proto @@ -22,6 +22,7 @@ package hbase.pb; option java_package = "org.apache.hadoop.hbase.protobuf.generated"; option java_outer_classname = "ClientProtos"; option java_generic_services = true; +option cc_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; diff --git a/hbase-native-client/utils/connection-util.h b/hbase-native-client/utils/connection-util.h new file mode 100644 index 0000000..107d358 --- /dev/null +++ b/hbase-native-client/utils/connection-util.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include ; +#include "core/hconstants.h" +#include +#include "core/hbase-rpc-controller.h" +#include "utils/time-util.h" + +namespace hbase { +class ConnectionUtils { +public: + static int Retries2Attempts(const int& retries) { + return std::max(1, retries == INT_MAX ? INT_MAX : retries + 1); + } + + /* Add a delta to avoid timeout immediately after a retry sleeping. */ + static const long SLEEP_DELTA_NS = 1000000; + + /** + * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}. + * @param pause time to pause + * @param tries amount of tries + * @return How long to wait after tries retries + */ + static long GetPauseTime(const long& pause, const int& tries) { + int ntries = tries; + if (ntries >= HConstants::RETRY_BACKOFF.size()) { + ntries = HConstants::RETRY_BACKOFF.size() - 1; + } + if (ntries < 0) { + ntries = 0; + } + + long normal_pause = pause * HConstants::RETRY_BACKOFF[ntries]; + // 1% possible jitter + float r = static_cast(std::rand()) / static_cast(RAND_MAX); + long jitter = (long) (normal_pause * r * 0.01f); + return normal_pause + jitter; + } + + static void ResetController( + std::shared_ptr controller, + const long& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->setCallTimeout( + std::min(static_cast(INT_MAX), TimeUtil::ToMillis(timeout_ns))); + } + } +}; +} /* namespace hbase */ diff --git a/hbase-native-client/utils/sys-util.h b/hbase-native-client/utils/sys-util.h new file mode 100644 index 0000000..759be24 --- /dev/null +++ b/hbase-native-client/utils/sys-util.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include + +namespace hbase { + +class SysUtil { +public: + template + static constexpr bool InstanceOf(const DERIVED& object) { + return !dynamic_cast(object); + } + + template + static constexpr bool InstanceOf() { + return std::is_base_of(); + } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h new file mode 100644 index 0000000..dfd51b4 --- /dev/null +++ b/hbase-native-client/utils/time-util.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +class TimeUtil { +public: + static long ToMillis(const long& nanos) { + std::chrono::duration_cast(nanoseconds(nanos)).count(); + } + + static long GetNowNanos() { + auto duration = + std::chrono::high_resolution_clock::now().time_since_epoch(); + return std::chrono::duration_cast(duration).count(); + } + + static long ElapsedMs(const long& start_ns) { + std::chrono::duration_cast( + nanoseconds(GetNowNanos() - start_ns)).count(); + } +}; +} /* namespace hbase */ + -- 2.10.1 (Apple Git-78)