commit 14c46e86851985790cfeaebd955c845b9c24ceb9 Author: Enis Soztutar Date: Thu Feb 2 18:29:40 2017 -0800 HBASE-17585 [C++] Use KVCodec in the RPC request/response - v2 diff --git hbase-native-client/connection/client-dispatcher.cc hbase-native-client/connection/client-dispatcher.cc index 1ace99c..626fc76 100644 --- hbase-native-client/connection/client-dispatcher.cc +++ hbase-native-client/connection/client-dispatcher.cc @@ -26,8 +26,8 @@ using namespace wangle; ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {} -void ClientDispatcher::read(Context *ctx, Response in) { - auto call_id = in.call_id(); +void ClientDispatcher::read(Context *ctx, std::unique_ptr in) { + auto call_id = in->call_id(); auto search = requests_.find(call_id); CHECK(search != requests_.end()); @@ -37,13 +37,13 @@ void ClientDispatcher::read(Context *ctx, Response in) { // TODO(eclark): check if the response // is an exception. If it is then set that. - p.setValue(in); + p.setValue(std::move(in)); } -Future ClientDispatcher::operator()(std::unique_ptr arg) { +Future> ClientDispatcher::operator()(std::unique_ptr arg) { auto call_id = current_call_id_++; arg->set_call_id(call_id); - requests_.insert(call_id, Promise{}); + requests_.insert(call_id, Promise>{}); auto &p = requests_.find(call_id)->second; auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { diff --git hbase-native-client/connection/client-dispatcher.h hbase-native-client/connection/client-dispatcher.h index 0489717..857042c 100644 --- hbase-native-client/connection/client-dispatcher.h +++ hbase-native-client/connection/client-dispatcher.h @@ -36,21 +36,22 @@ namespace hbase { * future. */ class ClientDispatcher - : public wangle::ClientDispatcherBase, Response> { + : public wangle::ClientDispatcherBase, + std::unique_ptr> { public: /** Create a new ClientDispatcher */ ClientDispatcher(); /** Read a response off the pipeline. */ - void read(Context *ctx, Response in) override; + void read(Context *ctx, std::unique_ptr in) override; /** Take a request as a call and send it down the pipeline. */ - folly::Future operator()(std::unique_ptr arg) override; + folly::Future> operator()(std::unique_ptr arg) override; /** Close the dispatcher and the associated pipeline. */ folly::Future close(Context *ctx) override; /** Close the dispatcher and the associated pipeline. */ folly::Future close() override; private: - folly::AtomicHashMap> requests_; + folly::AtomicHashMap>> requests_; // Start at some number way above what could // be there for un-initialized call id counters. // diff --git hbase-native-client/connection/client-handler.cc hbase-native-client/connection/client-handler.cc index 5a6dce2..af84572 100644 --- hbase-native-client/connection/client-handler.cc +++ hbase-native-client/connection/client-handler.cc @@ -36,9 +36,9 @@ using hbase::pb::ResponseHeader; using hbase::pb::GetResponse; using google::protobuf::Message; -ClientHandler::ClientHandler(std::string user_name) +ClientHandler::ClientHandler(std::string user_name, std::shared_ptr codec) : user_name_(user_name), - serde_(), + serde_(codec), once_flag_(std::make_unique()), resp_msgs_( make_unique>>( @@ -47,12 +47,12 @@ ClientHandler::ClientHandler(std::string user_name) void ClientHandler::read(Context *ctx, std::unique_ptr buf) { if (LIKELY(buf != nullptr)) { buf->coalesce(); - Response received; + auto received = std::make_unique(); ResponseHeader header; int used_bytes = serde_.ParseDelimited(buf.get(), &header); - LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() - << " has_exception=" << header.has_exception(); + VLOG(1) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() + << " has_exception=" << header.has_exception(); // Get the response protobuf from the map auto search = resp_msgs_->find(header.call_id()); @@ -67,17 +67,34 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { // set the call_id. // This will be used to by the dispatcher to match up // the promise with the response. - received.set_call_id(header.call_id()); + received->set_call_id(header.call_id()); // If there was an exception then there's no // data left on the wire. if (header.has_exception() == false) { buf->trimStart(used_bytes); + + int cell_block_length = 0; used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get()); + if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) { + cell_block_length = header.cell_block_meta().length(); + } + + VLOG(3) << "Read RPCResponse, buf length:" << buf->length() + << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length; + // Make sure that bytes were parsed. - CHECK(used_bytes == buf->length()); - received.set_resp_msg(resp_msg); + CHECK((used_bytes + cell_block_length) == buf->length()); + + if (cell_block_length > 0) { + auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length); + received->set_cell_scanner(std::move(cell_scanner)); + } + + received->set_resp_msg(resp_msg); } + // TODO: set exception in Response here + ctx->fireRead(std::move(received)); } } @@ -94,6 +111,9 @@ Future ClientHandler::write(Context *ctx, std::unique_ptr r) { // Now store the call id to response. resp_msgs_->insert(r->call_id(), r->resp_msg()); + + VLOG(1) << "Writing RPC Request with call_id:" << r->call_id(); + // Send the data down the pipeline. return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } diff --git hbase-native-client/connection/client-handler.h hbase-native-client/connection/client-handler.h index d860cc1..afb8e62 100644 --- hbase-native-client/connection/client-handler.h +++ hbase-native-client/connection/client-handler.h @@ -27,6 +27,7 @@ #include #include +#include "serde/codec.h" #include "serde/rpc.h" // Forward decs. @@ -51,14 +52,14 @@ namespace hbase { * on first request. */ class ClientHandler - : public wangle::Handler, Response, std::unique_ptr, - std::unique_ptr> { + : public wangle::Handler, std::unique_ptr, + std::unique_ptr, std::unique_ptr> { public: /** * Create the handler * @param user_name the user name of the user running this process. */ - explicit ClientHandler(std::string user_name); + explicit ClientHandler(std::string user_name, std::shared_ptr codec); /** * Get bytes from the wire. diff --git hbase-native-client/connection/connection-factory.cc hbase-native-client/connection/connection-factory.cc index ff83212..6aba351 100644 --- hbase-native-client/connection/connection-factory.cc +++ hbase-native-client/connection/connection-factory.cc @@ -26,8 +26,9 @@ using namespace folly; using namespace hbase; -ConnectionFactory::ConnectionFactory(std::shared_ptr io_pool) - : io_pool_(io_pool), pipeline_factory_(std::make_shared()) {} +ConnectionFactory::ConnectionFactory(std::shared_ptr io_pool, + std::shared_ptr codec) + : io_pool_(io_pool), pipeline_factory_(std::make_shared(codec)) {} std::shared_ptr> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared>(); diff --git hbase-native-client/connection/connection-factory.h hbase-native-client/connection/connection-factory.h index da44c35..0d1e0d0 100644 --- hbase-native-client/connection/connection-factory.h +++ hbase-native-client/connection/connection-factory.h @@ -40,7 +40,8 @@ class ConnectionFactory { * Constructor. * There should only be one ConnectionFactory per client. */ - explicit ConnectionFactory(std::shared_ptr io_pool); + ConnectionFactory(std::shared_ptr io_pool, + std::shared_ptr codec); /** Default Desctructor */ virtual ~ConnectionFactory() = default; diff --git hbase-native-client/connection/connection-pool.cc hbase-native-client/connection/connection-pool.cc index 15dd64e..6635a6d 100644 --- hbase-native-client/connection/connection-pool.cc +++ hbase-native-client/connection/connection-pool.cc @@ -33,8 +33,9 @@ using hbase::HBaseService; using folly::SharedMutexWritePriority; using folly::SocketAddress; -ConnectionPool::ConnectionPool(std::shared_ptr io_executor) - : cf_(std::make_shared(io_executor)), +ConnectionPool::ConnectionPool(std::shared_ptr io_executor, + std::shared_ptr codec) + : cf_(std::make_shared(io_executor, codec)), clients_(), connections_(), map_mutex_() {} @@ -88,12 +89,12 @@ std::shared_ptr ConnectionPool::GetNewConnection( auto clientBootstrap = cf_->MakeBootstrap(); auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); - auto conneciton = std::make_shared(remote_id, dispatcher); + auto connection = std::make_shared(remote_id, dispatcher); - connections_.insert(std::make_pair(remote_id, conneciton)); + connections_.insert(std::make_pair(remote_id, connection)); clients_.insert(std::make_pair(remote_id, clientBootstrap)); - return conneciton; + return connection; } } diff --git hbase-native-client/connection/connection-pool.h hbase-native-client/connection/connection-pool.h index 1f2a182..23e5e9a 100644 --- hbase-native-client/connection/connection-pool.h +++ hbase-native-client/connection/connection-pool.h @@ -46,7 +46,8 @@ namespace hbase { class ConnectionPool { public: /** Create connection pool wit default connection factory */ - explicit ConnectionPool(std::shared_ptr io_executor); + ConnectionPool(std::shared_ptr io_executor, + std::shared_ptr codec); /** * Desctructor. diff --git hbase-native-client/connection/pipeline.cc hbase-native-client/connection/pipeline.cc index 14ad73c..00dc05c 100644 --- hbase-native-client/connection/pipeline.cc +++ hbase-native-client/connection/pipeline.cc @@ -30,7 +30,8 @@ using namespace folly; using namespace hbase; using namespace wangle; -RpcPipelineFactory::RpcPipelineFactory() : user_util_() {} +RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr codec) + : user_util_(), codec_(codec) {} SerializePipeline::Ptr RpcPipelineFactory::newPipeline( std::shared_ptr sock) { @@ -38,7 +39,7 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline( pipeline->addBack(AsyncSocketHandler{sock}); pipeline->addBack(EventBaseHandler{}); pipeline->addBack(LengthFieldBasedFrameDecoder{}); - pipeline->addBack(ClientHandler{user_util_.user_name()}); + pipeline->addBack(ClientHandler{user_util_.user_name(), codec_}); pipeline->finalize(); return pipeline; } diff --git hbase-native-client/connection/pipeline.h hbase-native-client/connection/pipeline.h index 343219d..ea40cfd 100644 --- hbase-native-client/connection/pipeline.h +++ hbase-native-client/connection/pipeline.h @@ -25,6 +25,7 @@ #include "connection/request.h" #include "connection/response.h" +#include "serde/codec.h" #include "utils/user-util.h" namespace hbase { @@ -40,7 +41,7 @@ class RpcPipelineFactory : public wangle::PipelineFactory { /** * Constructor. This will create user util. */ - RpcPipelineFactory(); + explicit RpcPipelineFactory(std::shared_ptr codec); /** * Create a new pipeline. @@ -55,5 +56,6 @@ class RpcPipelineFactory : public wangle::PipelineFactory { private: UserUtil user_util_; + std::shared_ptr codec_; }; } // namespace hbase diff --git hbase-native-client/connection/response.h hbase-native-client/connection/response.h index 560387c..1d60fed 100644 --- hbase-native-client/connection/response.h +++ hbase-native-client/connection/response.h @@ -22,6 +22,8 @@ #include #include +#include "serde/cell-scanner.h" + // Forward namespace google { namespace protobuf { @@ -42,7 +44,7 @@ class Response { * Constructor. * Initinalizes the call id to 0. 0 should never be a valid call id. */ - Response() : call_id_(0), resp_msg_(nullptr) {} + Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {} /** Get the call_id */ uint32_t call_id() { return call_id_; } @@ -62,8 +64,15 @@ class Response { resp_msg_ = std::move(response); } + void set_cell_scanner(std::unique_ptr cell_scanner) { + cell_scanner_ = std::move(cell_scanner); + } + + const std::unique_ptr& cell_scanner() const { return cell_scanner_; } + private: uint32_t call_id_; std::shared_ptr resp_msg_; + std::unique_ptr cell_scanner_; }; } // namespace hbase diff --git hbase-native-client/connection/rpc-client.cc hbase-native-client/connection/rpc-client.cc index 7621193..cfbda3a 100644 --- hbase-native-client/connection/rpc-client.cc +++ hbase-native-client/connection/rpc-client.cc @@ -39,38 +39,40 @@ class RpcChannelImplementation : public AbstractRpcChannel { }; } // namespace hbase -RpcClient::RpcClient(std::shared_ptr io_executor) +RpcClient::RpcClient(std::shared_ptr io_executor, + std::shared_ptr codec) : io_executor_(io_executor) { - cp_ = std::make_shared(io_executor_); + cp_ = std::make_shared(io_executor_, codec); } void RpcClient::Close() { io_executor_->stop(); } -std::shared_ptr RpcClient::SyncCall(const std::string& host, uint16_t port, +std::unique_ptr RpcClient::SyncCall(const std::string& host, uint16_t port, std::unique_ptr req, std::shared_ptr ticket) { - return std::make_shared(AsyncCall(host, port, std::move(req), ticket).get()); + return AsyncCall(host, port, std::move(req), ticket).get(); } -std::shared_ptr RpcClient::SyncCall(const std::string& host, uint16_t port, +std::unique_ptr RpcClient::SyncCall(const std::string& host, uint16_t port, std::unique_ptr req, std::shared_ptr ticket, const std::string& service_name) { - return std::make_shared( - AsyncCall(host, port, std::move(req), ticket, service_name).get()); + return AsyncCall(host, port, std::move(req), ticket, service_name).get(); } -folly::Future RpcClient::AsyncCall(const std::string& host, uint16_t port, - std::unique_ptr req, - std::shared_ptr ticket) { +folly::Future> RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket) { auto remote_id = std::make_shared(host, port, ticket); return GetConnection(remote_id)->SendRequest(std::move(req)); } -folly::Future RpcClient::AsyncCall(const std::string& host, uint16_t port, - std::unique_ptr req, - std::shared_ptr ticket, - const std::string& service_name) { +folly::Future> RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket, + const std::string& service_name) { auto remote_id = std::make_shared(host, port, ticket, service_name); return GetConnection(remote_id)->SendRequest(std::move(req)); } @@ -99,5 +101,5 @@ void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* contro std::unique_ptr req = std::make_unique(shared_req, shared_resp, method->name()); AsyncCall(host, port, std::move(req), ticket, method->service()->name()) - .then([done, this](Response resp) { done->Run(); }); + .then([done, this](std::unique_ptr resp) { done->Run(); }); } diff --git hbase-native-client/connection/rpc-client.h hbase-native-client/connection/rpc-client.h index aeb9b56..f4645a0 100644 --- hbase-native-client/connection/rpc-client.h +++ hbase-native-client/connection/rpc-client.h @@ -51,27 +51,28 @@ class RpcClient : public std::enable_shared_from_this { friend class RpcChannelImplementation; public: - RpcClient(std::shared_ptr io_executor); + RpcClient(std::shared_ptr io_executor, + std::shared_ptr codec); virtual ~RpcClient() { Close(); } - virtual std::shared_ptr SyncCall(const std::string &host, uint16_t port, + virtual std::unique_ptr SyncCall(const std::string &host, uint16_t port, std::unique_ptr req, std::shared_ptr ticket); - virtual std::shared_ptr SyncCall(const std::string &host, uint16_t port, + virtual std::unique_ptr SyncCall(const std::string &host, uint16_t port, std::unique_ptr req, std::shared_ptr ticket, const std::string &service_name); - virtual folly::Future AsyncCall(const std::string &host, uint16_t port, - std::unique_ptr req, - std::shared_ptr ticket); + virtual folly::Future> AsyncCall(const std::string &host, uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket); - virtual folly::Future AsyncCall(const std::string &host, uint16_t port, - std::unique_ptr req, - std::shared_ptr ticket, - const std::string &service_name); + virtual folly::Future> AsyncCall(const std::string &host, uint16_t port, + std::unique_ptr req, + std::shared_ptr ticket, + const std::string &service_name); virtual void Close(); diff --git hbase-native-client/connection/rpc-connection.h hbase-native-client/connection/rpc-connection.h index e2500b2..c37b1e0 100644 --- hbase-native-client/connection/rpc-connection.h +++ hbase-native-client/connection/rpc-connection.h @@ -41,7 +41,7 @@ class RpcConnection { virtual std::shared_ptr get_service() const { return hbase_service_; } - virtual folly::Future SendRequest(std::unique_ptr req) { + virtual folly::Future> SendRequest(std::unique_ptr req) { return (*hbase_service_)(std::move(req)); } diff --git hbase-native-client/connection/service.h hbase-native-client/connection/service.h index 0d2e258..64d4f07 100644 --- hbase-native-client/connection/service.h +++ hbase-native-client/connection/service.h @@ -26,5 +26,5 @@ #include "connection/response.h" namespace hbase { -using HBaseService = wangle::Service, Response>; +using HBaseService = wangle::Service, std::unique_ptr>; } // namespace hbase diff --git hbase-native-client/core/client-test.cc hbase-native-client/core/client-test.cc index 28eec6f..0a45fff 100644 --- hbase-native-client/core/client-test.cc +++ hbase-native-client/core/client-test.cc @@ -116,9 +116,9 @@ TEST(Client, Get) { // Using TestUtil to populate test data hbase::TestUtil *test_util = new hbase::TestUtil(); - test_util->RunShellCmd("create 't', 'd'"); - test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); - test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + test_util->RunShellCmd( + "create 't', 'd'; put 't', 'test2', 'd:2', 'value2'; put 't', 'test2', 'd:extra', 'value for " + "extra'"); // Create TableName and Row to be fetched from HBase auto tn = folly::to("t"); diff --git hbase-native-client/core/client.cc hbase-native-client/core/client.cc index c1efd8b..685524f 100644 --- hbase-native-client/core/client.cc +++ hbase-native-client/core/client.cc @@ -46,7 +46,14 @@ void Client::init(const hbase::Configuration &conf) { std::make_shared(4); // TODO: read num threads from conf io_executor_ = std::make_shared(sysconf(_SC_NPROCESSORS_ONLN)); - rpc_client_ = std::make_shared(io_executor_); + std::shared_ptr codec = nullptr; + if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == + std::string(KeyValueCodec::kJavaClassName)) { + codec = std::make_shared(); + } else { + LOG(WARNING) << "Not using RPC Cell Codec"; + } + rpc_client_ = std::make_shared(io_executor_, codec); location_cache_ = std::make_shared(conf_, cpu_executor_, rpc_client_->connection_pool()); } diff --git hbase-native-client/core/client.h hbase-native-client/core/client.h index 730981d..0e436ba 100644 --- hbase-native-client/core/client.h +++ hbase-native-client/core/client.h @@ -30,6 +30,7 @@ #include "connection/rpc-client.h" #include "core/configuration.h" #include "core/hbase_configuration_loader.h" +#include "core/keyvalue-codec.h" #include "core/location-cache.h" #include "core/table.h" #include "if/Cell.pb.h" @@ -70,6 +71,7 @@ class Client { void init(const hbase::Configuration &conf); const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181"; + const std::string kRpcCodec = "hbase.client.rpc.codec"; std::shared_ptr cpu_executor_; std::shared_ptr io_executor_; std::shared_ptr location_cache_; diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc index dab5deb..da9f64a 100644 --- hbase-native-client/core/location-cache.cc +++ hbase-native-client/core/location-cache.cc @@ -128,10 +128,10 @@ Future> LocationCache::LocateFromMeta(const Tabl .then([tn, row, this](std::shared_ptr rpc_connection) { return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row))); }) - .then([this](Response resp) { + .then([this](std::unique_ptr resp) { // take the protobuf response and make it into // a region location. - return this->CreateLocation(std::move(resp)); + return meta_util_.CreateLocation(std::move(*resp)); }) .then([tn, this](std::shared_ptr rl) { // Make sure that the correct location was found. @@ -166,22 +166,6 @@ Future> LocationCache::LocateRegion(const hbase::pb:: } } -std::shared_ptr LocationCache::CreateLocation(const Response &resp) { - auto resp_msg = static_pointer_cast(resp.resp_msg()); - auto &results = resp_msg->results().Get(0); - auto &cells = results.cell(); - - // TODO(eclark): There should probably be some better error - // handling around this. - auto cell_zero = cells.Get(0).value(); - auto cell_one = cells.Get(1).value(); - auto row = cells.Get(0).row(); - - auto region_info = folly::to(cell_zero); - auto server_name = folly::to(cell_one); - return std::make_shared(row, std::move(region_info), server_name, nullptr); -} - // must hold shared lock on locations_lock_ shared_ptr LocationCache::GetCachedLocation(const hbase::pb::TableName &tn, const std::string &row) { diff --git hbase-native-client/core/meta-utils.cc hbase-native-client/core/meta-utils.cc index f92300c..bd26607 100644 --- hbase-native-client/core/meta-utils.cc +++ hbase-native-client/core/meta-utils.cc @@ -24,18 +24,26 @@ #include "connection/request.h" #include "connection/response.h" +#include "core/response_converter.h" #include "if/Client.pb.h" +#include "serde/region-info.h" +#include "serde/server-name.h" #include "serde/table-name.h" using hbase::pb::TableName; using hbase::MetaUtil; using hbase::Request; using hbase::Response; +using hbase::RegionLocation; +using hbase::pb::RegionInfo; using hbase::pb::ScanRequest; using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; static const std::string META_REGION = "1588230740"; +static const std::string CATALOG_FAMILY = "info"; +static const std::string REGION_INFO_COLUMN = "regioninfo"; +static const std::string SERVER_COLUMN = "server"; std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const { return folly::to(tn, ",", row, ",", "999999999999999999"); @@ -79,3 +87,29 @@ std::unique_ptr MetaUtil::MetaRequest(const TableName tn, const std::st scan->set_start_row(RegionLookupRowkey(tn, row)); return request; } + +std::shared_ptr MetaUtil::CreateLocation(const Response &resp) { + std::vector> results = ResponseConverter::FromScanResponse(resp); + if (results.size() != 1) { + throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" + + std::to_string(results.size())); + } + + auto result = *results[0]; + // VLOG(1) << "Creating RegionLocation from received Response " << *result; TODO + + std::shared_ptr region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN); + std::shared_ptr server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN); + + if (region_info_str == nullptr) { + throw std::runtime_error("regioninfo column null for location"); + } + if (server_str == nullptr) { + throw std::runtime_error("server column null for location"); + } + + auto row = result.Row(); + auto region_info = folly::to(*region_info_str); + auto server_name = folly::to(*server_str); + return std::make_shared(row, std::move(region_info), server_name, nullptr); +} diff --git hbase-native-client/core/meta-utils.h hbase-native-client/core/meta-utils.h index 075215e..d67f32d 100644 --- hbase-native-client/core/meta-utils.h +++ hbase-native-client/core/meta-utils.h @@ -22,6 +22,8 @@ #include #include "connection/request.h" +#include "connection/response.h" +#include "core/region-location.h" #include "if/HBase.pb.h" #include "serde/table-name.h" @@ -43,5 +45,10 @@ class MetaUtil { * location. */ std::unique_ptr MetaRequest(const hbase::pb::TableName tn, const std::string &row) const; + + /** + * Return a RegionLocation from the parsed Response + */ + std::shared_ptr CreateLocation(const Response &resp); }; } // namespace hbase diff --git hbase-native-client/core/response_converter.cc hbase-native-client/core/response_converter.cc index 3fe2ba9..19a3554 100644 --- hbase-native-client/core/response_converter.cc +++ hbase-native-client/core/response_converter.cc @@ -22,9 +22,9 @@ #include #include "core/cell.h" -#include "if/Client.pb.h" using hbase::pb::GetResponse; +using hbase::pb::ScanResponse; namespace hbase { @@ -32,19 +32,65 @@ ResponseConverter::ResponseConverter() {} ResponseConverter::~ResponseConverter() {} -std::unique_ptr ResponseConverter::FromGetResponse(const Response& resp) { +std::unique_ptr ResponseConverter::FromGetResponse(const Response& resp) { auto get_resp = std::static_pointer_cast(resp.resp_msg()); + return ToResult(get_resp->result(), resp.cell_scanner()); +} + +std::unique_ptr ResponseConverter::ToResult( + const hbase::pb::Result& result, const std::unique_ptr& cell_scanner) { std::vector> vcells; - for (auto cell : get_resp->result().cell()) { + for (auto cell : result.cell()) { std::shared_ptr pcell = std::make_shared(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), cell.value(), static_cast(cell.cell_type())); vcells.push_back(pcell); } - return std::make_unique(vcells, get_resp->result().exists(), - get_resp->result().stale(), get_resp->result().partial()); + // iterate over the cells coming from rpc codec + if (cell_scanner != nullptr) { + while (cell_scanner->Advance()) { + vcells.push_back(cell_scanner->Current()); + } + // TODO: check associated cell count? + } + return std::make_unique(vcells, result.exists(), result.stale(), result.partial()); } +std::vector> ResponseConverter::FromScanResponse(const Response& resp) { + auto scan_resp = std::static_pointer_cast(resp.resp_msg()); + VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString(); + int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() + : scan_resp->results_size(); + + std::vector> results{static_cast(num_results)}; + for (int i = 0; i < num_results; i++) { + if (resp.cell_scanner() != nullptr) { + // Cells are out in cellblocks. Group them up again as Results. How many to read at a + // time will be found in getCellsLength -- length here is how many Cells in the i'th Result + int num_cells = scan_resp->cells_per_result(i); + + std::vector> vcells; + while (resp.cell_scanner()->Advance()) { + vcells.push_back(resp.cell_scanner()->Current()); + } + // TODO: check associated cell count? + + if (vcells.size() != num_cells) { + std::string msg = "Results sent from server=" + std::to_string(num_results) + + ". But only got " + std::to_string(i) + + " results completely at client. Resetting the scanner to scan again."; + LOG(ERROR) << msg; + throw std::runtime_error(msg); + } + // TODO: handle partial results per Result by checking partial_flag_per_result + results[i] = std::make_unique(vcells, false, scan_resp->stale(), false); + } else { + results[i] = ToResult(scan_resp->results(i), resp.cell_scanner()); + } + } + + return results; +} } /* namespace hbase */ diff --git hbase-native-client/core/response_converter.h hbase-native-client/core/response_converter.h index 86fb632..859644b 100644 --- hbase-native-client/core/response_converter.h +++ hbase-native-client/core/response_converter.h @@ -22,6 +22,8 @@ #include #include "connection/response.h" #include "core/result.h" +#include "if/Client.pb.h" +#include "serde/cell-scanner.h" namespace hbase { @@ -33,11 +35,16 @@ class ResponseConverter { public: ~ResponseConverter(); + static std::unique_ptr ToResult(const hbase::pb::Result& result, + const std::unique_ptr& cell_scanner); + /** * @brief Returns a Result object created by PB Message in passed Response object. * @param resp - Response object having the PB message. */ - static std::unique_ptr FromGetResponse(const Response &resp); + static std::unique_ptr FromGetResponse(const Response& resp); + + static std::vector> FromScanResponse(const Response& resp); private: // Constructor not required. We have all static methods to extract response from PB messages. diff --git hbase-native-client/core/result.cc hbase-native-client/core/result.cc index d73a5b2..4db3fca 100644 --- hbase-native-client/core/result.cc +++ hbase-native-client/core/result.cc @@ -87,7 +87,7 @@ bool Result::IsEmpty() const { return cells_.empty(); } const std::string &Result::Row() const { return row_; } -const int Result::Size() const { return cells_.size(); } +int Result::Size() const { return cells_.size(); } const ResultMap &Result::Map() const { return result_map_; } diff --git hbase-native-client/core/result.h hbase-native-client/core/result.h index cd41cf0..8ff4311 100644 --- hbase-native-client/core/result.h +++ hbase-native-client/core/result.h @@ -100,7 +100,7 @@ class Result { /** * @brief Returns the size of the underlying Cell vector */ - const int Size() const; + int Size() const; /** * @brief Map of families to all versions of its qualifiers and values. diff --git hbase-native-client/core/simple-client.cc hbase-native-client/core/simple-client.cc index 3cd0a93..4b1144c 100644 --- hbase-native-client/core/simple-client.cc +++ hbase-native-client/core/simple-client.cc @@ -31,6 +31,7 @@ #include "connection/connection-pool.h" #include "core/client.h" +#include "core/keyvalue-codec.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" #include "serde/server-name.h" @@ -43,6 +44,7 @@ using hbase::Configuration; using hbase::Response; using hbase::Request; using hbase::HBaseService; +using hbase::KeyValueCodec; using hbase::LocationCache; using hbase::ConnectionPool; using hbase::ConnectionFactory; @@ -88,7 +90,8 @@ int main(int argc, char *argv[]) { // Set up thread pools. auto cpu_pool = std::make_shared(FLAGS_threads); auto io_pool = std::make_shared(5); - auto cp = std::make_shared(io_pool); + auto codec = std::make_shared(); + auto cp = std::make_shared(io_pool, codec); // Configuration auto conf = std::make_shared(); @@ -105,7 +108,7 @@ int main(int argc, char *argv[]) { auto num_puts = FLAGS_columns; - auto results = std::vector>{}; + auto results = std::vector>>{}; auto col = uint64_t{0}; for (; col < num_puts; col++) { results.push_back( diff --git hbase-native-client/core/table.cc hbase-native-client/core/table.cc index 58125f9..4e30d4b 100644 --- hbase-native-client/core/table.cc +++ hbase-native-client/core/table.cc @@ -56,12 +56,12 @@ std::unique_ptr Table::Get(const hbase::Get &get) { auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name()); auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil - Future f = + Future> f = rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(req), user, "ClientService"); auto resp = f.get(); - return hbase::ResponseConverter::FromGetResponse(resp); + return hbase::ResponseConverter::FromGetResponse(*resp); } void Table::Close() { diff --git hbase-native-client/serde/rpc.cc hbase-native-client/serde/rpc.cc index 3bdb489..d5bca62 100644 --- hbase-native-client/serde/rpc.cc +++ hbase-native-client/serde/rpc.cc @@ -83,7 +83,7 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { return coded_stream.CurrentPosition(); } -RpcSerde::RpcSerde() : auth_type_(DEFAULT_AUTH_TYPE) {} +RpcSerde::RpcSerde(std::shared_ptr codec) : auth_type_(DEFAULT_AUTH_TYPE), codec_(codec) {} unique_ptr RpcSerde::Preamble() { auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); @@ -110,6 +110,10 @@ unique_ptr RpcSerde::Header(const string &user) { // didn't. // TODO: send the service name and user from the RpcClient h.set_service_name(INTERFACE); + + if (codec_ != nullptr) { + h.set_cell_block_codec_class(codec_->java_class_name()); + } return PrependLength(SerializeMessage(h)); } @@ -128,6 +132,14 @@ unique_ptr RpcSerde::Request(const uint32_t call_id, const string &method return PrependLength(std::move(ser_header)); } +std::unique_ptr RpcSerde::CreateCellScanner(std::unique_ptr buf, + uint32_t offset, uint32_t length) { + if (codec_ == nullptr) { + return nullptr; + } + return codec_->CreateDecoder(std::move(buf), offset, length); +} + unique_ptr RpcSerde::PrependLength(unique_ptr msg) { // Java ints are 4 long. So create a buffer that large auto len_buf = IOBuf::create(4); diff --git hbase-native-client/serde/rpc.h hbase-native-client/serde/rpc.h index 7d060c7..c59f903 100644 --- hbase-native-client/serde/rpc.h +++ hbase-native-client/serde/rpc.h @@ -21,6 +21,9 @@ #include #include +#include "serde/cell-scanner.h" +#include "serde/codec.h" + // Forward namespace folly { class IOBuf; @@ -44,7 +47,7 @@ class RpcSerde { /** * Constructor assumes the default auth type. */ - RpcSerde(); + RpcSerde(std::shared_ptr codec); /** * Destructor. This is provided just for testing purposes. @@ -76,6 +79,13 @@ class RpcSerde { std::unique_ptr Header(const std::string &user); /** + * Take ownership of the passed buffer, and create a CellScanner using the + * Codec class to parse Cells out of the wire. + */ + std::unique_ptr CreateCellScanner(std::unique_ptr buf, uint32_t offset, + uint32_t length); + + /** * Serialize a request message into a protobuf. * Request consists of: * @@ -109,5 +119,6 @@ class RpcSerde { private: /* data */ uint8_t auth_type_; + std::shared_ptr codec_; }; } // namespace hbase