From 38df3550de7e37165d8fff7a917cd2973fd6b1bb Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 12 Apr 2016 12:03:52 -0700 Subject: [PATCH] HBASE-15620 Add on Call serialization Summary: Add on delimited serialization so that request headers and request payloads can be serialized. Test Plan: Add a unit test. Differential Revision: https://reviews.facebook.net/D56757 --- hbase-native-client/core/BUCK | 1 + hbase-native-client/core/client-dispatcher.cc | 4 +- hbase-native-client/core/client-dispatcher.h | 10 +- .../core/client-serialize-handler.cc | 111 +++++++---------- .../core/client-serialize-handler.h | 25 ++-- hbase-native-client/core/client.h | 2 +- hbase-native-client/core/connection-factory.cc | 17 ++- hbase-native-client/core/connection-factory.h | 4 +- hbase-native-client/core/get-request.h | 2 +- hbase-native-client/core/get-result.h | 2 +- hbase-native-client/core/location-cache.cc | 3 +- hbase-native-client/core/location-cache.h | 2 +- hbase-native-client/core/pipeline.h | 4 +- hbase-native-client/core/request.h | 12 +- hbase-native-client/core/response.h | 14 ++- hbase-native-client/core/service.h | 2 +- hbase-native-client/core/simple-client.cc | 36 +++++- hbase-native-client/core/table-name.h | 2 +- hbase-native-client/serde/BUCK | 54 ++++++++ .../serde/client-deserializer-test.cc | 67 ++++++++++ hbase-native-client/serde/client-deserializer.cc | 71 +++++++++++ hbase-native-client/serde/client-deserializer.h | 36 ++++++ .../serde/client-serializer-test.cc | 56 +++++++++ hbase-native-client/serde/client-serializer.cc | 138 +++++++++++++++++++++ hbase-native-client/serde/client-serializer.h | 54 ++++++++ 25 files changed, 616 insertions(+), 113 deletions(-) create mode 100644 hbase-native-client/serde/BUCK create mode 100644 hbase-native-client/serde/client-deserializer-test.cc create mode 100644 hbase-native-client/serde/client-deserializer.cc create mode 100644 hbase-native-client/serde/client-deserializer.h create mode 100644 hbase-native-client/serde/client-serializer-test.cc create mode 100644 hbase-native-client/serde/client-serializer.cc create mode 100644 hbase-native-client/serde/client-serializer.h diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 2b00d66..af4db04 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -59,6 +59,7 @@ cxx_library(name="core", ], deps=[ "//if:if", + "//serde:serde", "//third-party:folly", "//third-party:wangle", "//third-party:zookeeper_mt", diff --git a/hbase-native-client/core/client-dispatcher.cc b/hbase-native-client/core/client-dispatcher.cc index d356759..a8a6c4a 100644 --- a/hbase-native-client/core/client-dispatcher.cc +++ b/hbase-native-client/core/client-dispatcher.cc @@ -27,6 +27,7 @@ void ClientDispatcher::read(Context *ctx, Response in) { auto search = requests_.find(call_id); CHECK(search != requests_.end()); auto p = std::move(search->second); + requests_.erase(call_id); // TODO(eclark): check if the response @@ -34,8 +35,9 @@ void ClientDispatcher::read(Context *ctx, Response in) { p.setValue(in); } -Future ClientDispatcher::operator()(Request arg) { +Future ClientDispatcher::operator()(Request &arg) { auto call_id = ++current_call_id_; + arg.set_call_id(call_id); auto &p = requests_[call_id]; auto f = p.getFuture(); diff --git a/hbase-native-client/core/client-dispatcher.h b/hbase-native-client/core/client-dispatcher.h index 4b9d35a..4c5ef39 100644 --- a/hbase-native-client/core/client-dispatcher.h +++ b/hbase-native-client/core/client-dispatcher.h @@ -27,16 +27,16 @@ namespace hbase { class ClientDispatcher - : public wangle::ClientDispatcherBase { public: void read(Context *ctx, Response in) override; - folly::Future operator()(Request arg) override; + folly::Future operator()(Request &arg) override; folly::Future close(Context *ctx) override; folly::Future close() override; private: - std::unordered_map> requests_; - uint32_t current_call_id_ = 1; + std::unordered_map> requests_; + uint32_t current_call_id_ = 10; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/client-serialize-handler.cc b/hbase-native-client/core/client-serialize-handler.cc index cad1308..bbee4ea 100644 --- a/hbase-native-client/core/client-serialize-handler.cc +++ b/hbase-native-client/core/client-serialize-handler.cc @@ -19,86 +19,61 @@ #include "core/client-serialize-handler.h" +#include +#include + +#include "core/request.h" +#include "core/response.h" +#include "if/RPC.pb.h" +#include "if/Client.pb.h" + #include using namespace hbase; using namespace folly; using namespace wangle; - -static const std::string PREAMBLE = "HBas"; -static const std::string INTERFACE = "ClientService"; -static const uint8_t RPC_VERSION = 0; -static const uint8_t AUTH_TYPE = 80; +using hbase::pb::ResponseHeader; +using hbase::pb::GetResponse; // TODO(eclark): Make this actually do ANYTHING. -void ClientSerializeHandler::read(Context *ctx, std::unique_ptr msg) { - Response received; - ctx->fireRead(received); +void ClientSerializeHandler::read(Context *ctx, std::unique_ptr buf) { + if (buf) { + buf->coalesce(); + Response received; + ResponseHeader header; + + int used_bytes = deser_.parse_delimited(buf.get(), &header); + LOG(INFO) << "Read ResponseHeader size=" << used_bytes + << " call_id=" << header.call_id() + << " has_exception=" << header.has_exception(); + received.set_call_id(header.call_id()); + + if (header.has_exception() == false) { + buf->trimStart(used_bytes); + // For now assume that everything was a get. + // We'll need to set this up later. + received.set_response(std::make_shared()); + used_bytes = deser_.parse_delimited(buf.get(), received.response().get()); + } + ctx->fireRead(std::move(received)); + } } -Future ClientSerializeHandler::write(Context *ctx, Request r) { +Future ClientSerializeHandler::write(Context *ctx, Request &r) { // Keep track of if we have sent the header. - if (need_send_header_) { + if (UNLIKELY(need_send_header_)) { need_send_header_ = false; - // Should this be replacing the IOBuf rather than - // sending several different calls? - write_preamble(ctx); - write_header(ctx); + // Should we be sending just one fireWrite? + // Right now we're sending one for the header + // and one for the request. + // + // That doesn't seem like too bad, but who knows. + auto pre = ser_.preamble(); + auto header = ser_.header("elliott"); + pre->appendChain(std::move(header)); + ctx->fireWrite(std::move(pre)); } - // Send out the actual request and not just a test string. - std::string out{"test"}; - return ctx->fireWrite(prepend_length(IOBuf::copyBuffer(out))); -} - -Future ClientSerializeHandler::write_preamble(Context *ctx) { - auto magic = IOBuf::copyBuffer(PREAMBLE); - auto buf = IOBuf::create(2); - buf->append(2); - folly::io::RWPrivateCursor c(buf.get()); - - // Version - c.write(RPC_VERSION); - // Standard security aka Please don't lie to me. - c.write(AUTH_TYPE); - magic->appendChain(std::move(buf)); - return ctx->fireWrite(std::move(magic)); -} - -Future ClientSerializeHandler::write_header(Context *ctx) { - pb::ConnectionHeader h; - - // TODO(eclark): Make this not a total lie. - h.mutable_user_info()->set_effective_user("elliott"); - // The service name that we want to talk to. - // - // Right now we're completely ignoring the service interface. - // That may or may not be the correct thing to do. - // It worked for a while with the java client; until it - // didn't. - h.set_service_name(INTERFACE); - // TODO(eclark): Make this 1 copy. - auto msg = IOBuf::copyBuffer(h.SerializeAsString()); - return ctx->fireWrite(prepend_length(std::move(msg))); -} - -// Our own simple version of LengthFieldPrepender -std::unique_ptr -ClientSerializeHandler::prepend_length(std::unique_ptr msg) { - // Java ints are 4 long. So create a buffer that large - auto len_buf = IOBuf::create(4); - // Then make those bytes visible. - len_buf->append(4); - - io::RWPrivateCursor c(len_buf.get()); - // Get the size of the data to be pushed out the network. - auto size = msg->computeChainDataLength(); - - // Write the length to this IOBuf. - c.writeBE(static_cast(size)); - - // Then attach the origional to the back of len_buf - len_buf->appendChain(std::move(msg)); - return len_buf; + return ctx->fireWrite(ser_.request(r.call_id(), r.method(), r.msg())); } diff --git a/hbase-native-client/core/client-serialize-handler.h b/hbase-native-client/core/client-serialize-handler.h index 961a03b..af79195 100644 --- a/hbase-native-client/core/client-serialize-handler.h +++ b/hbase-native-client/core/client-serialize-handler.h @@ -20,25 +20,26 @@ #include -#include "if/HBase.pb.h" -#include "if/RPC.pb.h" -#include "core/request.h" -#include "core/response.h" +#include "serde/client-serializer.h" +#include "serde/client-deserializer.h" + +// Forward decs. +namespace hbase { +class Request; +class Response; +} namespace hbase { class ClientSerializeHandler - : public wangle::Handler, Response, Request, + : public wangle::Handler, Response, Request &, std::unique_ptr> { public: void read(Context *ctx, std::unique_ptr msg) override; - folly::Future write(Context *ctx, Request r) override; + folly::Future write(Context *ctx, Request &r) override; private: - folly::Future write_preamble(Context *ctx); - folly::Future write_header(Context *ctx); - // Our own simple version of LengthFieldPrepender - std::unique_ptr - prepend_length(std::unique_ptr msg); bool need_send_header_ = true; + ClientSerializer ser_; + ClientDeserializer deser_; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 818bc6b..c2dc226 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -39,4 +39,4 @@ private: LocationCache location_cache; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/connection-factory.cc b/hbase-native-client/core/connection-factory.cc index 785b239..5eea024 100644 --- a/hbase-native-client/core/connection-factory.cc +++ b/hbase-native-client/core/connection-factory.cc @@ -39,19 +39,16 @@ using namespace hbase; using namespace wangle; ConnectionFactory::ConnectionFactory() { - bootstrap_.group(std::make_shared(2)); + bootstrap_.group(std::make_shared(1)); bootstrap_.pipelineFactory(std::make_shared()); } -Future ConnectionFactory::make_connection(std::string host, - int port) { +std::shared_ptr +ConnectionFactory::make_connection(std::string host, int port) { // Connect to a given server // Then when connected create a ClientDispactcher. - auto srv = bootstrap_.connect(SocketAddress(host, port, true)) - .then([](SerializePipeline *pipeline) { - ClientDispatcher dispatcher; - dispatcher.setPipeline(pipeline); - return dispatcher; - }); - return srv; + auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get(); + auto dispatcher = std::make_shared(); + dispatcher->setPipeline(pipeline); + return dispatcher; } diff --git a/hbase-native-client/core/connection-factory.h b/hbase-native-client/core/connection-factory.h index 6f450c2..ff1bf78 100644 --- a/hbase-native-client/core/connection-factory.h +++ b/hbase-native-client/core/connection-factory.h @@ -33,9 +33,9 @@ namespace hbase { class ConnectionFactory { public: ConnectionFactory(); - folly::Future make_connection(std::string host, int port); + std::shared_ptr make_connection(std::string host, int port); private: wangle::ClientBootstrap bootstrap_; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/get-request.h b/hbase-native-client/core/get-request.h index c9113ad..bb755c5 100644 --- a/hbase-native-client/core/get-request.h +++ b/hbase-native-client/core/get-request.h @@ -32,4 +32,4 @@ private: TableName table_name_; std::string key_; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/get-result.h b/hbase-native-client/core/get-result.h index e021316..a49ad98 100644 --- a/hbase-native-client/core/get-result.h +++ b/hbase-native-client/core/get-result.h @@ -29,4 +29,4 @@ public: private: std::string key_; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 34e3236..52e86e3 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -70,8 +70,7 @@ ServerName LocationCache::ReadMetaLocation() { // This needs to be int rather than size_t as that's what ZK expects. int len = sizeof(contents); // TODO(elliott): handle disconnects/reconntion as needed. - int zk_result = - zoo_get(this->zk_, META_LOCATION, 0, contents, &len, nullptr); + int zk_result = zoo_get(this->zk_, META_LOCATION, 0, contents, &len, nullptr); if (zk_result != ZOK || len < 9) { LOG(ERROR) << "Error getting meta location."; throw runtime_error("Error getting meta location"); diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index efcfde5..28a1ee1 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -51,4 +51,4 @@ private: zhandle_t *zk_; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/pipeline.h b/hbase-native-client/core/pipeline.h index d199d08..435525a 100644 --- a/hbase-native-client/core/pipeline.h +++ b/hbase-native-client/core/pipeline.h @@ -24,11 +24,11 @@ #include "core/response.h" namespace hbase { -using SerializePipeline = wangle::Pipeline; +using SerializePipeline = wangle::Pipeline; class RpcPipelineFactory : public wangle::PipelineFactory { public: SerializePipeline::Ptr newPipeline(std::shared_ptr sock) override; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/request.h b/hbase-native-client/core/request.h index 39083ed..6f2f179 100644 --- a/hbase-native-client/core/request.h +++ b/hbase-native-client/core/request.h @@ -19,6 +19,8 @@ #pragma once #include +#include +#include namespace hbase { class Request { @@ -26,8 +28,16 @@ public: Request() : call_id_(0) {} uint32_t call_id() { return call_id_; } void set_call_id(uint32_t call_id) { call_id_ = call_id; } + google::protobuf::Message *msg() { return msg_.get(); } + void set_msg(std::unique_ptr &&msg) { + msg_ = std::move(msg); + } + std::string method() { return method_; } + void set_method(std::string method) { method_ = method; } private: uint32_t call_id_; + std::unique_ptr msg_ = nullptr; + std::string method_ = "Get"; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/response.h b/hbase-native-client/core/response.h index 34a284d..a1f0340 100644 --- a/hbase-native-client/core/response.h +++ b/hbase-native-client/core/response.h @@ -20,6 +20,13 @@ #include +// Forward +namespace google { +namespace protobuf { +class Message; +} +} + namespace hbase { class Response { @@ -27,8 +34,13 @@ public: Response() : call_id_(0) {} uint32_t call_id() { return call_id_; } void set_call_id(uint32_t call_id) { call_id_ = call_id; } + std::shared_ptr response() { return response_; } + void set_response(std::shared_ptr response) { + response_ = response; + } private: uint32_t call_id_; + std::shared_ptr response_; }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/service.h b/hbase-native-client/core/service.h index 880e65f..28bf7bf 100644 --- a/hbase-native-client/core/service.h +++ b/hbase-native-client/core/service.h @@ -23,4 +23,4 @@ namespace hbase { using HBaseService = wangle::Service; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 08e886a..6f91e5a 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -24,17 +24,27 @@ #include #include +#include #include "core/client.h" #include "core/connection-factory.h" #include "if/ZooKeeper.pb.h" +#include "if/Client.pb.h" using namespace folly; using namespace std; +using namespace std::chrono; using namespace hbase; using namespace hbase::pb; +using namespace google::protobuf; + +// TODO(eclark): remove the need for this. +DEFINE_string(region, "1588230740", "What region to send a get to"); +DEFINE_string(row, "test", "What row to get"); int main(int argc, char *argv[]) { + google::SetUsageMessage( + "Simple client to get a single row from HBase on the comamnd line"); google::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); @@ -44,14 +54,34 @@ int main(int argc, char *argv[]) { LocationCache cache{"localhost:2181", wangle::getCPUExecutor()}; auto result = cache.LocateMeta().get(); - cout << "ServerName = " << result.host_name() << ":" << result.port() << endl; // Create a connection to the local host - auto conn = cf.make_connection(result.host_name(), result.port()).get(); + auto conn = cf.make_connection(result.host_name(), result.port()); // Send the request Request r; - conn(r).get(); + + // This is a get request so make that + unique_ptr msg = make_unique(); + + // Set what region + msg->mutable_region()->set_value(FLAGS_region); + // It's always this. + msg->mutable_region()->set_type( + RegionSpecifier_RegionSpecifierType:: + RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); + // What row. + msg->mutable_get()->set_row(FLAGS_row); + // Send it. + r.set_msg(std::move(msg)); + auto resp = (*conn)(r).get(milliseconds(5000)); + + auto get_resp = std::static_pointer_cast(resp.response()); + cout << "GetResponse has_result = " << get_resp->has_result() << '\n'; + if (get_resp->has_result()) { + auto &r = get_resp->result(); + cout << "Result cell_size = " << r.cell_size() << endl; + } return 0; } diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/core/table-name.h index 796115b..37c3461 100644 --- a/hbase-native-client/core/table-name.h +++ b/hbase-native-client/core/table-name.h @@ -29,4 +29,4 @@ public: explicit TableName(std::string tableName); explicit TableName(std::string namespaceName, std::string tableName); }; -} // namespace hbase +} // namespace hbase diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK new file mode 100644 index 0000000..207607f --- /dev/null +++ b/hbase-native-client/serde/BUCK @@ -0,0 +1,54 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cxx_library(name="serde", + exported_headers=[ + "client-serializer.h", + "client-deserializer.h", + ], + srcs=[ + "client-serializer.cc", + "client-deserializer.cc", + ], + deps=[ + "//if:if", + "//third-party:folly", + ], + tests=[ + ":client-serializer-test", + ":client-deserializer-test", + ], + visibility=[ + 'PUBLIC', + ], ) + +cxx_test(name="client-serializer-test", + srcs=[ + "client-serializer-test.cc", + ], + deps=[ + ":serde", + "//if:if", + ], ) +cxx_test(name="client-deserializer-test", + srcs=[ + "client-deserializer-test.cc", + ], + deps=[ + ":serde", + "//if:if", + ], ) diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc new file mode 100644 index 0000000..bb57e50 --- /dev/null +++ b/hbase-native-client/serde/client-deserializer-test.cc @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +#include "serde/client-deserializer.h" +#include "serde/client-serializer.h" +#include "if/Client.pb.h" + +using namespace hbase; +using folly::IOBuf; +using hbase::pb::GetRequest; +using hbase::pb::RegionSpecifier; +using hbase::pb::RegionSpecifier_RegionSpecifierType; + +TEST(TestClientDeserializer, TestReturnFalseOnNullPtr) { + ClientDeserializer deser; + ASSERT_LT(deser.parse_delimited(nullptr, nullptr), 0); +} + +TEST(TestClientDeserializer, TestReturnFalseOnBadInput) { + ClientDeserializer deser; + auto buf = IOBuf::copyBuffer("test"); + GetRequest gr; + + ASSERT_LT(deser.parse_delimited(buf.get(), &gr), 0); +} + +TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) { + GetRequest in; + ClientSerializer ser; + ClientDeserializer deser; + + // fill up the GetRequest. + in.mutable_region()->set_value("test_region_id"); + in.mutable_region()->set_type( + RegionSpecifier_RegionSpecifierType:: + RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); + in.mutable_get()->set_row("test_row"); + + // Create the buffer + auto buf = ser.serialize_delimited(in); + + GetRequest out; + + int used_bytes = deser.parse_delimited(buf.get(), &out); + + ASSERT_GT(used_bytes, 0); + ASSERT_EQ(used_bytes, buf->length()); +} diff --git a/hbase-native-client/serde/client-deserializer.cc b/hbase-native-client/serde/client-deserializer.cc new file mode 100644 index 0000000..9e8296f --- /dev/null +++ b/hbase-native-client/serde/client-deserializer.cc @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "serde/client-deserializer.h" + +#include +#include +#include +#include +#include + +using namespace hbase; + +using folly::IOBuf; +using google::protobuf::Message; +using google::protobuf::io::ArrayInputStream; +using google::protobuf::io::CodedInputStream; + +int ClientDeserializer::parse_delimited(IOBuf *buf, Message *msg) { + if (buf == nullptr || msg == nullptr) { + return -2; + } + + // We need everything in one buffer. + // If it's not already then put it together + buf->coalesce(); + + ArrayInputStream ais{buf->data(), static_cast(buf->length())}; + CodedInputStream coded_stream{&ais}; + + uint32_t msg_size; + + // Try and read the varint. + if (coded_stream.ReadVarint32(&msg_size) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t"; + return -3; + } + + coded_stream.PushLimit(msg_size); + // Parse the message. + if (msg->MergeFromCodedStream(&coded_stream) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) + << "Unable to read a protobuf message from data."; + return -4; + } + + // Make sure all the data was consumed. + if (coded_stream.ConsumedEntireMessage() == false) { + FB_LOG_EVERY_MS(ERROR, 1000) + << "Orphaned data left after reading protobuf message"; + return -5; + } + + return coded_stream.CurrentPosition(); +} diff --git a/hbase-native-client/serde/client-deserializer.h b/hbase-native-client/serde/client-deserializer.h new file mode 100644 index 0000000..e1597d7 --- /dev/null +++ b/hbase-native-client/serde/client-deserializer.h @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include + +// Forward +namespace google { +namespace protobuf { +class Message; +} +} + +namespace hbase { +class ClientDeserializer { +public: + int parse_delimited(folly::IOBuf *buf, google::protobuf::Message *msg); +}; + +} // namespace hbase diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc new file mode 100644 index 0000000..46c1aa7 --- /dev/null +++ b/hbase-native-client/serde/client-serializer-test.cc @@ -0,0 +1,56 @@ +#include + +#include +#include + +#include "serde/client-serializer.h" +#include "if/HBase.pb.h" +#include "if/RPC.pb.h" + +using namespace hbase; +using namespace hbase::pb; +using namespace folly; +using namespace folly::io; + +TEST(ClientSerializerTest, PreambleIncludesHBas) { + ClientSerializer ser; + auto buf = ser.preamble(); + const char *p = reinterpret_cast(buf->data()); + // Take the first for chars and make sure they are the + // magic string + EXPECT_EQ("HBas", std::string(p, 4)); + + EXPECT_EQ(6, buf->computeChainDataLength()); +} + +TEST(ClientSerializerTest, PreambleIncludesVersion) { + ClientSerializer ser; + auto buf = ser.preamble(); + EXPECT_EQ(0, static_cast(buf->data())[4]); + EXPECT_EQ(80, static_cast(buf->data())[5]); +} + +TEST(ClientSerializerTest, TestHeaderLengthPrefixed) { + ClientSerializer ser; + auto header = ser.header("elliott"); + + // The header should be prefixed by 4 bytes of length. + EXPECT_EQ(4, header->length()); + EXPECT_TRUE(header->length() < header->computeChainDataLength()); + EXPECT_TRUE(header->isChained()); + + // Now make sure the length is correct. + Cursor cursor(header.get()); + auto prefixed_len = cursor.readBE(); + EXPECT_EQ(prefixed_len, header->next()->length()); +} + +TEST(ClientSerializerTest, TestHeaderDecode) { + ClientSerializer ser; + auto buf = ser.header("elliott"); + auto header_buf = buf->next(); + ConnectionHeader h; + + EXPECT_TRUE(h.ParseFromArray(header_buf->data(), header_buf->length())); + EXPECT_EQ("elliott", h.user_info().effective_user()); +} diff --git a/hbase-native-client/serde/client-serializer.cc b/hbase-native-client/serde/client-serializer.cc new file mode 100644 index 0000000..0b3ad0c --- /dev/null +++ b/hbase-native-client/serde/client-serializer.cc @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "serde/client-serializer.h" + +#include +#include +#include +#include + +#include "if/HBase.pb.h" +#include "if/RPC.pb.h" + +using namespace hbase; + +using folly::IOBuf; +using folly::io::RWPrivateCursor; +using google::protobuf::Message; +using google::protobuf::io::ArrayOutputStream; +using google::protobuf::io::CodedOutputStream; +using google::protobuf::io::ZeroCopyOutputStream; +using std::string; +using std::unique_ptr; + +static const std::string PREAMBLE = "HBas"; +static const std::string INTERFACE = "ClientService"; +static const uint8_t RPC_VERSION = 0; +static const uint8_t DEFAULT_AUTH_TYPE = 80; + +ClientSerializer::ClientSerializer() : auth_type_(DEFAULT_AUTH_TYPE) {} + +unique_ptr ClientSerializer::preamble() { + auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); + magic->append(2); + RWPrivateCursor c(magic.get()); + c.skip(4); + // Version + c.write(RPC_VERSION); + // Standard security aka Please don't lie to me. + c.write(auth_type_); + return magic; +} + +unique_ptr ClientSerializer::header(string user) { + pb::ConnectionHeader h; + + // TODO(eclark): Make this not a total lie. + h.mutable_user_info()->set_effective_user(user); + // The service name that we want to talk to. + // + // Right now we're completely ignoring the service interface. + // That may or may not be the correct thing to do. + // It worked for a while with the java client; until it + // didn't. + h.set_service_name(INTERFACE); + return prepend_length(serialize_message(h)); +} + +unique_ptr ClientSerializer::request(uint32_t call_id, string method, + Message *msg) { + pb::RequestHeader rq; + rq.set_method_name(method); + rq.set_call_id(call_id); + rq.set_request_param(msg != nullptr); + auto ser_header = serialize_delimited(rq); + if (msg != nullptr) { + auto ser_req = serialize_delimited(*msg); + ser_header->appendChain(std::move(ser_req)); + } + + return prepend_length(std::move(ser_header)); +} + +unique_ptr ClientSerializer::prepend_length(unique_ptr msg) { + // Java ints are 4 long. So create a buffer that large + auto len_buf = IOBuf::create(4); + // Then make those bytes visible. + len_buf->append(4); + + RWPrivateCursor c(len_buf.get()); + // Get the size of the data to be pushed out the network. + auto size = msg->computeChainDataLength(); + + // Write the length to this IOBuf. + c.writeBE(static_cast(size)); + + // Then attach the origional to the back of len_buf + len_buf->appendChain(std::move(msg)); + return len_buf; +} + +unique_ptr ClientSerializer::serialize_delimited(Message &msg) { + // Get the buffer size needed for just the message. + int msg_size = msg.ByteSize(); + int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size; + + // Create a buffer big enough to hold the varint and the object. + auto buf = IOBuf::create(buf_size); + buf->append(buf_size); + + // Create the array output stream. + ArrayOutputStream aos{buf->writableData(), static_cast(buf->length())}; + // Wrap the ArrayOuputStream in the coded output stream to allow writing + // Varint32 + CodedOutputStream cos{&aos}; + + // Write out the size. + cos.WriteVarint32(msg_size); + + // Now write the rest out. + // We're using the protobuf output streams here to keep track + // of where in the output array we are rather than IOBuf. + msg.SerializeWithCachedSizesToArray( + cos.GetDirectBufferForNBytesAndAdvance(msg_size)); + + // Return the buffer. + return buf; +} +// TODO(eclark): Make this 1 copy. +unique_ptr ClientSerializer::serialize_message(Message &msg) { + auto buf = IOBuf::copyBuffer(msg.SerializeAsString()); + return buf; +} diff --git a/hbase-native-client/serde/client-serializer.h b/hbase-native-client/serde/client-serializer.h new file mode 100644 index 0000000..cf9a677 --- /dev/null +++ b/hbase-native-client/serde/client-serializer.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include +#include +#include + +// Forward +namespace google { +namespace protobuf { +class Message; +} +} +namespace hbase { +class Request; +} + +namespace hbase { +class ClientSerializer { +public: + ClientSerializer(); + std::unique_ptr preamble(); + std::unique_ptr header(std::string user); + std::unique_ptr request(uint32_t call_id, std::string method, + google::protobuf::Message *msg); + std::unique_ptr + serialize_delimited(google::protobuf::Message &msg); + + std::unique_ptr + serialize_message(google::protobuf::Message &msg); + + std::unique_ptr + prepend_length(std::unique_ptr msg); + + uint8_t auth_type_; +}; +} // namespace hbase -- 2.8.0-rc2