From abb502457ea277dceff690135d38c600660ea55f Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Thu, 6 Jul 2017 14:58:39 -0700 Subject: [PATCH] HBASE-18078. [C++] Harden RPC by handling various communication abnormalities --- hbase-native-client/connection/BUCK | 10 + hbase-native-client/connection/BootstrapTest.cpp | 421 +++++++++++++++++++++ .../connection/connection-factory.cc | 38 +- .../connection/connection-factory.h | 13 +- hbase-native-client/connection/connection-pool.cc | 75 +++- hbase-native-client/connection/connection-pool.h | 7 + hbase-native-client/connection/rpc-client.cc | 17 +- hbase-native-client/connection/rpc-client.h | 5 +- hbase-native-client/connection/rpc-test-server.cc | 19 + hbase-native-client/connection/rpc-test-server.h | 48 +++ hbase-native-client/connection/rpc-test.cc | 42 ++ hbase-native-client/exceptions/exception.h | 21 +- 12 files changed, 686 insertions(+), 30 deletions(-) create mode 100644 hbase-native-client/connection/BootstrapTest.cpp create mode 100644 hbase-native-client/connection/rpc-test-server.cc create mode 100644 hbase-native-client/connection/rpc-test-server.h create mode 100644 hbase-native-client/connection/rpc-test.cc diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c3119eb..cec51ee 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -68,3 +68,13 @@ cxx_test( deps=[ ":connection", ],) +cxx_test( + name="bootstrap-test", + srcs=["BootstrapTest.cpp",], + deps=[":connection",], + run_test_separately=True,) +cxx_test( + name="rpc-test", + srcs=["rpc-test.cc",], + deps=[":connection",], + run_test_separately=True,) \ No newline at end of file diff --git a/hbase-native-client/connection/BootstrapTest.cpp b/hbase-native-client/connection/BootstrapTest.cpp new file mode 100644 index 0000000..211ff47 --- /dev/null +++ b/hbase-native-client/connection/BootstrapTest.cpp @@ -0,0 +1,421 @@ +/* + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ + +#include "wangle/bootstrap/ServerBootstrap.h" +#include "wangle/bootstrap/ClientBootstrap.h" +#include "wangle/channel/Handler.h" + +#include +#include +#include +#include +#include + +using namespace wangle; +using namespace folly; + +typedef Pipeline> BytesPipeline; + +typedef ServerBootstrap TestServer; +typedef ClientBootstrap TestClient; + +class TestClientPipelineFactory : public PipelineFactory { + public: + BytesPipeline::Ptr newPipeline( + std::shared_ptr sock) override { + // Socket should be connected already + EXPECT_TRUE(sock->good()); + + // Check after a small delay that socket is readable + EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){ + EXPECT_TRUE(sock->readable()); + }, 100); + + auto pipeline = BytesPipeline::create(); + pipeline->addBack(new BytesToBytesHandler()); + pipeline->finalize(); + return pipeline; + } +}; + +class TestPipelineFactory : public PipelineFactory { + public: + BytesPipeline::Ptr newPipeline( + std::shared_ptr) override { + pipelines++; + auto pipeline = BytesPipeline::create(); + pipeline->addBack(new BytesToBytesHandler()); + pipeline->finalize(); + return pipeline; + } + std::atomic pipelines{0}; +}; + +class TestAcceptor : public Acceptor { +EventBase base_; + public: + TestAcceptor() : Acceptor(ServerSocketConfig()) { + Acceptor::init(nullptr, &base_); + } + void onNewConnection(AsyncTransportWrapper::UniquePtr, + const folly::SocketAddress*, + const std::string& /* nextProtocolName */, + SecureTransportType, + const TransportInfo&) override {} +}; + +class TestAcceptorFactory : public AcceptorFactory { + public: + std::shared_ptr newAcceptor(EventBase*) override { + return std::make_shared(); + } +}; + +TEST(Bootstrap, Basic) { + TestServer server; + TestClient client; +} + +TEST(Bootstrap, ServerWithPipeline) { + TestServer server; + server.childPipeline(std::make_shared()); + server.bind(0); + server.stop(); +} + +TEST(Bootstrap, ServerWithChildHandler) { + TestServer server; + server.childHandler(std::make_shared()); + server.bind(0); + server.stop(); +} + +TEST(Bootstrap, ClientServerTest) { + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.bind(0); + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.pipelineFactory(std::make_shared()); + client.connect(address); + base->loop(); + server.stop(); + server.join(); + + EXPECT_EQ(factory->pipelines, 1); +} + +TEST(Bootstrap, ClientConnectionManagerTest) { + // Create a single IO thread, and verify that + // client connections are pooled properly + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.group(std::make_shared(1)); + server.bind(0); + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.pipelineFactory(std::make_shared()); + + client.connect(address); + + TestClient client2; + client2.pipelineFactory(std::make_shared()); + client2.connect(address); + + base->loop(); + server.stop(); + server.join(); + + EXPECT_EQ(factory->pipelines, 2); +} + +TEST(Bootstrap, ServerAcceptGroupTest) { + // Verify that server is using the accept IO group + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.group(std::make_shared(1), nullptr); + server.bind(0); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + boost::barrier barrier(2); + auto thread = std::thread([&](){ + TestClient client; + client.pipelineFactory(std::make_shared()); + client.connect(address); + EventBaseManager::get()->getEventBase()->loop(); + barrier.wait(); + }); + barrier.wait(); + server.stop(); + thread.join(); + server.join(); + + EXPECT_EQ(factory->pipelines, 1); +} + +TEST(Bootstrap, ServerAcceptGroup2Test) { + // Verify that server is using the accept IO group + + // Check if reuse port is supported, if not, don't run this test + try { + EventBase base; + auto serverSocket = AsyncServerSocket::newSocket(&base); + serverSocket->bind(0); + serverSocket->listen(0); + serverSocket->startAccepting(); + serverSocket->setReusePortEnabled(true); + serverSocket->stopAccepting(); + } catch(...) { + LOG(INFO) << "Reuse port probably not supported"; + return; + } + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.group(std::make_shared(4), nullptr); + server.bind(0); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.pipelineFactory(std::make_shared()); + + client.connect(address); + EventBaseManager::get()->getEventBase()->loop(); + + server.stop(); + server.join(); + + EXPECT_EQ(factory->pipelines, 1); +} + +TEST(Bootstrap, SharedThreadPool) { + // Check if reuse port is supported, if not, don't run this test + try { + EventBase base; + auto serverSocket = AsyncServerSocket::newSocket(&base); + serverSocket->bind(0); + serverSocket->listen(0); + serverSocket->startAccepting(); + serverSocket->setReusePortEnabled(true); + serverSocket->stopAccepting(); + } catch(...) { + LOG(INFO) << "Reuse port probably not supported"; + return; + } + + auto pool = std::make_shared(2); + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + server.group(pool, pool); + + server.bind(0); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.pipelineFactory(std::make_shared()); + client.connect(address); + + TestClient client2; + client2.pipelineFactory(std::make_shared()); + client2.connect(address); + + TestClient client3; + client3.pipelineFactory(std::make_shared()); + client3.connect(address); + + TestClient client4; + client4.pipelineFactory(std::make_shared()); + client4.connect(address); + + TestClient client5; + client5.pipelineFactory(std::make_shared()); + client5.connect(address); + + EventBaseManager::get()->getEventBase()->loop(); + + server.stop(); + server.join(); + + EXPECT_EQ(factory->pipelines, 5); +} + +TEST(Bootstrap, ExistingSocket) { + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket); + server.bind(std::move(socket)); +} + +std::atomic connections{0}; + +class TestHandlerPipeline : public InboundHandler { + public: + void read(Context* ctx, AcceptPipelineType conn) override { + if (conn.type() == typeid(ConnEvent)) { + auto connEvent = boost::get(conn); + if (connEvent == ConnEvent::CONN_ADDED) { + connections++; + } + } + return ctx->fireRead(conn); + } +}; + +template +class TestHandlerPipelineFactory : public AcceptPipelineFactory { + public: + AcceptPipeline::Ptr newPipeline(Acceptor*) override { + auto pipeline = AcceptPipeline::create(); + pipeline->addBack(HandlerPipeline()); + return pipeline; + } +}; + +TEST(Bootstrap, LoadBalanceHandler) { + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + + auto pipelinefactory = + std::make_shared>(); + server.pipeline(pipelinefactory); + server.bind(0); + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + TestClient client; + client.pipelineFactory(std::make_shared()); + client.connect(address); + base->loop(); + server.stop(); + server.join(); + + EXPECT_EQ(factory->pipelines, 1); + EXPECT_EQ(connections, 1); +} + +class TestUDPPipeline : public InboundHandler { + public: + void read(Context*, AcceptPipelineType) override { connections++; } +}; + +TEST(Bootstrap, UDP) { + TestServer server; + auto factory = std::make_shared(); + auto pipelinefactory = + std::make_shared>(); + server.pipeline(pipelinefactory); + server.channelFactory(std::make_shared()); + server.bind(0); +} + +TEST(Bootstrap, UDPClientServerTest) { + connections = 0; + + TestServer server; + auto factory = std::make_shared(); + auto pipelinefactory = + std::make_shared>(); + server.pipeline(pipelinefactory); + server.channelFactory(std::make_shared()); + server.bind(0); + + auto base = EventBaseManager::get()->getEventBase(); + + SocketAddress address; + server.getSockets()[0]->getAddress(&address); + + SocketAddress localhost("::1", 0); + AsyncUDPSocket client(base); + client.bind(localhost); + auto data = IOBuf::create(1); + data->append(1); + *(data->writableData()) = 'a'; + client.write(address, std::move(data)); + base->loop(); + server.stop(); + server.join(); + + EXPECT_EQ(connections, 1); +} + +TEST(Bootstrap, UnixServer) { + TestServer server; + auto factory = std::make_shared(); + + folly::test::TemporaryDirectory tmpdir("wangle-bootstrap-test"); + auto socketPath = (tmpdir.path() / "sock").string(); + + server.childPipeline(factory); + SocketAddress address; + address.setFromPath(socketPath); + server.bind(address); + auto base = EventBaseManager::get()->getEventBase(); + + TestClient client; + client.pipelineFactory(std::make_shared()); + auto pipelineFuture = client.connect(address); + base->loop(); + server.stop(); + server.join(); + + EXPECT_TRUE(pipelineFuture.get() != nullptr); + EXPECT_EQ(factory->pipelines, 1); +} + +TEST(Bootstrap, ServerBindFailure) { + // Bind to a TCP socket + EventBase base; + auto serverSocket = AsyncServerSocket::newSocket(&base); + serverSocket->bind(0); + serverSocket->listen(0); + + SocketAddress address; + serverSocket->getAddress(&address); + + // Now try starting a server using the address we are already listening on + // This should fail. + + TestServer server; + auto factory = std::make_shared(); + server.childPipeline(factory); + try { + server.bind(address); + FAIL() << "shouldn't be allowed to bind to an in-use address"; + } catch (const std::system_error& ex) { + EXPECT_EQ(EADDRINUSE, ex.code().value()) << "unexpected error code " << + ex.code().value() << ": " << ex.code().message(); + } +} diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index a0c7f96..8ddada9 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -27,6 +27,9 @@ #include "connection/pipeline.h" #include "connection/sasl-handler.h" #include "connection/service.h" +#include +#include +#include "exceptions/exception.h" using std::chrono::milliseconds; using std::chrono::nanoseconds; @@ -56,15 +59,30 @@ std::shared_ptr> ConnectionFactory::M std::shared_ptr ConnectionFactory::Connect( std::shared_ptr> client, const std::string &hostname, uint16_t port) { - // Yes this will block however it makes dealing with connection pool soooooo - // much nicer. - // TODO see about using shared promise for this. - auto pipeline = client - ->connect(folly::SocketAddress(hostname, port, true), - std::chrono::duration_cast(connect_timeout_)) - .get(); - auto dispatcher = std::make_shared(); - dispatcher->setPipeline(pipeline); - return dispatcher; + return AsyncConnect(client, hostname, port).get(); +} + +folly::Future> ConnectionFactory::AsyncConnect( + std::shared_ptr> client, const std::string &hostname, + uint16_t port) { + + folly::Promise> promise; + auto future = promise.getFuture(); + + try { + /* any connection error (e.g. timeout) will be folly::AsyncSocketException */ + auto pipeline = client->connect( + SocketAddress(hostname, port, true), + std::chrono::duration_cast(connect_timeout_)).get(); + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + promise.setValue(dispatcher); + } catch(const folly::AsyncSocketException &e) { + promise.setException( + folly::make_exception_wrapper( + folly::make_exception_wrapper(e))); + } + + return future; } } // namespace hbase diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index c96087d..65e9327 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -19,6 +19,7 @@ #pragma once #include +#include #include #include @@ -55,7 +56,7 @@ class ConnectionFactory { virtual std::shared_ptr> MakeBootstrap(); /** - * Connect a ClientBootstrap to a server and return the pipeline. + * Connect a ClientBootstrap to a server and return the wangle::Service. * * This is mostly visible so that mocks can override socket connections. */ @@ -63,6 +64,16 @@ class ConnectionFactory { std::shared_ptr> client, const std::string &hostname, uint16_t port); + /** + * Asynchronously Connect a ClientBootstrap to a server and return the wangle::Service. + * + * This async function makes it easy to propagate exceptions in a controlled way with + * help of folly::Future/Promise. + */ + virtual folly::Future> AsyncConnect( + std::shared_ptr> client, + const std::string &hostname, uint16_t port); + private: std::chrono::nanoseconds connect_timeout_; std::shared_ptr conf_; diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index e98759d..3663ec5 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -22,11 +22,15 @@ #include #include #include +#include #include #include +#include "exceptions/exception.h" using std::chrono::nanoseconds; +using namespace folly; +using namespace hbase; namespace hbase { @@ -45,24 +49,31 @@ ConnectionPool::~ConnectionPool() { Close(); } std::shared_ptr ConnectionPool::GetConnection( std::shared_ptr remote_id) { - // Try and get th cached connection. - auto found_ptr = GetCachedConnection(remote_id); + /** + * Try and get the cached connection, if there's no connection then create it. + */ + auto found_ptr = GetCachedConnection(remote_id); + return found_ptr == nullptr ? GetNewConnection(remote_id) : found_ptr; +} - // If there's no connection then create it. - if (found_ptr == nullptr) { - found_ptr = GetNewConnection(remote_id); - } - return found_ptr; + +folly::Future> ConnectionPool::AsyncGetConnection( + std::shared_ptr remote_id) { + /** + * Try and get the cached connection, if there's no connection then create it. + */ + auto found_ptr = GetCachedConnection(remote_id); + return + found_ptr == nullptr ? + AsyncGetNewConnection(remote_id) + : folly::makeFuture>(std::move(found_ptr)); } std::shared_ptr ConnectionPool::GetCachedConnection( std::shared_ptr remote_id) { folly::SharedMutexWritePriority::ReadHolder holder(map_mutex_); auto found = connections_.find(remote_id); - if (found == connections_.end()) { - return nullptr; - } - return found->second; + return found == connections_.end() ? nullptr : found->second; } std::shared_ptr ConnectionPool::GetNewConnection( @@ -91,11 +102,51 @@ std::shared_ptr ConnectionPool::GetNewConnection( connections_.insert(std::make_pair(remote_id, connection)); clients_.insert(std::make_pair(remote_id, clientBootstrap)); - return connection; } } +folly::Future> ConnectionPool::AsyncGetNewConnection( + std::shared_ptr remote_id) { + // Grab the upgrade lock. While we are double checking other readers can + // continue on + SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_}; + + folly::Promise> promise; + auto future = promise.getFuture(); + + // Now check if someone else created the connection before we got the lock + // This is safe since we hold the upgrade lock. + // upgrade lock is more power than the reader lock. + auto found = connections_.find(remote_id); + if (found != connections_.end() && found->second != nullptr) { + promise.setValue(found->second); + } else { + // Yeah it looks a lot like there's no connection + SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; + + // Make double sure there are not stale connections hanging around. + connections_.erase(remote_id); + + /* create new connection */ + auto clientBootstrap = cf_->MakeBootstrap(); + try { + auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); + auto connection = std::make_shared(remote_id, dispatcher); + promise.setValue(connection); + + connections_.insert(std::make_pair(remote_id, connection)); + clients_.insert(std::make_pair(remote_id, clientBootstrap)); + } catch(const hbase::ConnectionException &e) { + /* propagating ConnectionException up */ + promise.setException( + folly::make_exception_wrapper(e)); + } + } + + return future; +} + void ConnectionPool::Close(std::shared_ptr remote_id) { folly::SharedMutexWritePriority::WriteHolder holder{map_mutex_}; DLOG(INFO) << "Closing RPC Connection to host:" << remote_id->host() diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index c7c4246..e6f5c22 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -19,6 +19,7 @@ #pragma once #include +#include #include #include #include @@ -66,6 +67,11 @@ class ConnectionPool { std::shared_ptr GetConnection(std::shared_ptr remote_id); /** + * Asynchronously get connection by ConnectionId. + */ + folly::Future> AsyncGetConnection(std::shared_ptr remote_id); + + /** * Close/remove a connection. */ void Close(std::shared_ptr remote_id); @@ -78,6 +84,7 @@ class ConnectionPool { private: std::shared_ptr GetCachedConnection(std::shared_ptr remote_id); std::shared_ptr GetNewConnection(std::shared_ptr remote_id); + folly::Future> AsyncGetNewConnection(std::shared_ptr remote_id); std::unordered_map, std::shared_ptr, ConnectionIdHash, ConnectionIdEquals> connections_; diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 10faa7a..43c363e 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -22,6 +22,7 @@ #include #include #include +#include "exceptions/exception.h" using hbase::security::User; using std::chrono::nanoseconds; @@ -55,7 +56,7 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::unique_ptr req, std::shared_ptr ticket) { auto remote_id = std::make_shared(host, port, ticket); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return CallForResult(remote_id, std::move(req)); } folly::Future> RpcClient::AsyncCall(const std::string& host, @@ -64,10 +65,18 @@ folly::Future> RpcClient::AsyncCall(const std::string& std::shared_ptr ticket, const std::string& service_name) { auto remote_id = std::make_shared(host, port, ticket, service_name); - return GetConnection(remote_id)->SendRequest(std::move(req)); + return CallForResult(remote_id, std::move(req)); } -std::shared_ptr RpcClient::GetConnection(std::shared_ptr remote_id) { - return cp_->GetConnection(remote_id); +folly::Future> RpcClient::CallForResult( + std::shared_ptr remote_id, + std::unique_ptr req) { + try { + auto connection = cp_->AsyncGetConnection(remote_id).get(); + return connection->SendRequest(std::move(req)); + } catch (const hbase::ConnectionException &e) { + return folly::makeFuture>( + folly::make_exception_wrapper(e)); + } } } // namespace hbase diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index 0ecde5b..a615f52 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -63,8 +63,9 @@ class RpcClient { std::shared_ptr connection_pool() const { return cp_; } - private: - std::shared_ptr GetConnection(std::shared_ptr remote_id); +private: + folly::Future> CallForResult(std::shared_ptr remote_id, + std::unique_ptr req); private: std::shared_ptr cp_; diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc new file mode 100644 index 0000000..81972e5 --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "connection/rpc-test-server.h" diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h new file mode 100644 index 0000000..8624df7 --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "wangle/bootstrap/ServerBootstrap.h" + +#include "connection/pipeline.h" + +namespace hbase { + +typedef ServerBootstrap RpcTestServer; + +class RpcTestServerPipelineFactory: public PipelineFactory { + public: + SerializePipeline::Ptr newPipeline( + std::shared_ptr) override { + pipelines++; + auto pipeline = SerializePipeline::create(); + pipeline->addBack(new BytesToBytesHandler()); + pipeline->finalize(); + return pipeline; + } + + int pipelines() { + return pipelines_; + } + +private: + std::atomic pipelines_{0}; +}; +} // namespace hbase diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc new file mode 100644 index 0000000..718ed2c --- /dev/null +++ b/hbase-native-client/connection/rpc-test.cc @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "wangle/bootstrap/ClientBootstrap.h" +#include "wangle/channel/Handler.h" + +#include +#include +#include +#include +#include + +#include "connection/rpc-test-server.h" + +using namespace wangle; +using namespace folly; + +namespace hbase { + +TEST(RPC, ServerWithPipeline) { + RpcTestServer server; + server.childPipeline(std::make_shared()); + server.bind(0); + server.stop(); +} +} // namespace hbase diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h index bdedff4..d628078 100644 --- a/hbase-native-client/exceptions/exception.h +++ b/hbase-native-client/exceptions/exception.h @@ -59,7 +59,7 @@ class IOException : public std::logic_error { IOException(const std::string& what, bool do_not_retry) : logic_error(what), do_not_retry_(do_not_retry) {} - IOException(const std::string& what, folly::exception_wrapper cause) + IOException(const std::string& what, const folly::exception_wrapper &cause) : logic_error(what), cause_(cause), do_not_retry_(false) {} IOException(const std::string& what, folly::exception_wrapper cause, bool do_not_retry) @@ -115,6 +115,25 @@ class RetriesExhaustedException : public IOException { int32_t num_retries_; }; +class ConnectionException : public IOException { +public: + ConnectionException() { + } + + ConnectionException(const std::string& what) : + IOException(what) { + } + + ConnectionException(const folly::exception_wrapper &cause) : + IOException("", cause) { + } + + ConnectionException( + const std::string& what, + const folly::exception_wrapper &cause) : IOException(what, cause) { + } +}; + class RemoteException : public IOException { public: RemoteException() : IOException(), port_(0) {} -- 2.10.1 (Apple Git-78)