From 593f7c9ff44fd8c244ee66922c1f91e1a24e3a5c Mon Sep 17 00:00:00 2001 From: Xiaobing Zhou Date: Mon, 10 Jul 2017 15:05:14 -0700 Subject: [PATCH] HBASE-18338. [C++] Implement RpcTestServer --- hbase-native-client/connection/BUCK | 10 ++ hbase-native-client/connection/client-handler.h | 2 +- .../connection/hbase-ClientSerializeHandler.h | 120 +++++++++++++++++++++ hbase-native-client/connection/hbase-RpcClient.h | 118 ++++++++++++++++++++ hbase-native-client/connection/hbase-RpcServer.h | 80 ++++++++++++++ .../connection/hbase-ServerSerializeHandler.h | 95 ++++++++++++++++ hbase-native-client/connection/hbase-bridge.h | 29 +++++ hbase-native-client/connection/hbase-rpc-test.cc | 87 +++++++++++++++ hbase-native-client/connection/sasl-handler.h | 2 +- hbase-native-client/if/test.proto | 43 ++++++++ hbase-native-client/if/test_rpc_service.proto | 36 +++++++ hbase-native-client/serde/BUCK | 4 +- .../serde/client-deserializer-test.cc | 3 +- .../serde/client-serializer-test.cc | 2 +- hbase-native-client/serde/{rpc.cc => rpc-serde.cc} | 16 ++- hbase-native-client/serde/{rpc.h => rpc-serde.h} | 12 +++ 16 files changed, 650 insertions(+), 9 deletions(-) create mode 100644 hbase-native-client/connection/hbase-ClientSerializeHandler.h create mode 100644 hbase-native-client/connection/hbase-RpcClient.h create mode 100644 hbase-native-client/connection/hbase-RpcServer.h create mode 100644 hbase-native-client/connection/hbase-ServerSerializeHandler.h create mode 100644 hbase-native-client/connection/hbase-bridge.h create mode 100644 hbase-native-client/connection/hbase-rpc-test.cc create mode 100644 hbase-native-client/if/test.proto create mode 100644 hbase-native-client/if/test_rpc_service.proto rename hbase-native-client/serde/{rpc.cc => rpc-serde.cc} (94%) rename hbase-native-client/serde/{rpc.h => rpc-serde.h} (91%) diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c3119eb5f7..7f1a2e4125 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -33,6 +33,11 @@ cxx_library( "service.h", "rpc-client.h", "sasl-util.h", + "hbase-RpcClient.h", + "hbase-RpcServer.h", + "hbase-ClientSerializeHandler.h", + "hbase-ServerSerializeHandler.h", + "hbase-bridge.h", ], srcs=[ "client-dispatcher.cc", @@ -68,3 +73,8 @@ cxx_test( deps=[ ":connection", ],) +cxx_test( + name="hbase-rpc-test", + srcs=["hbase-rpc-test.cc",], + deps=[":connection",], + run_test_separately=True,) \ No newline at end of file diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 8de3a8bc52..8d2afba35c 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -26,9 +26,9 @@ #include #include +#include "serde/rpc-serde.h" #include "exceptions/exception.h" #include "serde/codec.h" -#include "serde/rpc.h" #include "utils/concurrent-map.h" // Forward decs. diff --git a/hbase-native-client/connection/hbase-ClientSerializeHandler.h b/hbase-native-client/connection/hbase-ClientSerializeHandler.h new file mode 100644 index 0000000000..e8870f01c0 --- /dev/null +++ b/hbase-native-client/connection/hbase-ClientSerializeHandler.h @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include "connection/request.h" +#include "connection/response.h" +#include "../serde/rpc-serde.h" +#include "utils/concurrent-map.h" +#include "if/RPC.pb.h" + +using namespace hbase; +using google::protobuf::Message; + +// A real rpc server would probably use generated client/server stubs +class ClientSerializeHandler: public wangle::Handler< + std::unique_ptr, std::unique_ptr, + std::unique_ptr, std::unique_ptr> { +public: + ClientSerializeHandler(std::string user_name, const std::string &server) + : user_name_(user_name), + serde_(), + server_(server), + once_flag_(std::make_unique()), + resp_msgs_( + std::make_unique>>( + 5000)) { + } + + void read(Context* ctx, std::unique_ptr buf) override { + if (LIKELY(buf != nullptr)) { + buf->coalesce(); + auto received = std::make_unique(); + pb::ResponseHeader header; + + int used_bytes = serde_.ParseDelimited(buf.get(), &header); + VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() + << " has_exception=" << header.has_exception() << ", server: " << server_; + + auto resp_msg = resp_msgs_->find_and_erase(header.call_id()); + + // set the call_id. + // This will be used to by the dispatcher to match up + // the promise with the response. + received->set_call_id(header.call_id()); + + // If there was an exception then there's no + // data left on the wire. + if (header.has_exception() == false) { + buf->trimStart(used_bytes); + + int cell_block_length = 0; + used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get()); + if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) { + cell_block_length = header.cell_block_meta().length(); + } + + VLOG(3) << "Read RPCResponse, buf length:" << buf->length() + << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length + << ", server: " << server_; + + // Make sure that bytes were parsed. + CHECK((used_bytes + cell_block_length) == buf->length()); + + if (cell_block_length > 0) { + auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length); + received->set_cell_scanner(std::shared_ptr{cell_scanner.release()}); + } + + received->set_resp_msg(resp_msg); + } + ctx->fireRead(std::move(received)); + } + } + + folly::Future write(Context* ctx, std::unique_ptr r) override { + // We need to send the header once. + // So use call_once to make sure that only one thread wins this. + std::call_once((*once_flag_), [ctx, this]() { + VLOG(3) << "Writing RPC Header to server: " << server_; + auto header = serde_.Header(user_name_); + ctx->fireWrite(std::move(header)); + }); + + VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " + << server_; + + // Now store the call id to response. + resp_msgs_->insert(std::make_pair(r->call_id(), r->resp_msg())); + + // Send the data down the pipeline. + return ctx->fireWrite( + serde_.Request(r->call_id(), r->method(), r->req_msg().get())); + } + +private: + std::unique_ptr once_flag_; + std::string user_name_; + RpcSerde serde_; + std::string server_; // for logging + // in flight requests + std::unique_ptr< + concurrent_map>> resp_msgs_; +}; diff --git a/hbase-native-client/connection/hbase-RpcClient.h b/hbase-native-client/connection/hbase-RpcClient.h new file mode 100644 index 0000000000..9137a3e8e1 --- /dev/null +++ b/hbase-native-client/connection/hbase-RpcClient.h @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hbase-ClientSerializeHandler.h" + +using namespace folly; +using namespace wangle; + +#include "utils/user-util.h" + +using ClientSerializePipeline = wangle::Pipeline>; + +// DEFINE_int32(port, 8080, "test server port"); +// DEFINE_string(host, "::1", "test server address"); + +class ClientRpcPipelineFactory : public PipelineFactory { + public: + ClientSerializePipeline::Ptr newPipeline( + std::shared_ptr sock) override { + folly::SocketAddress addr; // for logging + sock->getPeerAddress(&addr); + + auto pipeline = ClientSerializePipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + // ensure we can write from any thread + pipeline->addBack(EventBaseHandler()); + pipeline->addBack(LengthFieldBasedFrameDecoder()); + pipeline->addBack(LengthFieldPrepender()); + pipeline->addBack(ClientSerializeHandler{user_util_.user_name(), addr.describe()}); + pipeline->finalize(); + + return pipeline; + } + + private: + UserUtil user_util_{}; +}; + +class BonkMultiplexClientDispatcher + : public ClientDispatcherBase, + std::unique_ptr> { + public: + BonkMultiplexClientDispatcher() : + current_call_id_(9), requests_(5000) { + } + + void read(Context *ctx, std::unique_ptr in) override { + auto call_id = in->call_id(); + auto p = requests_.find_and_erase(call_id); + + if (in->exception()) { + p.setException(in->exception()); + } else { + p.setValue(std::move(in)); + } + } + + Future> operator()(std::unique_ptr arg) override { + auto call_id = current_call_id_++; + arg->set_call_id(call_id); + + // TODO: if the map is full (or we have more than hbase.client.perserver.requests.threshold) + // then throw ServerTooBusyException so that upper layers will retry. + auto &p = requests_[call_id]; + + auto f = p.getFuture(); + p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { + LOG(ERROR) << "e = " << call_id; + this->requests_.erase(call_id); + }); + this->pipeline_->write(std::move(arg)); + + return f; + } + + // Print some nice messages for close + Future close() override { + printf("Channel closed\n"); + return ClientDispatcherBase::close(); + } + + Future close(Context* ctx) override { + printf("Channel closed\n"); + return ClientDispatcherBase::close(ctx); + } + + private: + concurrent_map>> requests_; + std::atomic current_call_id_; +}; diff --git a/hbase-native-client/connection/hbase-RpcServer.h b/hbase-native-client/connection/hbase-RpcServer.h new file mode 100644 index 0000000000..855743cb4a --- /dev/null +++ b/hbase-native-client/connection/hbase-RpcServer.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hbase-ServerSerializeHandler.h" + +using namespace folly; +using namespace wangle; + +using ServerSerializePipeline = wangle::Pipeline>; + +// DEFINE_int32(port, 8080, "test server port"); + +class RpcService : public Service, std::unique_ptr> { + public: + Future> operator()(std::unique_ptr request) override { + + /* build Response */ + auto response = std::make_unique(); + response->set_call_id(request->call_id()); + + // TODO: use the XXXRequestProto based on method name + /* build EchoResponseProto and link it to Response */ + auto pb_resp_msg = std::make_shared(); + auto pb_req_msg = std::static_pointer_cast(request->req_msg()); + pb_resp_msg->set_message(pb_req_msg->message()); + response->set_resp_msg(pb_resp_msg); + return std::move(response); + } +}; + +class ServerRpcPipelineFactory : public PipelineFactory { + public: + ServerSerializePipeline::Ptr newPipeline( + std::shared_ptr sock) override { + auto pipeline = ServerSerializePipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + // ensure we can write from any thread + pipeline->addBack(EventBaseHandler()); + pipeline->addBack(LengthFieldBasedFrameDecoder()); + pipeline->addBack(LengthFieldPrepender()); + pipeline->addBack(ServerSerializeHandler()); + pipeline->addBack(MultiplexServerDispatcher, std::unique_ptr>(&service_)); + pipeline->finalize(); + + return pipeline; + } + + private: + ExecutorFilter, std::unique_ptr> service_{ + std::make_shared(10), + std::make_shared()}; +}; diff --git a/hbase-native-client/connection/hbase-ServerSerializeHandler.h b/hbase-native-client/connection/hbase-ServerSerializeHandler.h new file mode 100644 index 0000000000..78db9a32e6 --- /dev/null +++ b/hbase-native-client/connection/hbase-ServerSerializeHandler.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +#include "hbase-bridge.h" +#include "connection/request.h" +#include "connection/response.h" +#include "../serde/rpc-serde.h" +#include "if/RPC.pb.h" +#include "if/test.pb.h" + +using namespace hbase; + +// A real rpc server would probably use generated client/server stubs +class ServerSerializeHandler : public wangle::Handler< + std::unique_ptr, std::unique_ptr, + std::unique_ptr, std::unique_ptr> { + public: + ServerSerializeHandler() : + serde_(), + once_flag_(std::make_unique()) { + } + void read(Context* ctx, std::unique_ptr buf) override { + buf->coalesce(); + pb::RequestHeader header; + + int used_bytes = serde_.ParseDelimited(buf.get(), &header); + VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" + << header.call_id(); + + auto received = CreateReceivedRequest(header.method_name()); + + buf->trimStart(used_bytes); + if (header.has_request_param() && received != nullptr) { + used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get()); + VLOG(3) << "Read RPCRequest, buf length:" << buf->length() + << ", header PB length:" << used_bytes; + received->set_call_id(header.call_id()); + } + + if (received != nullptr) { + ctx->fireRead(std::move(received)); + } + } + + folly::Future write(Context* ctx, std::unique_ptr r) + override { + + // We need to send the header once. + // So use call_once to make sure that only one thread wins this. + std::call_once((*once_flag_), [ctx, this]() { + VLOG(3) << "Writing RPC Header to client"; + auto header = serde_.Header(""); + ctx->fireWrite(std::move(header)); + }); + + VLOG(3) << "Writing RPC Request"; + // Send the data down the pipeline. + return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get())); + } + + private: + std::unique_ptr CreateReceivedRequest(const std::string &method_name) { + // TODO: use the XXXRequestProto based on method name + std::unique_ptr result = nullptr;; + if (method_name == "Echo") { + result = std::make_unique( + std::make_shared(), + std::make_shared(), + method_name); + } + return result; + } + private: + hbase::RpcSerde serde_; + std::unique_ptr once_flag_; +}; diff --git a/hbase-native-client/connection/hbase-bridge.h b/hbase-native-client/connection/hbase-bridge.h new file mode 100644 index 0000000000..bb19314a87 --- /dev/null +++ b/hbase-native-client/connection/hbase-bridge.h @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include "request.h" +#include "response.h" + +namespace thrift { +namespace test { + typedef hbase::Request Bonk; + typedef hbase::Response Xtruct; +} +} diff --git a/hbase-native-client/connection/hbase-rpc-test.cc b/hbase-native-client/connection/hbase-rpc-test.cc new file mode 100644 index 0000000000..fa9567f8f2 --- /dev/null +++ b/hbase-native-client/connection/hbase-rpc-test.cc @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "wangle/bootstrap/ClientBootstrap.h" +#include "wangle/channel/Handler.h" + +#include +#include +#include +#include +#include +#include + +#include "hbase-RpcClient.h" +#include "hbase-RpcServer.h" +#include "if/test.pb.h" + + +using namespace wangle; +using namespace folly; + +DEFINE_int32(port, 8080, "test server port"); +DEFINE_string(host, "::1", "test server address"); + +TEST(WANGLE, RPCSERVER) { + + ServerBootstrap server; + server.childPipeline(std::make_shared()); + server.bind(FLAGS_port); + // server.waitForStop(); + + /** + * For specific protocols, all the following code would be wrapped + * in a protocol-specific ServiceFactories. + * + * TODO: examples of ServiceFactoryFilters, for connection pooling, etc. + */ + ClientBootstrap client; + client.group(std::make_shared < wangle::IOThreadPoolExecutor > (1)); + client.pipelineFactory(std::make_shared()); + auto pipeline = client.connect(SocketAddress(FLAGS_host, FLAGS_port)).get(); + + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + + // Set an idle timeout of 5s using a filter. + ExpiringFilter , std::unique_ptr> + service(dispatcher, std::chrono::seconds(5)); + try { + std::string greetings = "hello, hbase server!"; + auto request = std::make_unique( + std::make_shared(), + std::make_shared(), "Echo"); + auto pb_msg = std::static_pointer_cast(request->req_msg()); + pb_msg->set_message(greetings); + + /* sending out request */ + service(std::move(request)).then( + [&greetings](std::unique_ptr response) { + VLOG(3) << "response back..."; + // ASSERT_TRUE(request->call_id() == response->call_id()); + auto pb_resp = std::static_pointer_cast(response->resp_msg()); + EXPECT_EQ(greetings, pb_resp->message()); + }); + } catch (const std::exception& e) { + std::cout << exceptionStr(e) << std::endl; + } + + server.stop(); + server.join(); +} diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h index f606a23ccc..0a3b8b15ee 100644 --- a/hbase-native-client/connection/sasl-handler.h +++ b/hbase-native-client/connection/sasl-handler.h @@ -27,10 +27,10 @@ #include #include +#include "serde/rpc-serde.h" #include "connection/sasl-util.h" #include "connection/service.h" #include "security/user.h" -#include "serde/rpc.h" namespace hbase { diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto new file mode 100644 index 0000000000..647fafe6c0 --- /dev/null +++ b/hbase-native-client/if/test.proto @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.hbase.shaded.ipc.protobuf.generated"; +option java_outer_classname = "TestProtos"; +option java_generate_equals_and_hash = true; + +message EmptyRequestProto { +} + +message EmptyResponseProto { +} + +message EchoRequestProto { + required string message = 1; +} + +message EchoResponseProto { + required string message = 1; +} + +message PauseRequestProto { + required uint32 ms = 1; +} + +message AddrResponseProto { + required string addr = 1; +} diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto new file mode 100644 index 0000000000..93cf328a70 --- /dev/null +++ b/hbase-native-client/if/test_rpc_service.proto @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.hbase.shaded.ipc.protobuf.generated"; +option java_outer_classname = "TestRpcServiceProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option cc_generic_services = true; + +import "test.proto"; + + +/** + * A protobuf service for use in tests + */ +service TestProtobufRpcProto { + rpc ping(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo(EchoRequestProto) returns (EchoResponseProto); + rpc error(EmptyRequestProto) returns (EmptyResponseProto); + rpc pause(PauseRequestProto) returns (EmptyResponseProto); + rpc addr(EmptyRequestProto) returns (AddrResponseProto); +} diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK index 18e949c7cb..a765884af1 100644 --- a/hbase-native-client/serde/BUCK +++ b/hbase-native-client/serde/BUCK @@ -22,13 +22,13 @@ cxx_library( "cell-outputstream.h", "codec.h", "region-info.h", - "rpc.h", + "rpc-serde.h", "server-name.h", "table-name.h", "zk.h", ], srcs=[ - "rpc.cc", + "rpc-serde.cc", "zk.cc", ], deps=[ diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc index 054684d2f2..1856047fbd 100644 --- a/hbase-native-client/serde/client-deserializer-test.cc +++ b/hbase-native-client/serde/client-deserializer-test.cc @@ -16,12 +16,11 @@ * limitations under the License. * */ -#include "serde/rpc.h" - #include #include #include "if/Client.pb.h" +#include "rpc-serde.h" using namespace hbase; using folly::IOBuf; diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc index 7d8b29ca1a..306f2c27f6 100644 --- a/hbase-native-client/serde/client-serializer-test.cc +++ b/hbase-native-client/serde/client-serializer-test.cc @@ -24,7 +24,7 @@ #include "if/HBase.pb.h" #include "if/RPC.pb.h" -#include "serde/rpc.h" +#include "rpc-serde.h" using namespace hbase; using namespace hbase::pb; diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc-serde.cc similarity index 94% rename from hbase-native-client/serde/rpc.cc rename to hbase-native-client/serde/rpc-serde.cc index 957a317378..9e1f79a34c 100644 --- a/hbase-native-client/serde/rpc.cc +++ b/hbase-native-client/serde/rpc-serde.cc @@ -17,8 +17,6 @@ * */ -#include "serde/rpc.h" - #include #include #include @@ -30,6 +28,7 @@ #include #include "if/RPC.pb.h" +#include "rpc-serde.h" #include "utils/version.h" using folly::IOBuf; @@ -83,6 +82,8 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { return coded_stream.CurrentPosition(); } +RpcSerde::RpcSerde() {} + RpcSerde::RpcSerde(std::shared_ptr codec) : codec_(codec) {} std::unique_ptr RpcSerde::Preamble(bool secure) { @@ -162,6 +163,17 @@ std::unique_ptr RpcSerde::Request(const uint32_t call_id, const std::stri return PrependLength(std::move(ser_header)); } +std::unique_ptr RpcSerde::Response(const uint32_t call_id, + const google::protobuf::Message *msg) { + pb::ResponseHeader rh; + rh.set_call_id(call_id); + auto ser_header = SerializeDelimited(rh); + auto ser_resp = SerializeDelimited(*msg); + ser_header->appendChain(std::move(ser_resp)); + + return PrependLength(std::move(ser_header)); +} + std::unique_ptr RpcSerde::CreateCellScanner(std::unique_ptr buf, uint32_t offset, uint32_t length) { if (codec_ == nullptr) { diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc-serde.h similarity index 91% rename from hbase-native-client/serde/rpc.h rename to hbase-native-client/serde/rpc-serde.h index 15aa1eed37..f91a4841bf 100644 --- a/hbase-native-client/serde/rpc.h +++ b/hbase-native-client/serde/rpc-serde.h @@ -45,6 +45,7 @@ namespace hbase { */ class RpcSerde { public: + RpcSerde(); /** * Constructor assumes the default auth type. */ @@ -98,6 +99,17 @@ class RpcSerde { const google::protobuf::Message *msg); /** + * Serialize a response message into a protobuf. + * Request consists of: + * + * - Big endian length + * - ResponseHeader object + * - The passed in Message object + */ + std::unique_ptr Response(const uint32_t call_id, + const google::protobuf::Message *msg); + + /** * Serialize a message in the delimited format. * Delimited format consists of the following: * -- 2.11.0 (Apple Git-81)