diff --git hbase-native-client/bin/cpplint.sh hbase-native-client/bin/cpplint.sh index 78a00bd..81795fd 100755 --- hbase-native-client/bin/cpplint.sh +++ hbase-native-client/bin/cpplint.sh @@ -26,4 +26,5 @@ wget -nc $CPPLINT_LOC -O $OUTPUT # Execute the script # Exclude the following rules: build/header_guard (We use #pragma once instead) # readability/todo (TODOs are generic) -find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 python $OUTPUT --filter=-build/header_guard,-readability/todo --linelength=100 +# build/c++11 (We are building with c++14) +find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 python $OUTPUT --filter=-build/header_guard,-readability/todo,-build/c++11 --linelength=100 diff --git hbase-native-client/connection/connection-factory.cc hbase-native-client/connection/connection-factory.cc index ff83212..8cdeed1 100644 --- hbase-native-client/connection/connection-factory.cc +++ hbase-native-client/connection/connection-factory.cc @@ -23,17 +23,28 @@ #include "connection/pipeline.h" #include "connection/service.h" +#include + using namespace folly; using namespace hbase; -ConnectionFactory::ConnectionFactory(std::shared_ptr io_pool) - : io_pool_(io_pool), pipeline_factory_(std::make_shared()) {} +using std::chrono::milliseconds; +using std::chrono::nanoseconds; + +ConnectionFactory::ConnectionFactory(std::shared_ptr io_pool, + nanoseconds connect_timeout) + : connect_timeout_(connect_timeout), + io_pool_(io_pool), + pipeline_factory_(std::make_shared()) {} std::shared_ptr> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared>(); client->group(io_pool_); client->pipelineFactory(pipeline_factory_); + // TODO: Opened https://github.com/facebook/wangle/issues/85 in wangle so that we can set socket + // options like TCP_NODELAY, SO_KEEPALIVE, CONNECT_TIMEOUT_MILLIS, etc. + return client; } std::shared_ptr ConnectionFactory::Connect( @@ -42,7 +53,10 @@ std::shared_ptr ConnectionFactory::Connect( // Yes this will block however it makes dealing with connection pool soooooo // much nicer. // TODO see about using shared promise for this. - auto pipeline = client->connect(SocketAddress(hostname, port, true)).get(); + auto pipeline = client + ->connect(SocketAddress(hostname, port, true), + std::chrono::duration_cast(connect_timeout_)) + .get(); auto dispatcher = std::make_shared(); dispatcher->setPipeline(pipeline); return dispatcher; diff --git hbase-native-client/connection/connection-factory.h hbase-native-client/connection/connection-factory.h index da44c35..f48f93e 100644 --- hbase-native-client/connection/connection-factory.h +++ hbase-native-client/connection/connection-factory.h @@ -20,6 +20,7 @@ #include +#include #include #include @@ -28,6 +29,8 @@ #include "connection/response.h" #include "connection/service.h" +using std::chrono::nanoseconds; + namespace hbase { /** @@ -40,8 +43,10 @@ class ConnectionFactory { * Constructor. * There should only be one ConnectionFactory per client. */ - explicit ConnectionFactory(std::shared_ptr io_pool); - /** Default Desctructor */ + explicit ConnectionFactory(std::shared_ptr io_pool, + nanoseconds connect_timeout = nanoseconds(0)); + + /** Default Destructor */ virtual ~ConnectionFactory() = default; /** @@ -59,6 +64,7 @@ class ConnectionFactory { const std::string &hostname, int port); private: + nanoseconds connect_timeout_; std::shared_ptr io_pool_; std::shared_ptr pipeline_factory_; }; diff --git hbase-native-client/connection/connection-pool.cc hbase-native-client/connection/connection-pool.cc index 15dd64e..954e583 100644 --- hbase-native-client/connection/connection-pool.cc +++ hbase-native-client/connection/connection-pool.cc @@ -33,8 +33,9 @@ using hbase::HBaseService; using folly::SharedMutexWritePriority; using folly::SocketAddress; -ConnectionPool::ConnectionPool(std::shared_ptr io_executor) - : cf_(std::make_shared(io_executor)), +ConnectionPool::ConnectionPool(std::shared_ptr io_executor, + nanoseconds connect_timeout) + : cf_(std::make_shared(io_executor, connect_timeout)), clients_(), connections_(), map_mutex_() {} diff --git hbase-native-client/connection/connection-pool.h hbase-native-client/connection/connection-pool.h index 1f2a182..7f2df6a 100644 --- hbase-native-client/connection/connection-pool.h +++ hbase-native-client/connection/connection-pool.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -35,6 +36,8 @@ using hbase::ConnectionIdEquals; using hbase::ConnectionIdHash; using hbase::RpcConnection; +using std::chrono::nanoseconds; + namespace hbase { /** @@ -46,14 +49,8 @@ namespace hbase { class ConnectionPool { public: /** Create connection pool wit default connection factory */ - explicit ConnectionPool(std::shared_ptr io_executor); - - /** - * Desctructor. - * All connections will be close. - * All connections will be released - */ - ~ConnectionPool(); + ConnectionPool(std::shared_ptr io_executor, + nanoseconds connect_timeout = nanoseconds(0)); /** * Constructor that allows specifiying the connetion factory. @@ -62,6 +59,13 @@ class ConnectionPool { explicit ConnectionPool(std::shared_ptr cf); /** + * Destructor. + * All connections will be close. + * All connections will be released + */ + ~ConnectionPool(); + + /** * Get a connection to the server name. Start time is ignored. * This can be a blocking operation for a short time. */ diff --git hbase-native-client/connection/rpc-client.cc hbase-native-client/connection/rpc-client.cc index 7621193..efcee7f 100644 --- hbase-native-client/connection/rpc-client.cc +++ hbase-native-client/connection/rpc-client.cc @@ -39,9 +39,10 @@ class RpcChannelImplementation : public AbstractRpcChannel { }; } // namespace hbase -RpcClient::RpcClient(std::shared_ptr io_executor) +RpcClient::RpcClient(std::shared_ptr io_executor, + nanoseconds connect_timeout) : io_executor_(io_executor) { - cp_ = std::make_shared(io_executor_); + cp_ = std::make_shared(io_executor_, connect_timeout); } void RpcClient::Close() { io_executor_->stop(); } diff --git hbase-native-client/connection/rpc-client.h hbase-native-client/connection/rpc-client.h index aeb9b56..bc94718 100644 --- hbase-native-client/connection/rpc-client.h +++ hbase-native-client/connection/rpc-client.h @@ -26,6 +26,7 @@ #include +#include #include using hbase::security::User; @@ -43,6 +44,8 @@ using google::protobuf::Message; using google::protobuf::RpcController; using google::protobuf::Closure; +using std::chrono::nanoseconds; + class RpcChannelImplementation; namespace hbase { @@ -51,7 +54,7 @@ class RpcClient : public std::enable_shared_from_this { friend class RpcChannelImplementation; public: - RpcClient(std::shared_ptr io_executor); + RpcClient(std::shared_ptr io_executor, nanoseconds connect_timeout); virtual ~RpcClient() { Close(); } diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK index d8d15a9..0c6e449 100644 --- hbase-native-client/core/BUCK +++ hbase-native-client/core/BUCK @@ -25,6 +25,7 @@ cxx_library( "keyvalue-codec.h", "region-location.h", "location-cache.h", + "connection-configuration.h", # TODO: move this out of exported # Once meta lookup works "meta-utils.h", diff --git hbase-native-client/core/client.cc hbase-native-client/core/client.cc index c1efd8b..88d3ef7 100644 --- hbase-native-client/core/client.cc +++ hbase-native-client/core/client.cc @@ -20,6 +20,7 @@ #include "core/client.h" #include +#include #include #include @@ -42,11 +43,14 @@ void Client::init(const hbase::Configuration &conf) { conf_ = std::make_shared(conf); auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_); - cpu_executor_ = - std::make_shared(4); // TODO: read num threads from conf - io_executor_ = std::make_shared(sysconf(_SC_NPROCESSORS_ONLN)); + conn_conf_ = std::make_shared(*conf_); + // start thread pools + auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN)); + auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN)); + cpu_executor_ = std::make_shared(cpu_threads); + io_executor_ = std::make_shared(io_threads); - rpc_client_ = std::make_shared(io_executor_); + rpc_client_ = std::make_shared(io_executor_, conn_conf_->connect_timeout()); location_cache_ = std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); } diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h index 730981d..052b913 100644 --- hbase-native-client/core/client.h +++ hbase-native-client/core/client.h @@ -29,6 +29,7 @@ #include "connection/rpc-client.h" #include "core/configuration.h" +#include "core/connection-configuration.h" #include "core/hbase_configuration_loader.h" #include "core/location-cache.h" #include "core/table.h" @@ -53,13 +54,13 @@ class Client { * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ Client(); - explicit Client(const hbase::Configuration &conf); + explicit Client(const hbase::Configuration& conf); ~Client(); /** * @brief Retrieve a Table implementation for accessing a table. * @param - table_name */ - std::unique_ptr Table(const TableName &table_name); + std::unique_ptr Table(const TableName& table_name); /** * @brief Close the Client connection. @@ -67,15 +68,25 @@ class Client { void Close(); private: - void init(const hbase::Configuration &conf); - const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; - const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181"; + /** Constants */ + static constexpr const char* kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; + static constexpr const char* kDefHBaseZookeeperQuorum_ = "localhost:2181"; + /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */ + static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; + /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ + static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; + + /** Data */ std::shared_ptr cpu_executor_; std::shared_ptr io_executor_; std::shared_ptr location_cache_; std::shared_ptr rpc_client_; std::shared_ptr conf_; + std::shared_ptr conn_conf_; bool is_closed_ = false; + + /** Methods */ + void init(const hbase::Configuration& conf); }; } // namespace hbase diff --git hbase-native-client/core/connection-configuration.h hbase-native-client/core/connection-configuration.h new file mode 100644 index 0000000..e1e9f87 --- /dev/null +++ hbase-native-client/core/connection-configuration.h @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "core/configuration.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +/** + * Timeout configs. + */ +class ConnectionConfiguration { + public: + explicit ConnectionConfiguration(const Configuration& conf) { + connect_timeout_ = + ToNanos(conf.GetInt(kClientSocketConnectTimeout, kDefaultClientSocketConnectTimeout)); + meta_operation_timeout_ = + ToNanos(conf.GetLong(kClientMetaOperationTimeout, kDefaultClientOperationTimeout)); + operation_timeout_ = + ToNanos(conf.GetLong(kClientOperationTimeout, kDefaultClientOperationTimeout)); + rpc_timeout_ = ToNanos(conf.GetLong(kRpcTimeout, kDefaultRpcTimeout)); + read_rpc_timeout_ = ToNanos(conf.GetLong(kRpcReadTimeout, ToMillis(rpc_timeout_))); + write_rpc_timeout_ = ToNanos(conf.GetLong(kRpcWriteTimeout, ToMillis(rpc_timeout_))); + pause_ = ToNanos(conf.GetLong(kClientPause, kDefaultClientPause)); + max_retries_ = conf.GetInt(kClientRetriesNumber, kDefaultClientRetriesNumber); + start_log_errors_count_ = + conf.GetInt(kStartLogErrorsAfterCount, kDefaultStartLogErrorsAfterCount); + scan_timeout_ = + ToNanos(conf.GetLong(kClientScannerTimeoutPeriod, kDefaultClientScannerTimeoutPeriod)); + scanner_caching_ = conf.GetInt(kClientScannerCaching, kDefaultClientScannerCaching); + scanner_max_result_size_ = + conf.GetLong(kClientScannerMaxResultsSize, kDefaultClientScannerMaxResultsSize); + } + + nanoseconds connect_timeout() const { return connect_timeout_; } + + nanoseconds meta_operation_timeout() const { return meta_operation_timeout_; } + + // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected + // by this value, see scanTimeoutNs. + nanoseconds operation_timeout() const { return operation_timeout_; } + + // timeout for each rpc request. Can be overridden by a more specific config, such as + // readRpcTimeout or writeRpcTimeout. + nanoseconds rpc_timeout() const { return rpc_timeout_; } + + // timeout for each read rpc request + nanoseconds read_rpc_timeout() const { return read_rpc_timeout_; } + + // timeout for each write rpc request + nanoseconds write_rpc_timeout() const { return write_rpc_timeout_; } + + nanoseconds pause_nanos() const { return pause_; } + + uint32_t max_retries() const { return max_retries_; } + + /** How many retries are allowed before we start to log */ + uint32_t start_log_errors_count() const { return start_log_errors_count_; } + + // The scan timeout is used as operation timeout for every + // operations in a scan, such as openScanner or next. + nanoseconds scan_timeout() const { return scan_timeout_; } + + uint32_t scanner_caching() const { return scanner_caching_; } + + uint64_t scanner_max_result_size() const { return scanner_max_result_size_; } + + private: + /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ + static constexpr const char* kClientSocketConnectTimeout = + "hbase.ipc.client.socket.timeout.connect"; + /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ + static constexpr const uint32_t kDefaultClientSocketConnectTimeout = 10000; // 10 secs + + /** Parameter name for HBase client operation timeout. */ + static constexpr const char* kClientOperationTimeout = "hbase.client.operation.timeout"; + + /** Parameter name for HBase client meta operation timeout. */ + static constexpr const char* kClientMetaOperationTimeout = "hbase.client.meta.operation.timeout"; + + /** Default HBase client operation timeout, which is tantamount to a blocking call */ + static constexpr const uint32_t kDefaultClientOperationTimeout = 1200000; + + /** timeout for each RPC */ + static constexpr const char* kRpcTimeout = "hbase.rpc.timeout"; + + /** timeout for each read RPC */ + static constexpr const char* kRpcReadTimeout = "hbase.rpc.read.timeout"; + + /** timeout for each write RPC */ + static constexpr const char* kRpcWriteTimeout = "hbase.rpc.write.timeout"; + + static constexpr const uint32_t kDefaultRpcTimeout = 60000; + + /** + * Parameter name for client pause value, used mostly as value to wait + * before running a retry of a failed get, region lookup, etc. + */ + static constexpr const char* kClientPause = "hbase.client.pause"; + + static constexpr const uint64_t kDefaultClientPause = 100; + + /** + * Parameter name for maximum retries, used as maximum for all retryable + * operations such as fetching of the root region from root region server, + * getting a cell's value, starting a row update, etc. + */ + static constexpr const char* kClientRetriesNumber = "hbase.client.retries.number"; + + static constexpr const uint32_t kDefaultClientRetriesNumber = 31; + + /** + * Configure the number of failures after which the client will start logging. A few failures + * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable + * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at + * this stage. + */ + static constexpr const char* kStartLogErrorsAfterCount = "hbase.client.start.log.errors.counter"; + static constexpr const uint32_t kDefaultStartLogErrorsAfterCount = 9; + + /** The client scanner timeout period in milliseconds. */ + static constexpr const char* kClientScannerTimeoutPeriod = "hbase.client.scanner.timeout.period"; + + static constexpr const uint32_t kDefaultClientScannerTimeoutPeriod = 60000; + + /** + * Parameter name to set the default scanner caching for all clients. + */ + static constexpr const char* kClientScannerCaching = "hbase.client.scanner.caching"; + + static constexpr const uint32_t kDefaultClientScannerCaching = INT_MAX; + + /** + * Parameter name for maximum number of bytes returned when calling a scanner's next method. + * Controlled by the client. + */ + static constexpr const char* kClientScannerMaxResultsSize = + "hbase.client.scanner.max.result.size"; + + /** + * Maximum number of bytes returned when calling a scanner's next method. + * Note that when a single row is larger than this limit the row is still + * returned completely. + * + * The default value is 2MB. + */ + static constexpr const uint64_t kDefaultClientScannerMaxResultsSize = 2 * 1024 * 1024; + + nanoseconds connect_timeout_; + nanoseconds meta_operation_timeout_; + nanoseconds operation_timeout_; + nanoseconds rpc_timeout_; + nanoseconds read_rpc_timeout_; + nanoseconds write_rpc_timeout_; + nanoseconds pause_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; + nanoseconds scan_timeout_; + uint32_t scanner_caching_; + uint64_t scanner_max_result_size_; + + static nanoseconds ToNanos(const uint64_t& millis) { + return std::chrono::duration_cast(milliseconds(millis)); + } + + static uint64_t ToMillis(const nanoseconds& nanos) { + return std::chrono::duration_cast(nanos).count(); + } +}; + +} // namespace hbase