From 8d3f5fbf997729710dca02052ad3b218cd09490d Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Mon, 17 Jul 2017 15:07:52 -0700 Subject: [PATCH] HBASE-18338. [C++] Implement RpcTestServer --- hbase-native-client/connection/BUCK | 13 ++++ hbase-native-client/connection/client-handler.cc | 21 ++++-- hbase-native-client/connection/client-handler.h | 7 +- hbase-native-client/connection/connection-id.h | 8 +- hbase-native-client/connection/pipeline.cc | 12 ++- .../connection/rpc-test-server-handler.cc | 77 +++++++++++++++++++ .../connection/rpc-test-server-handler.h | 47 ++++++++++++ hbase-native-client/connection/rpc-test-server.cc | 70 ++++++++++++++++++ hbase-native-client/connection/rpc-test-server.h | 50 +++++++++++++ hbase-native-client/connection/rpc-test.cc | 86 ++++++++++++++++++++++ hbase-native-client/connection/sasl-handler.h | 2 +- hbase-native-client/if/test.proto | 43 +++++++++++ hbase-native-client/if/test_rpc_service.proto | 35 +++++++++ hbase-native-client/serde/BUCK | 4 +- .../serde/client-deserializer-test.cc | 3 +- .../serde/client-serializer-test.cc | 2 +- hbase-native-client/serde/{rpc.cc => rpc-serde.cc} | 16 +++- hbase-native-client/serde/{rpc.h => rpc-serde.h} | 16 ++++ 18 files changed, 487 insertions(+), 25 deletions(-) create mode 100644 hbase-native-client/connection/rpc-test-server-handler.cc create mode 100644 hbase-native-client/connection/rpc-test-server-handler.h create mode 100644 hbase-native-client/connection/rpc-test-server.cc create mode 100644 hbase-native-client/connection/rpc-test-server.h create mode 100644 hbase-native-client/connection/rpc-test.cc create mode 100644 hbase-native-client/if/test.proto create mode 100644 hbase-native-client/if/test_rpc_service.proto rename hbase-native-client/serde/{rpc.cc => rpc-serde.cc} (94%) rename hbase-native-client/serde/{rpc.h => rpc-serde.h} (87%) diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c3119eb5f7..aaf8fdbe35 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -33,6 +33,8 @@ cxx_library( "service.h", "rpc-client.h", "sasl-util.h", + "rpc-test-server.h", + "rpc-test-server-handler.h", ], srcs=[ "client-dispatcher.cc", @@ -44,6 +46,8 @@ cxx_library( "rpc-client.cc", "sasl-handler.cc", "sasl-util.cc", + "rpc-test-server.cc", + "rpc-test-server-handler.cc", ], deps=[ "//if:if", @@ -68,3 +72,12 @@ cxx_test( deps=[ ":connection", ],) +cxx_test( + name="rpc-test", + srcs=[ + "rpc-test.cc", + ], + deps=[ + ":connection", + ], + run_test_separately=True,) diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 052c17184c..39227d3f64 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -35,9 +35,10 @@ using google::protobuf::Message; namespace hbase { ClientHandler::ClientHandler(std::string user_name, std::shared_ptr codec, - const std::string &server) + std::shared_ptr conf, const std::string &server) : user_name_(user_name), serde_(codec), + conf_(conf), server_(server), once_flag_(std::make_unique()), resp_msgs_( @@ -115,13 +116,17 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { } folly::Future ClientHandler::write(Context *ctx, std::unique_ptr r) { - // We need to send the header once. - // So use call_once to make sure that only one thread wins this. - std::call_once((*once_flag_), [ctx, this]() { - VLOG(3) << "Writing RPC Header to server: " << server_; - auto header = serde_.Header(user_name_); - ctx->fireWrite(std::move(header)); - }); + /* for RPC test, there's no need to send connection header */ + if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, + RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) { + // We need to send the header once. + // So use call_once to make sure that only one thread wins this. + std::call_once((*once_flag_), [ctx, this]() { + VLOG(3) << "Writing RPC Header to server: " << server_; + auto header = serde_.Header(user_name_); + ctx->fireWrite(std::move(header)); + }); + } VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_; diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 8de3a8bc52..b6f19a2617 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -26,9 +26,10 @@ #include #include +#include "core/configuration.h" #include "exceptions/exception.h" #include "serde/codec.h" -#include "serde/rpc.h" +#include "serde/rpc-serde.h" #include "utils/concurrent-map.h" // Forward decs. @@ -60,7 +61,8 @@ class ClientHandler * Create the handler * @param user_name the user name of the user running this process. */ - ClientHandler(std::string user_name, std::shared_ptr codec, const std::string &server); + ClientHandler(std::string user_name, std::shared_ptr codec, + std::shared_ptr conf, const std::string &server); /** * Get bytes from the wire. @@ -79,6 +81,7 @@ class ClientHandler std::string user_name_; RpcSerde serde_; std::string server_; // for logging + std::shared_ptr conf_; // in flight requests std::unique_ptr>> resp_msgs_; diff --git a/hbase-native-client/connection/connection-id.h b/hbase-native-client/connection/connection-id.h index 059469f843..4f84bf8ae2 100644 --- a/hbase-native-client/connection/connection-id.h +++ b/hbase-native-client/connection/connection-id.h @@ -39,9 +39,11 @@ class ConnectionId { const std::string &service_name) : user_(user), service_name_(service_name), host_(host), port_(port) {} - ConnectionId(const std::string &host, uint16_t port, - const std::string &service_name) - : user_(security::User::defaultUser()), service_name_(service_name), host_(host), port_(port) {} + ConnectionId(const std::string &host, uint16_t port, const std::string &service_name) + : user_(security::User::defaultUser()), + service_name_(service_name), + host_(host), + port_(port) {} virtual ~ConnectionId() = default; diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc index 284475287b..9c790b6c3f 100644 --- a/hbase-native-client/connection/pipeline.cc +++ b/hbase-native-client/connection/pipeline.cc @@ -32,7 +32,6 @@ namespace hbase { RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr codec, std::shared_ptr conf) : user_util_(), codec_(codec), conf_(conf) {} - SerializePipeline::Ptr RpcPipelineFactory::newPipeline( std::shared_ptr sock) { folly::SocketAddress addr; // for logging @@ -41,10 +40,15 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline( auto pipeline = SerializePipeline::create(); pipeline->addBack(wangle::AsyncSocketHandler{sock}); pipeline->addBack(wangle::EventBaseHandler{}); - auto secure = security::User::IsSecurityEnabled(*conf_); - pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_}); + bool secure = false; + /* for RPC test, there's no need to setup Sasl */ + if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, + RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) { + secure = security::User::IsSecurityEnabled(*conf_); + pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_}); + } pipeline->addBack(wangle::LengthFieldBasedFrameDecoder{}); - pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, addr.describe()}); + pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, conf_, addr.describe()}); pipeline->finalize(); return pipeline; } diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc new file mode 100644 index 0000000000..7d2f407d55 --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server-handler.cc @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "connection/rpc-test-server-handler.h" +#include "if/RPC.pb.h" +#include "if/test.pb.h" + +namespace hbase { + +void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr buf) { + buf->coalesce(); + pb::RequestHeader header; + + int used_bytes = serde_.ParseDelimited(buf.get(), &header); + VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id(); + + auto received = CreateReceivedRequest(header.method_name()); + + buf->trimStart(used_bytes); + if (header.has_request_param() && received != nullptr) { + used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get()); + VLOG(3) << "Read RPCRequest, buf length:" << buf->length() + << ", header PB length:" << used_bytes; + received->set_call_id(header.call_id()); + } + + if (received != nullptr) { + ctx->fireRead(std::move(received)); + } +} + +folly::Future RpcTestServerSerializeHandler::write(Context* ctx, + std::unique_ptr r) { + VLOG(3) << "Writing RPC Request"; + // Send the data down the pipeline. + return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get())); +} + +std::unique_ptr RpcTestServerSerializeHandler::CreateReceivedRequest( + const std::string& method_name) { + std::unique_ptr result = nullptr; + ; + if (method_name == "ping") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); + } else if (method_name == "echo") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); + } else if (method_name == "error") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); + } else if (method_name == "pause") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); + } else if (method_name == "addr") { + result = std::make_unique(std::make_shared(), + std::make_shared(), method_name); + } + return result; +} +} // end of namespace hbase diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h new file mode 100644 index 0000000000..4c84615baa --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server-handler.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +#include "connection/request.h" +#include "connection/response.h" +#include "serde/rpc-serde.h" + +using namespace hbase; + +namespace hbase { +// A real rpc server would probably use generated client/server stubs +class RpcTestServerSerializeHandler + : public wangle::Handler, std::unique_ptr, + std::unique_ptr, std::unique_ptr> { + public: + RpcTestServerSerializeHandler() : serde_() {} + + void read(Context* ctx, std::unique_ptr buf) override; + + folly::Future write(Context* ctx, std::unique_ptr r) override; + + private: + std::unique_ptr CreateReceivedRequest(const std::string& method_name); + + private: + hbase::RpcSerde serde_; +}; +} // end of namespace hbase diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc new file mode 100644 index 0000000000..d3a30b104c --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include +#include +#include +#include + +#include "connection/rpc-test-server-handler.h" +#include "connection/rpc-test-server.h" +#include "if/test.pb.h" + +namespace hbase { + +RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline( + std::shared_ptr sock) { + auto pipeline = RpcTestServerSerializePipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + // ensure we can write from any thread + pipeline->addBack(EventBaseHandler()); + pipeline->addBack(LengthFieldBasedFrameDecoder()); + pipeline->addBack(RpcTestServerSerializeHandler()); + pipeline->addBack( + MultiplexServerDispatcher, std::unique_ptr>(&service_)); + pipeline->finalize(); + + return pipeline; +} + +Future> RpcTestService::operator()(std::unique_ptr request) { + /* build Response */ + auto response = std::make_unique(); + response->set_call_id(request->call_id()); + std::string method_name = request->method(); + + if (method_name == "ping") { + auto pb_resp_msg = std::make_shared(); + response->set_resp_msg(pb_resp_msg); + } else if (method_name == "echo") { + auto pb_resp_msg = std::make_shared(); + auto pb_req_msg = std::static_pointer_cast(request->req_msg()); + pb_resp_msg->set_message(pb_req_msg->message()); + response->set_resp_msg(pb_resp_msg); + } else if (method_name == "error") { + // TODO: + } else if (method_name == "pause") { + // TODO: + } else if (method_name == "addr") { + // TODO: + } + + return folly::makeFuture>(std::move(response)); +} +} // namespace hbase diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h new file mode 100644 index 0000000000..c3225ff573 --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once +#include +#include +#include + +#include "connection/request.h" +#include "connection/response.h" + +using namespace hbase; +using namespace folly; +using namespace wangle; + +namespace hbase { +using RpcTestServerSerializePipeline = wangle::Pipeline>; + +class RpcTestService : public Service, std::unique_ptr> { + public: + RpcTestService() {} + virtual ~RpcTestService() = default; + Future> operator()(std::unique_ptr request) override; +}; + +class RpcTestServerPipelineFactory : public PipelineFactory { + public: + RpcTestServerSerializePipeline::Ptr newPipeline( + std::shared_ptr sock) override; + + private: + ExecutorFilter, std::unique_ptr> service_{ + std::make_shared(1), std::make_shared()}; +}; +} // end of namespace hbase diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc new file mode 100644 index 0000000000..d4cd89f0a4 --- /dev/null +++ b/hbase-native-client/connection/rpc-test.cc @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "if/test.pb.h" +#include "rpc-test-server.h" +#include "security/user.h" +#include "serde/rpc-serde.h" + +using namespace wangle; +using namespace folly; +using namespace hbase; + +DEFINE_int32(port, 0, "test server port"); + +TEST(RpcTestServer, echo) { + /* create conf */ + auto conf = std::make_shared(); + conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true"); + + /* create rpc test server */ + auto server = std::make_shared>(); + server->childPipeline(std::make_shared()); + server->bind(FLAGS_port); + folly::SocketAddress server_addr; + server->getSockets()[0]->getAddress(&server_addr); + + /* create RpcClient */ + auto io_executor = std::make_shared(1); + + auto rpc_client = std::make_shared(io_executor, nullptr, conf); + + /** + * test echo + */ + try { + std::string greetings = "hello, hbase server!"; + auto request = std::make_unique(std::make_shared(), + std::make_shared(), "echo"); + auto pb_msg = std::static_pointer_cast(request->req_msg()); + pb_msg->set_message(greetings); + + /* sending out request */ + rpc_client + ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([=](std::unique_ptr response) { + auto pb_resp = std::static_pointer_cast(response->resp_msg()); + VLOG(1) << "message returned: " + pb_resp->message(); + EXPECT_EQ(greetings, pb_resp->message()); + }); + } catch (const std::exception& e) { + throw e; + } + + server->stop(); + server->join(); +} diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h index f606a23ccc..81f4e814e3 100644 --- a/hbase-native-client/connection/sasl-handler.h +++ b/hbase-native-client/connection/sasl-handler.h @@ -30,7 +30,7 @@ #include "connection/sasl-util.h" #include "connection/service.h" #include "security/user.h" -#include "serde/rpc.h" +#include "serde/rpc-serde.h" namespace hbase { diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto new file mode 100644 index 0000000000..72b68e9aae --- /dev/null +++ b/hbase-native-client/if/test.proto @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated"; +option java_outer_classname = "TestProtos"; +option java_generate_equals_and_hash = true; + +message EmptyRequestProto { +} + +message EmptyResponseProto { +} + +message EchoRequestProto { + required string message = 1; +} + +message EchoResponseProto { + required string message = 1; +} + +message PauseRequestProto { + required uint32 ms = 1; +} + +message AddrResponseProto { + required string addr = 1; +} diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto new file mode 100644 index 0000000000..5f91dc4df4 --- /dev/null +++ b/hbase-native-client/if/test_rpc_service.proto @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated"; +option java_outer_classname = "TestRpcServiceProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +import "test.proto"; + + +/** + * A protobuf service for use in tests + */ +service TestProtobufRpcProto { + rpc ping(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo(EchoRequestProto) returns (EchoResponseProto); + rpc error(EmptyRequestProto) returns (EmptyResponseProto); + rpc pause(PauseRequestProto) returns (EmptyResponseProto); + rpc addr(EmptyRequestProto) returns (AddrResponseProto); +} diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK index 18e949c7cb..a765884af1 100644 --- a/hbase-native-client/serde/BUCK +++ b/hbase-native-client/serde/BUCK @@ -22,13 +22,13 @@ cxx_library( "cell-outputstream.h", "codec.h", "region-info.h", - "rpc.h", + "rpc-serde.h", "server-name.h", "table-name.h", "zk.h", ], srcs=[ - "rpc.cc", + "rpc-serde.cc", "zk.cc", ], deps=[ diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc index 054684d2f2..1856047fbd 100644 --- a/hbase-native-client/serde/client-deserializer-test.cc +++ b/hbase-native-client/serde/client-deserializer-test.cc @@ -16,12 +16,11 @@ * limitations under the License. * */ -#include "serde/rpc.h" - #include #include #include "if/Client.pb.h" +#include "rpc-serde.h" using namespace hbase; using folly::IOBuf; diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc index 7d8b29ca1a..306f2c27f6 100644 --- a/hbase-native-client/serde/client-serializer-test.cc +++ b/hbase-native-client/serde/client-serializer-test.cc @@ -24,7 +24,7 @@ #include "if/HBase.pb.h" #include "if/RPC.pb.h" -#include "serde/rpc.h" +#include "rpc-serde.h" using namespace hbase; using namespace hbase::pb; diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc-serde.cc similarity index 94% rename from hbase-native-client/serde/rpc.cc rename to hbase-native-client/serde/rpc-serde.cc index 957a317378..9e1f79a34c 100644 --- a/hbase-native-client/serde/rpc.cc +++ b/hbase-native-client/serde/rpc-serde.cc @@ -17,8 +17,6 @@ * */ -#include "serde/rpc.h" - #include #include #include @@ -30,6 +28,7 @@ #include #include "if/RPC.pb.h" +#include "rpc-serde.h" #include "utils/version.h" using folly::IOBuf; @@ -83,6 +82,8 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { return coded_stream.CurrentPosition(); } +RpcSerde::RpcSerde() {} + RpcSerde::RpcSerde(std::shared_ptr codec) : codec_(codec) {} std::unique_ptr RpcSerde::Preamble(bool secure) { @@ -162,6 +163,17 @@ std::unique_ptr RpcSerde::Request(const uint32_t call_id, const std::stri return PrependLength(std::move(ser_header)); } +std::unique_ptr RpcSerde::Response(const uint32_t call_id, + const google::protobuf::Message *msg) { + pb::ResponseHeader rh; + rh.set_call_id(call_id); + auto ser_header = SerializeDelimited(rh); + auto ser_resp = SerializeDelimited(*msg); + ser_header->appendChain(std::move(ser_resp)); + + return PrependLength(std::move(ser_header)); +} + std::unique_ptr RpcSerde::CreateCellScanner(std::unique_ptr buf, uint32_t offset, uint32_t length) { if (codec_ == nullptr) { diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc-serde.h similarity index 87% rename from hbase-native-client/serde/rpc.h rename to hbase-native-client/serde/rpc-serde.h index 15aa1eed37..0e1d44e8e3 100644 --- a/hbase-native-client/serde/rpc.h +++ b/hbase-native-client/serde/rpc-serde.h @@ -45,6 +45,7 @@ namespace hbase { */ class RpcSerde { public: + RpcSerde(); /** * Constructor assumes the default auth type. */ @@ -98,6 +99,17 @@ class RpcSerde { const google::protobuf::Message *msg); /** + * Serialize a response message into a protobuf. + * Request consists of: + * + * - Big endian length + * - ResponseHeader object + * - The passed in Message object + */ + std::unique_ptr Response(const uint32_t call_id, + const google::protobuf::Message *msg); + + /** * Serialize a message in the delimited format. * Delimited format consists of the following: * @@ -117,6 +129,10 @@ class RpcSerde { */ std::unique_ptr PrependLength(std::unique_ptr msg); + public: + static constexpr const char *HBASE_CLIENT_RPC_TEST_MODE = "hbase.client.rpc.test.mode"; + static constexpr const bool DEFAULT_HBASE_CLIENT_RPC_TEST_MODE = false; + private: /* data */ std::shared_ptr codec_; -- 2.11.0 (Apple Git-81)