commit a901cc5c1fee5e96695d2fe3bd1c307eec2c2b10 Author: Enis Soztutar Date: Wed Mar 15 14:49:52 2017 -0700 Table.Put() end to end - v1 diff --git hbase-native-client/core/raw-async-table.cc hbase-native-client/core/raw-async-table.cc index 88a3382..d98ff28 100644 --- hbase-native-client/core/raw-async-table.cc +++ hbase-native-client/core/raw-async-table.cc @@ -76,4 +76,19 @@ Future> RawAsyncTable::Get(const hbase::Get& get) { return caller->Call().then([caller](const auto r) { return r; }); } +Future RawAsyncTable::Put(const hbase::Put& put) { + auto caller = + CreateCallerBuilder(put.row(), connection_conf_->write_rpc_timeout()) + ->action([=, &put](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) -> folly::Future { + return Call( + rpc_client, controller, loc, put, &hbase::RequestConverter::ToMutateRequest, + [](const Response& r) -> Unit { return folly::unit; }); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + } /* namespace hbase */ diff --git hbase-native-client/core/raw-async-table.h hbase-native-client/core/raw-async-table.h index bbdc6bd..978a2b8 100644 --- hbase-native-client/core/raw-async-table.h +++ hbase-native-client/core/raw-async-table.h @@ -19,6 +19,7 @@ #pragma once #include +#include #include #include @@ -29,9 +30,11 @@ #include "core/async-rpc-retrying-caller.h" #include "core/connection-configuration.h" #include "core/get.h" +#include "core/put.h" #include "core/result.h" using folly::Future; +using folly::Unit; using hbase::pb::TableName; using std::chrono::nanoseconds; using std::chrono::milliseconds; @@ -52,6 +55,8 @@ class RawAsyncTable { virtual ~RawAsyncTable() = default; Future> Get(const hbase::Get& get); + + Future Put(const hbase::Put& put); void Close() {} private: diff --git hbase-native-client/core/request-converter.cc hbase-native-client/core/request-converter.cc index 9e2de3b..d1afe72 100644 --- hbase-native-client/core/request-converter.cc +++ hbase-native-client/core/request-converter.cc @@ -18,6 +18,9 @@ */ #include "core/request-converter.h" + +#include + #include #include "if/Client.pb.h" @@ -123,4 +126,69 @@ std::unique_ptr RequestConverter::ToScanRequest(const Scan &scan, return pb_req; } + +std::unique_ptr RequestConverter::ToMutation(const MutationType type, + const Mutation &mutation, + const int64_t nonce) { + auto pb_mut = std::make_unique(); + pb_mut->set_row(mutation.row()); + pb_mut->set_mutate_type(type); + pb_mut->set_durability(mutation.Durability()); + pb_mut->set_timestamp(mutation.TimeStamp()); + // TODO: set attributes from the mutation (key value pairs). + + if (nonce > 0) { + pb_mut->set_nonce(nonce); + } + + for (const auto &family : mutation.FamilyMap()) { + for (const auto &cell : family.second) { + auto column = pb_mut->add_column_value(); + column->set_family(cell->Family()); + auto qual = column->add_qualifier_value(); + qual->set_qualifier(cell->Qualifier()); + qual->set_timestamp(cell->Timestamp()); + auto cell_type = cell->Type(); + if (type == pb::MutationProto_MutationType_DELETE || + (type == pb::MutationProto_MutationType_PUT && IsDelete(cell_type))) { + qual->set_delete_type(ToDeleteType(cell_type)); + } + + qual->set_value(cell->Value()); + } + } + return std::move(pb_mut); +} + +DeleteType RequestConverter::ToDeleteType(const CellType type) { + switch (type) { + case DELETE: + return pb::MutationProto_DeleteType_DELETE_ONE_VERSION; + case DELETE_COLUMN: + return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS; + case DELETE_FAMILY: + return pb::MutationProto_DeleteType_DELETE_FAMILY; + case DELETE_FAMILY_VERSION: + return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION; + default: + throw std::runtime_error("Unknown delete type: " + folly::to(type)); + } +} + +bool RequestConverter::IsDelete(const CellType type) { + return CellType::DELETE <= type && type <= CellType::DELETE_FAMILY; +} + +std::unique_ptr RequestConverter::ToMutateRequest(const Put &put, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); + + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} } /* namespace hbase */ diff --git hbase-native-client/core/request-converter.h hbase-native-client/core/request-converter.h index 57f08cc..666076c 100644 --- hbase-native-client/core/request-converter.h +++ hbase-native-client/core/request-converter.h @@ -22,11 +22,19 @@ #include #include #include "connection/request.h" +#include "core/cell.h" #include "core/get.h" +#include "core/mutation.h" +#include "core/put.h" #include "core/scan.h" +#include "if/Client.pb.h" #include "if/HBase.pb.h" using hbase::pb::RegionSpecifier; +using hbase::pb::MutationProto; +using MutationType = hbase::pb::MutationProto_MutationType; +using DeleteType = hbase::pb::MutationProto_DeleteType; + namespace hbase { /** @@ -53,6 +61,11 @@ class RequestConverter { */ static std::unique_ptr ToScanRequest(const Scan &scan, const std::string ®ion_name); + static std::unique_ptr ToMutateRequest(const Put &put, const std::string ®ion_name); + + static std::unique_ptr ToMutation(const MutationType type, + const Mutation &mutation, const int64_t nonce); + private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); @@ -64,6 +77,9 @@ class RequestConverter { * Request. */ static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); + + static DeleteType ToDeleteType(const CellType type); + static bool IsDelete(const CellType type); }; } /* namespace hbase */ diff --git hbase-native-client/core/simple-client.cc hbase-native-client/core/simple-client.cc index 4b1144c..def78a7 100644 --- hbase-native-client/core/simple-client.cc +++ hbase-native-client/core/simple-client.cc @@ -41,89 +41,61 @@ using namespace folly; using namespace std; using namespace std::chrono; using hbase::Configuration; -using hbase::Response; -using hbase::Request; -using hbase::HBaseService; -using hbase::KeyValueCodec; -using hbase::LocationCache; -using hbase::ConnectionPool; -using hbase::ConnectionFactory; +using hbase::Put; +using hbase::RawAsyncTable; +using hbase::AsyncConnectionImpl; using hbase::pb::TableName; using hbase::pb::ServerName; -using hbase::pb::RegionSpecifier_RegionSpecifierType; -using hbase::pb::MutateRequest; -using hbase::pb::MutationProto_MutationType; // TODO(eclark): remove the need for this. -DEFINE_string(table, "t", "What region to send a get"); -DEFINE_string(row, "test", "What row to get"); +DEFINE_string(table, "test_table", "What table to do the reads or writes"); +DEFINE_string(row, "row", "row prefix"); DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); -DEFINE_uint64(columns, 10000, "How many columns to write"); +DEFINE_uint64(num_rows, 10000, "How many rows to write"); DEFINE_int32(threads, 6, "How many cpu threads"); -std::unique_ptr MakeRequest(uint64_t col, std::string region_name) { - auto req = Request::mutate(); - auto msg = std::static_pointer_cast(req->req_msg()); - auto region = msg->mutable_region(); - auto suf = folly::to(col); - - region->set_value(region_name); - region->set_type( - RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_REGION_NAME); - auto mutation = msg->mutable_mutation(); - mutation->set_row(FLAGS_row + suf); - mutation->set_mutate_type(MutationProto_MutationType::MutationProto_MutationType_PUT); - auto column = mutation->add_column_value(); - column->set_family("d"); - auto qual = column->add_qualifier_value(); - qual->set_qualifier(suf); - qual->set_value("."); - - return std::move(req); +std::unique_ptr MakePut(const std::string &prefix, uint64_t i) { + auto suf = folly::to(i); + std::string row{prefix + "_" + suf}; + auto put = std::make_unique(row); + put->AddColumn("f", "q", suf); + // LOG(INFO) << "Writing row: " << put->row(); + return std::move(put); } int main(int argc, char *argv[]) { google::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line"); google::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); - - // Set up thread pools. - auto cpu_pool = std::make_shared(FLAGS_threads); - auto io_pool = std::make_shared(5); - auto codec = std::make_shared(); - auto cp = std::make_shared(io_pool, codec); + google::InstallFailureSignalHandler(); + FLAGS_logtostderr = 1; + FLAGS_stderrthreshold = 1; // Configuration auto conf = std::make_shared(); conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper); - - // Create the cache. - LocationCache cache{conf, cpu_pool, cp}; + conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads); auto row = FLAGS_row; - auto tn = folly::to(FLAGS_table); - - auto loc = cache.LocateRegion(tn, row).get(milliseconds(5000)); - auto connection = loc->service(); - - auto num_puts = FLAGS_columns; - - auto results = std::vector>>{}; - auto col = uint64_t{0}; - for (; col < num_puts; col++) { - results.push_back( - folly::makeFuture(col) - .via(cpu_pool.get()) - .then([loc](uint64_t col) { return MakeRequest(col, loc->region_name()); }) - .then([connection](std::unique_ptr req) { - return (*connection)(std::move(req)); - })); + auto tn = std::make_shared(folly::to(FLAGS_table)); + auto num_puts = FLAGS_num_rows; + + // Connection and table (using raw interfaces because we do not expose AsyncTable yet). + auto async_connection = AsyncConnectionImpl::Create(conf); + auto async_table = std::make_unique(tn, async_connection); + + auto puts = std::vector>{}; + auto results = std::vector>{}; + for (uint64_t i = 0; i < num_puts; i++) { + puts.push_back(MakePut(FLAGS_row, i)); + results.push_back(async_table->Put(*puts[puts.size() - 1])); } auto allf = folly::collect(results).get(); - LOG(ERROR) << "Successfully sent " << allf.size() << " requests."; + LOG(INFO) << "Successfully sent " << allf.size() << " requests."; - io_pool->stop(); + async_table->Close(); + async_connection->Close(); return 0; } diff --git hbase-native-client/core/table.cc hbase-native-client/core/table.cc index 3c54d78..8ace4af 100644 --- hbase-native-client/core/table.cc +++ hbase-native-client/core/table.cc @@ -54,6 +54,11 @@ std::shared_ptr Table::Get(const hbase::Get &get) { return context.get(operation_timeout()); } +void Table::Put(const hbase::Put &put) { + auto future = async_table_->Put(put); + future.get(operation_timeout()); +} + milliseconds Table::operation_timeout() const { return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout()); } diff --git hbase-native-client/core/table.h hbase-native-client/core/table.h index 803befe..cbb95b7 100644 --- hbase-native-client/core/table.h +++ hbase-native-client/core/table.h @@ -58,6 +58,14 @@ class Table { // std::vector> Get(const std::vector &gets); /** + * @brief - Puts some data in the table. + * @param - put Put object to perform HBase Put operation. + */ + void Put(const hbase::Put &put); + + // TODO: Batch Puts + + /** * @brief - Close the client connection. */ void Close();