From 9c1b1064ee6ad47a1d05afff6d806167d968c5ba Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Mon, 10 Jul 2017 15:05:14 -0700 Subject: [PATCH] HBASE-18338. [C++] Implement RpcTestServer --- hbase-native-client/connection/BUCK | 10 +++ .../connection/hbase-ClientSerializeHandler.h | 46 ++++++++++ hbase-native-client/connection/hbase-RpcClient.h | 97 ++++++++++++++++++++++ hbase-native-client/connection/hbase-RpcServer.h | 89 ++++++++++++++++++++ .../connection/hbase-ServerSerializeHandler.h | 46 ++++++++++ hbase-native-client/connection/hbase-bridge.h | 29 +++++++ hbase-native-client/connection/hbase-rpc-test.cc | 74 +++++++++++++++++ 7 files changed, 391 insertions(+) create mode 100644 hbase-native-client/connection/hbase-ClientSerializeHandler.h create mode 100644 hbase-native-client/connection/hbase-RpcClient.h create mode 100644 hbase-native-client/connection/hbase-RpcServer.h create mode 100644 hbase-native-client/connection/hbase-ServerSerializeHandler.h create mode 100644 hbase-native-client/connection/hbase-bridge.h create mode 100644 hbase-native-client/connection/hbase-rpc-test.cc diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c3119eb..7f1a2e4 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -33,6 +33,11 @@ cxx_library( "service.h", "rpc-client.h", "sasl-util.h", + "hbase-RpcClient.h", + "hbase-RpcServer.h", + "hbase-ClientSerializeHandler.h", + "hbase-ServerSerializeHandler.h", + "hbase-bridge.h", ], srcs=[ "client-dispatcher.cc", @@ -68,3 +73,8 @@ cxx_test( deps=[ ":connection", ],) +cxx_test( + name="hbase-rpc-test", + srcs=["hbase-rpc-test.cc",], + deps=[":connection",], + run_test_separately=True,) \ No newline at end of file diff --git a/hbase-native-client/connection/hbase-ClientSerializeHandler.h b/hbase-native-client/connection/hbase-ClientSerializeHandler.h new file mode 100644 index 0000000..b23f5e3 --- /dev/null +++ b/hbase-native-client/connection/hbase-ClientSerializeHandler.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +#include "hbase-bridge.h" + +// Do some serialization / deserialization using thrift. +// A real rpc server would probably use generated client/server stubs +class ClientSerializeHandler : public wangle::Handler< + std::unique_ptr, thrift::test::Xtruct, + thrift::test::Bonk, std::unique_ptr> { + public: + void read(Context* ctx, std::unique_ptr msg) override { + thrift::test::Xtruct received; + ser.deserialize(msg->moveToFbString(), &received); + ctx->fireRead(received); + } + + folly::Future write(Context* ctx, thrift::test::Bonk b) + override { + std::string out; + ser.serialize(b, &out); + return ctx->fireWrite(folly::IOBuf::copyBuffer(out)); + } + + private: + apache::thrift::util::ThriftSerializerCompact<> ser; +}; diff --git a/hbase-native-client/connection/hbase-RpcClient.h b/hbase-native-client/connection/hbase-RpcClient.h new file mode 100644 index 0000000..e3b7fbe --- /dev/null +++ b/hbase-native-client/connection/hbase-RpcClient.h @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hbase-ClientSerializeHandler.h" + +using namespace folly; +using namespace wangle; +using thrift::test::Bonk; +using thrift::test::Xtruct; + +using SerializePipeline = wangle::Pipeline; + +DEFINE_int32(port, 8080, "test server port"); +DEFINE_string(host, "::1", "test server address"); + +class RpcPipelineFactory : public PipelineFactory { + public: + SerializePipeline::Ptr newPipeline( + std::shared_ptr sock) override { + auto pipeline = SerializePipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + // ensure we can write from any thread + pipeline->addBack(EventBaseHandler()); + pipeline->addBack(LengthFieldBasedFrameDecoder()); + pipeline->addBack(LengthFieldPrepender()); + pipeline->addBack(ClientSerializeHandler()); + pipeline->finalize(); + + return pipeline; + } +}; + +// Client multiplex dispatcher. Uses Bonk.type as request ID +class BonkMultiplexClientDispatcher + : public ClientDispatcherBase { + public: + void read(Context*, Xtruct in) override { + auto search = requests_.find(in.i32_thing); + CHECK(search != requests_.end()); + auto p = std::move(search->second); + requests_.erase(in.i32_thing); + p.setValue(in); + } + + Future operator()(Bonk arg) override { + auto& p = requests_[arg.type]; + auto f = p.getFuture(); + p.setInterruptHandler([arg, this](const folly::exception_wrapper&) { + this->requests_.erase(arg.type); + }); + this->pipeline_->write(arg); + + return f; + } + + // Print some nice messages for close + + Future close() override { + printf("Channel closed\n"); + return ClientDispatcherBase::close(); + } + + Future close(Context* ctx) override { + printf("Channel closed\n"); + return ClientDispatcherBase::close(ctx); + } + + private: + std::unordered_map> requests_; +}; diff --git a/hbase-native-client/connection/hbase-RpcServer.h b/hbase-native-client/connection/hbase-RpcServer.h new file mode 100644 index 0000000..d0d6865 --- /dev/null +++ b/hbase-native-client/connection/hbase-RpcServer.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hbase-ServerSerializeHandler.h" + +using namespace folly; +using namespace wangle; + +using thrift::test::Bonk; +using thrift::test::Xtruct; + +using SerializePipeline = wangle::Pipeline; + +DEFINE_int32(port, 8080, "test server port"); + +class RpcService : public Service { + public: + Future operator()(Bonk request) override { + // Oh no, we got Bonked! Quick, Bonk back + printf("Bonk: %s, %i\n", request.message.c_str(), request.type); + + /* sleep override: ignore lint + * useful for testing dispatcher behavior by hand + */ + // Wait for a bit + return futures::sleep(std::chrono::seconds(request.type)) + .then([request]() { + Xtruct response; + response.string_thing = "Stop saying " + request.message + "!"; + response.i32_thing = request.type; + return response; + }); + } +}; + +class RpcPipelineFactory : public PipelineFactory { + public: + SerializePipeline::Ptr newPipeline( + std::shared_ptr sock) override { + auto pipeline = SerializePipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + // ensure we can write from any thread + pipeline->addBack(EventBaseHandler()); + pipeline->addBack(LengthFieldBasedFrameDecoder()); + pipeline->addBack(LengthFieldPrepender()); + pipeline->addBack(ServerSerializeHandler()); + // We could use a serial dispatcher instead easily + // pipeline->addBack(SerialServerDispatcher(&service_)); + // Or a Pipelined Dispatcher + // pipeline->addBack(PipelinedServerDispatcher(&service_)); + pipeline->addBack(MultiplexServerDispatcher(&service_)); + pipeline->finalize(); + + return pipeline; + } + + private: + ExecutorFilter service_{ + std::make_shared(10), + std::make_shared()}; +}; diff --git a/hbase-native-client/connection/hbase-ServerSerializeHandler.h b/hbase-native-client/connection/hbase-ServerSerializeHandler.h new file mode 100644 index 0000000..ec311ff --- /dev/null +++ b/hbase-native-client/connection/hbase-ServerSerializeHandler.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +#include "hbase-bridge.h" + +// Do some serialization / deserialization using thrift. +// A real rpc server would probably use generated client/server stubs +class ServerSerializeHandler : public wangle::Handler< + std::unique_ptr, thrift::test::Bonk, + thrift::test::Xtruct, std::unique_ptr> { + public: + void read(Context* ctx, std::unique_ptr msg) override { + thrift::test::Bonk received; + ser.deserialize(msg->moveToFbString(), &received); + ctx->fireRead(received); + } + + folly::Future write(Context* ctx, thrift::test::Xtruct b) + override { + std::string out; + ser.serialize(b, &out); + return ctx->fireWrite(folly::IOBuf::copyBuffer(out)); + } + + private: + apache::thrift::util::ThriftSerializerCompact<> ser; +}; diff --git a/hbase-native-client/connection/hbase-bridge.h b/hbase-native-client/connection/hbase-bridge.h new file mode 100644 index 0000000..bb19314 --- /dev/null +++ b/hbase-native-client/connection/hbase-bridge.h @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include "request.h" +#include "response.h" + +namespace thrift { +namespace test { + typedef hbase::Request Bonk; + typedef hbase::Response Xtruct; +} +} diff --git a/hbase-native-client/connection/hbase-rpc-test.cc b/hbase-native-client/connection/hbase-rpc-test.cc new file mode 100644 index 0000000..d0dc4a7 --- /dev/null +++ b/hbase-native-client/connection/hbase-rpc-test.cc @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "wangle/bootstrap/ClientBootstrap.h" +#include "wangle/channel/Handler.h" + +#include +#include +#include +#include +#include + +#include "hbase-RpcClient.h" +#include "hbase-RpcServer.h" + +using namespace wangle; +using namespace folly; + +TEST(WANGLE, RPCSERVER) { + + ServerBootstrap server; + server.childPipeline(std::make_shared()); + server.bind(FLAGS_port); + server.waitForStop(); + + /** + * For specific protocols, all the following code would be wrapped + * in a protocol-specific ServiceFactories. + * + * TODO: examples of ServiceFactoryFilters, for connection pooling, etc. + */ + ClientBootstrap client; + client.group(std::make_shared < wangle::IOThreadPoolExecutor > (1)); + client.pipelineFactory(std::make_shared()); + auto pipeline = client.connect(SocketAddress(FLAGS_host, FLAGS_port)).get(); + // A serial dispatcher would assert if we tried to send more than one + // request at a time + // SerialClientDispatcher service; + // Or we could use a pipelined dispatcher, but responses would always come + // back in order + // PipelinedClientDispatcher service; + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + + // Set an idle timeout of 5s using a filter. + ExpiringFilter < Bonk, Xtruct > service(dispatcher, std::chrono::seconds(5)); + try { + Bonk request; + request.message = "hello, hbase server"; + request.type = 123; + service(request).then([request](Xtruct response) { + ASSERT_TRUE(request.message == response.message); + std::cout << response.string_thing << std::endl; + }); + } catch (const std::exception& e) { + std::cout << exceptionStr(e) << std::endl; + } +} -- 2.10.1 (Apple Git-78)