From 5fdbf89f2c81f65948a11c9be8f3132c3cb3653c Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 12 Apr 2016 12:03:52 -0700 Subject: [PATCH] HBASE-15620 Add on Call serialization --- .../core/client-serialize-handler.cc | 56 ++++++++++++++++++++-- .../core/client-serialize-handler.h | 2 + 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/hbase-native-client/core/client-serialize-handler.cc b/hbase-native-client/core/client-serialize-handler.cc index cad1308..bc31f91 100644 --- a/hbase-native-client/core/client-serialize-handler.cc +++ b/hbase-native-client/core/client-serialize-handler.cc @@ -19,11 +19,18 @@ #include "core/client-serialize-handler.h" +#include +#include + #include using namespace hbase; using namespace folly; using namespace wangle; +using google::protobuf::Message; +using google::protobuf::io::CodedOutputStream; +using google::protobuf::io::ArrayOutputStream; +using google::protobuf::io::ZeroCopyOutputStream; static const std::string PREAMBLE = "HBas"; static const std::string INTERFACE = "ClientService"; @@ -48,8 +55,14 @@ Future ClientSerializeHandler::write(Context *ctx, Request r) { } // Send out the actual request and not just a test string. - std::string out{"test"}; - return ctx->fireWrite(prepend_length(IOBuf::copyBuffer(out))); + pb::RequestHeader rq; + rq.set_call_id(r.call_id()); + rq.set_method_name("get"); + rq.set_request_param(false); + + auto ser_header = serialize_delimited(rq); + + return ctx->fireWrite(prepend_length(std::move(ser_header))); } Future ClientSerializeHandler::write_preamble(Context *ctx) { @@ -78,9 +91,7 @@ Future ClientSerializeHandler::write_header(Context *ctx) { // It worked for a while with the java client; until it // didn't. h.set_service_name(INTERFACE); - // TODO(eclark): Make this 1 copy. - auto msg = IOBuf::copyBuffer(h.SerializeAsString()); - return ctx->fireWrite(prepend_length(std::move(msg))); + return ctx->fireWrite(prepend_length(serialize_message(h))); } // Our own simple version of LengthFieldPrepender @@ -102,3 +113,38 @@ ClientSerializeHandler::prepend_length(std::unique_ptr msg) { len_buf->appendChain(std::move(msg)); return len_buf; } + +// TODO(eclark): Make this 1 copy. +std::unique_ptr +ClientSerializeHandler::serialize_message(Message& msg) { + auto buf = IOBuf::copyBuffer(msg.SerializeAsString()); + return buf; +} + +std::unique_ptr +ClientSerializeHandler::serialize_delimited(Message &msg) { + // Get the buffer size needed for just the message. + int msg_size = msg.ByteSize(); + int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size; + + // Create a buffer big enough to hold the varint and the object. + auto buf = IOBuf::create(buf_size); + buf->append(buf_size); + + // Create the array output stream. + ArrayOutputStream aos{buf->writableData(), static_cast(buf->length())}; + // Wrap the ArrayOuputStream in the coded output stream to allow writing Varint32 + CodedOutputStream cos{&aos}; + + // Write out the size. + cos.WriteVarint32(msg_size); + + // Now write the rest out. + // We're using the protobuf output streams here to keep track + // of where in the output array we are rather than IOBuf. + msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size)); + + // Return the buffer. + return buf; +} + diff --git a/hbase-native-client/core/client-serialize-handler.h b/hbase-native-client/core/client-serialize-handler.h index 961a03b..e8bdfb7 100644 --- a/hbase-native-client/core/client-serialize-handler.h +++ b/hbase-native-client/core/client-serialize-handler.h @@ -36,6 +36,8 @@ public: private: folly::Future write_preamble(Context *ctx); folly::Future write_header(Context *ctx); + std::unique_ptr serialize_message(google::protobuf::Message& msg); + std::unique_ptr serialize_delimited(google::protobuf::Message& msg); // Our own simple version of LengthFieldPrepender std::unique_ptr prepend_length(std::unique_ptr msg); -- 2.8.0-rc2