From 4fd1ba1bffc81d0fdaee18058e3e9271f7e31c22 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Mon, 4 Apr 2016 13:52:05 -0700 Subject: [PATCH] HBASE-14855 Connect to regionserver --- hbase-native-client/core/BUCK | 17 ++++++++++ hbase-native-client/core/client-dispatcher.cc | 35 ++++++++++++++++++++ hbase-native-client/core/client-dispatcher.h | 22 ++++++++++++ hbase-native-client/core/client.cc | 10 ++++++ hbase-native-client/core/client.h | 18 +++++++++- hbase-native-client/core/connection-factory.cc | 46 ++++++++++++++++++++++++++ hbase-native-client/core/connection-factory.h | 21 ++++++++++++ hbase-native-client/core/get-request.cc | 1 + hbase-native-client/core/get-request.h | 16 +++++++++ hbase-native-client/core/get-result.cc | 1 + hbase-native-client/core/get-result.h | 14 ++++++++ hbase-native-client/core/header-handler.cc | 39 ++++++++++++++++++++++ hbase-native-client/core/header-handler.h | 16 +++++++++ hbase-native-client/core/pipeline.cc | 27 +++++++++++++++ hbase-native-client/core/pipeline.h | 13 ++++++++ hbase-native-client/core/request-encoder.h | 17 ++++++++++ hbase-native-client/core/request.h | 15 +++++++++ hbase-native-client/core/response-decoder.h | 19 +++++++++++ hbase-native-client/core/response.h | 16 +++++++++ hbase-native-client/core/service.h | 8 +++++ hbase-native-client/core/table-name.cc | 1 + hbase-native-client/core/table-name.h | 14 ++++++++ 22 files changed, 385 insertions(+), 1 deletion(-) create mode 100644 hbase-native-client/core/client-dispatcher.cc create mode 100644 hbase-native-client/core/client-dispatcher.h create mode 100644 hbase-native-client/core/connection-factory.cc create mode 100644 hbase-native-client/core/connection-factory.h create mode 100644 hbase-native-client/core/get-request.cc create mode 100644 hbase-native-client/core/get-request.h create mode 100644 hbase-native-client/core/get-result.cc create mode 100644 hbase-native-client/core/get-result.h create mode 100644 hbase-native-client/core/header-handler.cc create mode 100644 hbase-native-client/core/header-handler.h create mode 100644 hbase-native-client/core/pipeline.cc create mode 100644 hbase-native-client/core/pipeline.h create mode 100644 hbase-native-client/core/request-encoder.h create mode 100644 hbase-native-client/core/request.h create mode 100644 hbase-native-client/core/response-decoder.h create mode 100644 hbase-native-client/core/response.h create mode 100644 hbase-native-client/core/service.h create mode 100644 hbase-native-client/core/table-name.cc create mode 100644 hbase-native-client/core/table-name.h diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index d1e89d1..bd8dfdc 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -19,6 +19,8 @@ cxx_library(name="core", headers=[ "admin.h", "client.h", + "client-dispatcher.h", + "connection-factory.h", "connection.h", "connection_attr.h", "delete.h", @@ -28,17 +30,32 @@ cxx_library(name="core", "put.h", "scanner.h", "location-cache.h", + "pipeline.h", + "response.h", + "request.h", + "request-encoder.h", + "response-decoder.h", + "service.h", + "get-request.h", + "get-result.h", + "table-name.h", ], srcs=[ "admin.cc", "client.cc", + "client-dispatcher.cc", + "connection-factory.cc", "connection.cc", "get.cc", "mutation.cc", "put.cc", "delete.cc", "scanner.cc", + "pipeline.cc", "location-cache.cc", + "get-request.cc", + "get-result.cc", + "table-name.cc", ], deps=[ "//if:if", diff --git a/hbase-native-client/core/client-dispatcher.cc b/hbase-native-client/core/client-dispatcher.cc new file mode 100644 index 0000000..a18c651 --- /dev/null +++ b/hbase-native-client/core/client-dispatcher.cc @@ -0,0 +1,35 @@ +#include "client-dispatcher.h" + +using namespace folly; +using namespace hbase; +using namespace wangle; + +void ClientDispatcher::read(Context* ctx, Response in) { + auto call_id = in.call_id(); + auto search = requests_.find(call_id); + CHECK(search != requests_.end()); + auto p = std::move(search->second); + requests_.erase(call_id); + + // TODO: check if the response is an exception. If it is then set that. + p.setValue(in); +} + +Future ClientDispatcher::operator()(Request arg) { + auto call_id = current_call_id_++; + arg.set_call_id(call_id); + auto& p = requests_[call_id]; + auto f = p.getFuture(); + p.setInterruptHandler([call_id, this](const folly::exception_wrapper& e) { + this->requests_.erase(call_id); + }); + this->pipeline_->write(arg); + + return f; +} + +Future ClientDispatcher::close() { return ClientDispatcherBase::close(); } + +Future ClientDispatcher::close(Context* ctx) { + return ClientDispatcherBase::close(ctx); +} diff --git a/hbase-native-client/core/client-dispatcher.h b/hbase-native-client/core/client-dispatcher.h new file mode 100644 index 0000000..b78d603 --- /dev/null +++ b/hbase-native-client/core/client-dispatcher.h @@ -0,0 +1,22 @@ +#pragma once + +#include + +#include "pipeline.h" +#include "request.h" +#include "response.h" + +namespace hbase { +class ClientDispatcher + : public wangle::ClientDispatcherBase { + public: + void read(Context* ctx, Response in) override; + folly::Future operator()(Request arg) override; + folly::Future close(Context* ctx) override; + folly::Future close() override; + + private: + std::unordered_map> requests_; + uint32_t current_call_id_ = 1; +}; +} // hbase diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index a04daee..2468b4e 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -24,11 +24,21 @@ #include #include +#include + #include "if/ZooKeeper.pb.h" using namespace folly; +using namespace std; using namespace hbase::pb; +namespace hbase { + + +Client::Client(string quorum_spec) + : location_cache(quorum_spec, wangle::getCPUExecutor()) {} +} + int main(int argc, char *argv[]) { MetaRegionServer mrs; google::ParseCommandLineFlags(&argc, &argv, true); diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 35a3bd8..d309cdc 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -20,7 +20,23 @@ #pragma once #include +#include #include "if/Cell.pb.h" -class Client {}; +#include "get-request.h" +#include "get-result.h" + +namespace hbase +{ +class Client { + public: + explicit Client(std::string quorum_spec); + folly::Future get(const GetRequest& get_request); + //folly::Future put(const PutRequest& put_request); + + private: + LocationCache location_cache; +}; + +} /* hbase */ diff --git a/hbase-native-client/core/connection-factory.cc b/hbase-native-client/core/connection-factory.cc new file mode 100644 index 0000000..4bf692a --- /dev/null +++ b/hbase-native-client/core/connection-factory.cc @@ -0,0 +1,46 @@ +#include "connection-factory.h" + +#include +#include +#include +#include +#include + +#include "client-dispatcher.h" +#include "pipeline.h" +#include "request.h" +#include "response.h" +#include "service.h" + +using namespace folly; +using namespace hbase; +using namespace wangle; + +ConnectionFactory::ConnectionFactory() { + bootstrap_.group(std::make_shared(1)); + bootstrap_.pipelineFactory(std::make_shared()); +} + +Future ConnectionFactory::make_connection(std::string host, + int port) { + auto srv = bootstrap_.connect(SocketAddress(host, port, true)) + .then([](ClientPipeline* pipeline) { + ClientDispatcher dispatcher; + dispatcher.setPipeline(pipeline); + return dispatcher; + }); + return srv; +} + +int main(int argc, char** argv) { + hbase::ConnectionFactory cf; + hbase::Request r; + + auto client = cf.make_connection("localhost", 16010).get(); + + client(r).get(); + + client.close(); + + return 0; +} diff --git a/hbase-native-client/core/connection-factory.h b/hbase-native-client/core/connection-factory.h new file mode 100644 index 0000000..35d185c --- /dev/null +++ b/hbase-native-client/core/connection-factory.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +#include "service.h" +#include "pipeline.h" +#include "request.h" +#include "response.h" +#include "client-dispatcher.h" + +namespace hbase { +class ConnectionFactory { + public: + ConnectionFactory(); + folly::Future make_connection(std::string host, int port); + + private: + wangle::ClientBootstrap bootstrap_; +}; +} // hbase diff --git a/hbase-native-client/core/get-request.cc b/hbase-native-client/core/get-request.cc new file mode 100644 index 0000000..0873b96 --- /dev/null +++ b/hbase-native-client/core/get-request.cc @@ -0,0 +1 @@ +#include "get-request.h" diff --git a/hbase-native-client/core/get-request.h b/hbase-native-client/core/get-request.h new file mode 100644 index 0000000..22e4b37 --- /dev/null +++ b/hbase-native-client/core/get-request.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include "table-name.h" + +namespace hbase { + +class GetRequest { + public: + GetRequest(TableName table_name, std::string key); + + private: + TableName table_name_; + std::string key_; +}; +} diff --git a/hbase-native-client/core/get-result.cc b/hbase-native-client/core/get-result.cc new file mode 100644 index 0000000..9d485b8 --- /dev/null +++ b/hbase-native-client/core/get-result.cc @@ -0,0 +1 @@ +#include "get-result.h" diff --git a/hbase-native-client/core/get-result.h b/hbase-native-client/core/get-result.h new file mode 100644 index 0000000..46603b4 --- /dev/null +++ b/hbase-native-client/core/get-result.h @@ -0,0 +1,14 @@ +#pragma once + +#include + +namespace hbase { + +class GetResult { + public: + explicit GetResult(std::string key); + + private: + std::string key_; +}; +} diff --git a/hbase-native-client/core/header-handler.cc b/hbase-native-client/core/header-handler.cc new file mode 100644 index 0000000..735e6ab --- /dev/null +++ b/hbase-native-client/core/header-handler.cc @@ -0,0 +1,39 @@ +#include "header-handler.h" + +#include "hbase/hboss/if/HBase.pb.h" +#include "hbase/hboss/if/RPC.pb.h" + +using namespace hbase; + +const static std::string PREAMBLE = "HBas"; + +folly::Future HeaderHandler::write( + Context* ctx, std::unique_ptr msg) { + printf("CTX -> %p", ctx); + if (need_send_header_) { + need_send_header_ = false; + write_header(ctx); + } + return ctx->fireWrite(std::move(msg)); +} + +folly::Future HeaderHandler::write_header(Context* ctx) { + pb::ConnectionHeader h; + h.mutable_user_info()->set_effective_user("elliott"); + h.set_service_name("ClientService"); + + auto magic = folly::IOBuf::copyBuffer(PREAMBLE); + auto buf = folly::IOBuf::create(6); + auto msg = folly::IOBuf::copyBuffer(h.SerializeAsString()); + + buf->append(6); + folly::io::RWPrivateCursor c(buf.get()); + c.write((uint8_t)0); + c.write((uint8_t)80); + c.write((uint32_t)h.ByteSize()); + + buf->prependChain(std::move(msg)); + magic->prependChain(std::move(buf)); + + return ctx->fireWrite(std::move(magic)); +} diff --git a/hbase-native-client/core/header-handler.h b/hbase-native-client/core/header-handler.h new file mode 100644 index 0000000..5819f91 --- /dev/null +++ b/hbase-native-client/core/header-handler.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +namespace hbase { + +class HeaderHandler : public wangle::OutboundBytesToBytesHandler { + public: + folly::Future write(Context* ctx, + std::unique_ptr msg) override; + + private: + folly::Future write_header(Context* ctx); + bool need_send_header_{true}; +}; +} // hbase diff --git a/hbase-native-client/core/pipeline.cc b/hbase-native-client/core/pipeline.cc new file mode 100644 index 0000000..78baae6 --- /dev/null +++ b/hbase-native-client/core/pipeline.cc @@ -0,0 +1,27 @@ +#include "pipeline.h" + +#include +#include + +#include "header-handler.h" +#include "request-encoder.h" +#include "response-decoder.h" + +using namespace folly; +using namespace hbase; +using namespace wangle; + +ClientPipeline::Ptr RpcPipelineFactory::newPipeline( + std::shared_ptr sock) { + auto pipeline = ClientPipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + pipeline->addBack(OutputBufferingHandler()); + pipeline->addBack(EventBaseHandler()); + + pipeline->addBack(HeaderHandler()); + pipeline->addBack(ResponseDecoder()); + pipeline->addBack(RequestEncoder()); + + pipeline->finalize(); + return std::move(pipeline); +} diff --git a/hbase-native-client/core/pipeline.h b/hbase-native-client/core/pipeline.h new file mode 100644 index 0000000..da9b1ea --- /dev/null +++ b/hbase-native-client/core/pipeline.h @@ -0,0 +1,13 @@ +#pragma once + +#include +#include "request.h" + +namespace hbase { +using ClientPipeline = wangle::Pipeline; + +class RpcPipelineFactory : public wangle::PipelineFactory { + public: + ClientPipeline::Ptr newPipeline(std::shared_ptr sock) override; +}; +} // hbase diff --git a/hbase-native-client/core/request-encoder.h b/hbase-native-client/core/request-encoder.h new file mode 100644 index 0000000..34c5727 --- /dev/null +++ b/hbase-native-client/core/request-encoder.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +#include "request.h" + +namespace hbase { + +class RequestEncoder : public wangle::MessageToByteEncoder { + public: + std::unique_ptr encode(Request& req) override { + printf("Encoding\n"); + std::string out{"test"}; + return folly::IOBuf::copyBuffer(out); + } +}; +} diff --git a/hbase-native-client/core/request.h b/hbase-native-client/core/request.h new file mode 100644 index 0000000..fc0dfab --- /dev/null +++ b/hbase-native-client/core/request.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace hbase { +class Request { + public: + Request() : call_id_(0) {} + uint32_t call_id() { return call_id_; } + void set_call_id(uint32_t call_id) { call_id_ = call_id; } + + private: + uint32_t call_id_; +}; +} diff --git a/hbase-native-client/core/response-decoder.h b/hbase-native-client/core/response-decoder.h new file mode 100644 index 0000000..c9ad37f --- /dev/null +++ b/hbase-native-client/core/response-decoder.h @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include "response.h" + +namespace hbase { + +class ResponseDecoder : public wangle::ByteToMessageDecoder { + public: + bool decode(Context* ctx, + folly::IOBufQueue& buf, + Response& result, + size_t& size) { + printf("Decoding size = %zu", size); + return false; + } +}; +} diff --git a/hbase-native-client/core/response.h b/hbase-native-client/core/response.h new file mode 100644 index 0000000..c68c5f8 --- /dev/null +++ b/hbase-native-client/core/response.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +namespace hbase { + +class Response { + public: + Response() : call_id_(0) {} + uint32_t call_id() { return call_id_; } + void set_call_id(uint32_t call_id) { call_id_ = call_id; } + + private: + uint32_t call_id_; +}; +} diff --git a/hbase-native-client/core/service.h b/hbase-native-client/core/service.h new file mode 100644 index 0000000..a6762bf --- /dev/null +++ b/hbase-native-client/core/service.h @@ -0,0 +1,8 @@ +#pragma once + +#include "request.h" +#include "response.h" + +namespace hbase { +using HBaseService = wangle::Service; +} diff --git a/hbase-native-client/core/table-name.cc b/hbase-native-client/core/table-name.cc new file mode 100644 index 0000000..b00ea3a --- /dev/null +++ b/hbase-native-client/core/table-name.cc @@ -0,0 +1 @@ +#include "table-name.h" diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/core/table-name.h new file mode 100644 index 0000000..d08ffd7 --- /dev/null +++ b/hbase-native-client/core/table-name.h @@ -0,0 +1,14 @@ +#pragma once + +#include + +#include "location-cache.h" +namespace hbase { + +// This is the core class of a HBase client. +class TableName { + public: + explicit TableName(std::string tableName); + explicit TableName(std::string namespaceName, std::string tableName); +}; +} // hbase -- 2.8.0-rc2