diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index d8d15a9..0c6e449 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -25,6 +25,7 @@ cxx_library( "keyvalue-codec.h", "region-location.h", "location-cache.h", + "connection-configuration.h", # TODO: move this out of exported # Once meta lookup works "meta-utils.h", diff --git hbase-native-client/core/client.cc hbase-native-client/core/client.cc index c1efd8b..b27ac53 100644 --- hbase-native-client/core/client.cc +++ hbase-native-client/core/client.cc @@ -23,6 +23,8 @@ #include #include +#include "core/connection-configuration.h" + namespace hbase { Client::Client() { diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h index 730981d..28a554e 100644 --- hbase-native-client/core/client.h +++ hbase-native-client/core/client.h @@ -53,13 +53,13 @@ class Client { * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ Client(); - explicit Client(const hbase::Configuration &conf); + explicit Client(const hbase::Configuration& conf); ~Client(); /** * @brief Retrieve a Table implementation for accessing a table. * @param - table_name */ - std::unique_ptr Table(const TableName &table_name); + std::unique_ptr Table(const TableName& table_name); /** * @brief Close the Client connection. @@ -67,15 +67,22 @@ class Client { void Close(); private: - void init(const hbase::Configuration &conf); - const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; - const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181"; + /** Constants */ + static constexpr const char* kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; + static constexpr const char* kDefHBaseZookeeperQuorum_ = "localhost:2181"; + /** Parameter name for HBase client IPC pool size */ + static constexpr const char* kClientIpcPoolSize = "hbase.client.ipc.pool.size"; + + /** Data */ std::shared_ptr cpu_executor_; std::shared_ptr io_executor_; std::shared_ptr location_cache_; std::shared_ptr rpc_client_; std::shared_ptr conf_; bool is_closed_ = false; + + /** Methods */ + void init(const hbase::Configuration& conf); }; } // namespace hbase diff --git hbase-native-client/core/connection-configuration.h hbase-native-client/core/connection-configuration.h new file mode 100644 index 0000000..23477ac --- /dev/null +++ hbase-native-client/core/connection-configuration.h @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "core/configuration.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +/** + * Timeout configs. + */ +class ConnectionConfiguration { + public: + ConnectionConfiguration(const Configuration& conf) { + meta_operation_timeout_nanos_ = + ToNanos(conf.GetLong(kClientMetaOperationTimeout, kDefaultClientOperationTimeout)); + operation_timeout_nanos_ = + ToNanos(conf.GetLong(kClientOperationTimeout, kDefaultClientOperationTimeout)); + rpc_timeout_nanos_ = ToNanos(conf.GetLong(kRpcTimeout, kDefaultRpcTimeout)); + read_rpc_timeout_nanos_ = ToNanos(conf.GetLong(kRpcReadTimeout, ToMillis(rpc_timeout_nanos_))); + write_rpc_timeout_nanos_ = + ToNanos(conf.GetLong(kRpcWriteTimeout, ToMillis(rpc_timeout_nanos_))); + pause_nanos_ = ToNanos(conf.GetLong(kClientPause, kDefaultClientPause)); + max_retries_ = conf.GetInt(kClientRetriesNumber, kDefaultClientRetriesNumber); + start_log_errors_count_ = + conf.GetInt(kStartLogErrorsAfterCount, kDefaultStartLogErrorsAfterCount); + scan_timeout_nanos_ = + ToNanos(conf.GetLong(kClientScannerTimeoutPeriod, kDefaultClientScannerTimeoutPeriod)); + scanner_caching_ = conf.GetInt(kClientScannerCaching, kDefaultClientScannerCaching); + scanner_max_result_size_ = + conf.GetLong(kClientScannerMaxResultsSize, kDefaultClientScannerMaxResultsSize); + } + + uint64_t meta_operation_timeout_nanos() const { return meta_operation_timeout_nanos_; } + + // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected + // by this value, see scanTimeoutNs. + uint64_t operation_timeout_nanos() const { return operation_timeout_nanos_; } + + // timeout for each rpc request. Can be overridden by a more specific config, such as + // readRpcTimeout or writeRpcTimeout. + uint64_t rpc_timeout_nanos() const { return rpc_timeout_nanos_; } + + // timeout for each read rpc request + uint64_t read_rpc_timeout_nanos() const { return read_rpc_timeout_nanos_; } + + // timeout for each write rpc request + uint64_t write_rpc_timeout_nanos() const { return write_rpc_timeout_nanos_; } + + uint64_t pause_nanos() const { return pause_nanos_; } + + uint32_t max_retries() const { return max_retries_; } + + /** How many retries are allowed before we start to log */ + uint32_t start_log_errors_count() const { return start_log_errors_count_; } + + // The scan timeout is used as operation timeout for every + // operations in a scan, such as openScanner or next. + uint64_t scan_timeout_nanos() const { return scan_timeout_nanos_; } + + uint32_t scanner_caching() const { return scanner_caching_; } + + uint64_t scanner_max_result_size() const { return scanner_max_result_size_; } + + private: + /** Parameter name for HBase client operation timeout. */ + static constexpr const char* kClientOperationTimeout = "hbase.client.operation.timeout"; + + /** Parameter name for HBase client meta operation timeout. */ + static constexpr const char* kClientMetaOperationTimeout = "hbase.client.meta.operation.timeout"; + + /** Default HBase client operation timeout, which is tantamount to a blocking call */ + static constexpr const uint32_t kDefaultClientOperationTimeout = 1200000; + + /** timeout for each RPC */ + static constexpr const char* kRpcTimeout = "hbase.rpc.timeout"; + + /** timeout for each read RPC */ + static constexpr const char* kRpcReadTimeout = "hbase.rpc.read.timeout"; + + /** timeout for each write RPC */ + static constexpr const char* kRpcWriteTimeout = "hbase.rpc.write.timeout"; + + static constexpr const uint32_t kDefaultRpcTimeout = 60000; + + /** + * Parameter name for client pause value, used mostly as value to wait + * before running a retry of a failed get, region lookup, etc. + */ + static constexpr const char* kClientPause = "hbase.client.pause"; + + static constexpr const uint64_t kDefaultClientPause = 100; + + /** + * Parameter name for maximum retries, used as maximum for all retryable + * operations such as fetching of the root region from root region server, + * getting a cell's value, starting a row update, etc. + */ + static constexpr const char* kClientRetriesNumber = "hbase.client.retries.number"; + + static constexpr const uint32_t kDefaultClientRetriesNumber = 31; + + /** + * Configure the number of failures after which the client will start logging. A few failures + * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable + * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at + * this stage. + */ + static constexpr const char* kStartLogErrorsAfterCount = "hbase.client.start.log.errors.counter"; + static constexpr const uint32_t kDefaultStartLogErrorsAfterCount = 9; + + /** The client scanner timeout period in milliseconds. */ + static constexpr const char* kClientScannerTimeoutPeriod = "hbase.client.scanner.timeout.period"; + + static constexpr const uint32_t kDefaultClientScannerTimeoutPeriod = 60000; + + /** + * Parameter name to set the default scanner caching for all clients. + */ + static constexpr const char* kClientScannerCaching = "hbase.client.scanner.caching"; + + static constexpr const uint32_t kDefaultClientScannerCaching = INT_MAX; + + /** + * Parameter name for maximum number of bytes returned when calling a scanner's next method. + * Controlled by the client. + */ + static constexpr const char* kClientScannerMaxResultsSize = + "hbase.client.scanner.max.result.size"; + + /** + * Maximum number of bytes returned when calling a scanner's next method. + * Note that when a single row is larger than this limit the row is still + * returned completely. + * + * The default value is 2MB. + */ + static constexpr const uint64_t kDefaultClientScannerMaxResultsSize = 2 * 1024 * 1024; + + uint64_t meta_operation_timeout_nanos_; + uint64_t operation_timeout_nanos_; + uint64_t rpc_timeout_nanos_; + uint64_t read_rpc_timeout_nanos_; + uint64_t write_rpc_timeout_nanos_; + uint64_t pause_nanos_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; + uint64_t scan_timeout_nanos_; + uint32_t scanner_caching_; + uint64_t scanner_max_result_size_; + + static uint64_t ToNanos(const uint64_t& millis) { + return std::chrono::duration_cast(milliseconds(millis)).count(); + } + + static uint64_t ToMillis(const uint64_t& nanos) { + return std::chrono::duration_cast(nanoseconds(nanos)).count(); + } +}; + +} // namespace hbase