diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c3119eb..62dd548 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -31,6 +31,7 @@ cxx_library( "rpc-connection.h", "response.h", "service.h", + "failed-servers.h", "rpc-client.h", "sasl-util.h", ], @@ -42,6 +43,7 @@ cxx_library( "pipeline.cc", "request.cc", "rpc-client.cc", + "failed-servers.cc", "sasl-handler.cc", "sasl-util.cc", ], diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index d1bfbce..a5dfafe 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -24,6 +24,7 @@ #include #include +#include #include "connection/client-dispatcher.h" #include "connection/pipeline.h" #include "connection/service.h" @@ -40,7 +41,8 @@ ConnectionFactory::ConnectionFactory(std::shared_ptr(codec, conf)) {} + pipeline_factory_(std::make_shared(codec, conf)), + failed_servers_(std::make_shared()) {} std::shared_ptr> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared>(); @@ -59,12 +61,23 @@ std::shared_ptr ConnectionFactory::Connect( // Yes this will block however it makes dealing with connection pool soooooo // much nicer. // TODO see about using shared promise for this.
- auto pipeline = client + try { + auto pipeline = client ->connect(folly::SocketAddress(hostname, port, true), std::chrono::duration_cast(connect_timeout_)) .get(); - auto dispatcher = std::make_shared(); - dispatcher->setPipeline(pipeline); - return dispatcher; + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + return dispatcher; + } catch (const folly::AsyncSocketException &e) { + // Remember servers whose connect attempt timed out so that + // RpcClient::GetConnection() fails fast on them until the entry expires. + if (e.getType() == folly::AsyncSocketException::AsyncSocketExceptionType::TIMED_OUT) { + failed_servers_->AddToFailedServers(hostname, port); + } + // Always propagate: this function must return a value, so leaving the + // handler normally would fall off the end (undefined behavior). + throw; + } } } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index c96087d..2a717fb 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -24,6 +24,7 @@ #include #include +#include "connection/failed-servers.h" #include "connection/pipeline.h" #include "connection/request.h" #include "connection/response.h" @@ -63,10 +64,13 @@ class ConnectionFactory { std::shared_ptr> client, const std::string &hostname, uint16_t port); + std::shared_ptr GetFailedServers() { return failed_servers_; } + private: std::chrono::nanoseconds connect_timeout_; std::shared_ptr conf_; std::shared_ptr io_pool_; std::shared_ptr pipeline_factory_; + std::shared_ptr failed_servers_; }; } // namespace hbase diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index c7c4246..de70989 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -64,6 +64,7 @@ class ConnectionPool { * This can be a blocking operation for a short time. */ std::shared_ptr GetConnection(std::shared_ptr remote_id); + std::shared_ptr GetConnectionFactory() { return cf_; } /** * Close/remove a connection.
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 10faa7a..f0f04a6 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -68,6 +68,11 @@ folly::Future> RpcClient::AsyncCall(const std::string& } std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { + auto failed_servers = cp_->GetConnectionFactory()->GetFailedServers(); + if (failed_servers->IsFailedServer(remote_id->host(), remote_id->port())) { + throw std::runtime_error("This server is in the failed servers list: " + remote_id->host() + + ":" + std::to_string(remote_id->port())); + } return cp_->GetConnection(remote_id); } } // namespace hbase diff --git a/hbase-native-client/connection/failed-servers.h b/hbase-native-client/connection/failed-servers.h new file mode 100644 index 0000000..41fe411 --- /dev/null +++ b/hbase-native-client/connection/failed-servers.h @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ +#pragma once + +#include +#include +#include +#include +#include <mutex> + +#include "connection/connection-id.h" + +namespace hbase { + +/** + * Tracks servers recently reported as failed via AddToFailedServers() so that + * callers can fail fast on them until the recheck timeout elapses. + * All methods are safe to call from multiple threads. + */ +class FailedServers { + public: + /** + * @param recheck_timeout how long an entry counts as failed after it is added. + */ + explicit FailedServers( + std::chrono::milliseconds recheck_timeout = std::chrono::milliseconds(2'000)); + + /** Records host:port as failed, timestamped with the current time. */ + void AddToFailedServers(const std::string& host, uint16_t port); + + /** + * Drops expired entries, then reports whether host:port is still listed. + * @return true if host:port was added no more than recheck_timeout ago. + */ + bool IsFailedServer(const std::string& host, uint16_t port); + + private: + std::chrono::milliseconds recheck_timeout_; + // Guards failed_servers_. IsFailedServer() erases expired entries, so even + // lookups mutate the list. + std::mutex lock_; + std::vector> failed_servers_; +}; +} // namespace hbase diff --git a/hbase-native-client/connection/failed-servers.cc b/hbase-native-client/connection/failed-servers.cc new file mode 100644 index 0000000..623c281 --- /dev/null +++ b/hbase-native-client/connection/failed-servers.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +#include "connection/failed-servers.h" + +#include +#include + +#include "utils/time-util.h" + +namespace hbase { + +FailedServers::FailedServers(std::chrono::milliseconds recheck_timeout) + : recheck_timeout_(recheck_timeout) {} + +void FailedServers::AddToFailedServers(const std::string& host, uint16_t port) { + std::lock_guard<std::mutex> guard(lock_); + failed_servers_.emplace_back(TimeUtil::GetNowNanos(), host + ":" + std::to_string(port)); +} + +bool FailedServers::IsFailedServer(const std::string& host, uint16_t port) { + std::lock_guard<std::mutex> guard(lock_); + if (failed_servers_.empty()) return false; + const std::string lookup = host + ":" + std::to_string(port); + auto iter = failed_servers_.begin(); + while (iter != failed_servers_.end()) { + // Entries expire lazily: anything older than recheck_timeout_ is dropped here. + if (TimeUtil::ElapsedMillis(iter->first) > recheck_timeout_.count()) { + iter = failed_servers_.erase(iter); + } else { + if (lookup == iter->second) return true; + ++iter; + } + } + return false; +} + +} // namespace hbase