NOTE(review): this patch was reconstructed from a whitespace-collapsed copy.
- Line breaks, indentation and blank context lines are best-effort; check them against the
  tree before applying (or apply with whitespace checks relaxed).
- Template arguments (<...>) had been stripped and `&region_name` had been mangled to
  `®ion_name`; both were restored from how the code uses the values - confirm against the tree.
- The index lines still carry the blob ids of the original patch.

diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 2a98d54..3be1acf 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -107,6 +107,38 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) {
 
   return caller->Call().then([caller](const auto r) { return r; });
 }
+
+// Sends `put` as a conditional mutation: the request carries a Condition on
+// row/family/qualifier == value, and the future resolves to the `processed` flag of the
+// MutateResponse.
+// NOTE: `put` is captured by reference, so the caller must keep it alive until the returned
+// future completes.
+folly::Future<bool> RawAsyncTable::CheckAndPut(const std::string &row, const std::string &family,
+                                               const std::string &qualifier,
+                                               const std::string &value, const hbase::Put &put) {
+  auto caller =
+      CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout())
+          ->action([=, &put](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> {
+            return Call<hbase::Put, hbase::Request, hbase::Response, bool>(
+                rpc_client, controller, loc, put,
+                // Request conversion: build the conditional mutate request, then stamp the
+                // target region on it. The lambda's own `put` parameter is used, so `put` is
+                // deliberately not captured again.
+                [=](const hbase::Put &put,
+                    const std::string &region_name) -> std::unique_ptr<Request> {
+                  return RequestConverter::SetRegionToMutateRequest(
+                      RequestConverter::CheckToMutateRequest(row, family, qualifier, value, put),
+                      region_name);
+                },
+                // Response conversion: hand back the response's `processed` flag.
+                &ResponseConverter::BoolFromMutateResponse);
+          })
+          ->Build();
+
+  return caller->Call().then([caller](const auto r) { return r; });
+}
 
 folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
   auto caller =
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index 6088d1b..6ad99b0 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -67,6 +67,11 @@ class RawAsyncTable {
 
   folly::Future<folly::Unit> Put(const hbase::Put& put);
 
+  // Conditional Put; the future resolves to the MutateResponse `processed` flag.
+  folly::Future<bool> CheckAndPut(const std::string &row, const std::string &family,
+                                  const std::string &qualifier, const std::string &value,
+                                  const hbase::Put &put);
+
   void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer);
 
   void Close() {}
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 54fdfc5..7cfa4cd 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -276,6 +276,45 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put,
 
   return pb_req;
 }
+
+// Builds a MutateRequest carrying `put` plus a Condition that the cell at
+// row/family/qualifier compares EQUAL (binary comparator) to `value`. The region is not set
+// here; see SetRegionToMutateRequest().
+std::unique_ptr<Request> RequestConverter::CheckToMutateRequest(const std::string &row,
+                                                                const std::string &family,
+                                                                const std::string &qualifier,
+                                                                const std::string &value,
+                                                                const hbase::Put &put) {
+  auto pb_req = Request::mutate();
+  auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
+
+  pb_msg->set_allocated_mutation(
+      ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release());
+
+  // mutable_condition() returns a Condition that pb_msg already owns. It must not be passed
+  // back through set_allocated_condition(): that setter frees the current condition before
+  // adopting its argument, which would leave pb_msg pointing at freed memory.
+  ::hbase::pb::Condition *cond = pb_msg->mutable_condition();
+  cond->set_row(row);
+  cond->set_family(family);
+  cond->set_qualifier(qualifier);
+  cond->set_allocated_comparator(
+      Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release());
+  cond->set_compare_type(::hbase::pb::CompareType::EQUAL);
+  return pb_req;
+}
+
+// Stamps `region_name` on an already-built MutateRequest and hands the request back.
+std::unique_ptr<Request> RequestConverter::SetRegionToMutateRequest(
+    std::unique_ptr<Request> pb_req, const std::string &region_name) {
+  auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
+  // pb_req is a by-value parameter, so it is moved implicitly on return.
+  return pb_req;
+}
+
 std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del,
                                                                  const std::string &region_name) {
   auto pb_req = Request::mutate();
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index a9d65d6..fcf7961 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -84,6 +84,17 @@ class RequestConverter {
 
   static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string &region_name);
 
+  // Builds a conditional (check-and-put) MutateRequest; the region is not set.
+  static std::unique_ptr<Request> CheckToMutateRequest(const std::string &row,
+                                                       const std::string &family,
+                                                       const std::string &qualifier,
+                                                       const std::string &value,
+                                                       const hbase::Put &put);
+
+  // Sets the region on an existing MutateRequest and returns the request.
+  static std::unique_ptr<Request> SetRegionToMutateRequest(std::unique_ptr<Request> pb_req,
+                                                           const std::string &region_name);
+
   static std::unique_ptr<Request> IncrementToMutateRequest(const Increment &incr,
                                                            const std::string &region_name);
 
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index d2d719b..9bc4892 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -53,6 +53,12 @@ std::shared_ptr<hbase::Result> ResponseConverter::FromMutateResponse(const Response& resp) {
   return ToResult(mutate_resp->result(), resp.cell_scanner());
 }
 
+// Returns the `processed` flag of the MutateResponse carried by `resp`.
+bool ResponseConverter::BoolFromMutateResponse(const Response& resp) {
+  auto mutate_resp = std::static_pointer_cast<hbase::pb::MutateResponse>(resp.resp_msg());
+  return mutate_resp->processed();
+}
+
 std::shared_ptr<Result> ResponseConverter::ToResult(
     const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) {
   std::vector<std::shared_ptr<Cell>> vcells;
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index b518d1c..2f8f279 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -49,6 +49,9 @@ class ResponseConverter {
 
   static std::shared_ptr<hbase::Result> FromMutateResponse(const Response& resp);
 
+  // Returns the `processed` flag of the MutateResponse carried by `resp`.
+  static bool BoolFromMutateResponse(const Response& resp);
+
   static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
 
   static std::vector<std::shared_ptr<Result>> FromScanResponse(
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index bf22169..db2ef30 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -73,6 +73,15 @@ void Table::Put(const hbase::Put &put) {
   future.get(operation_timeout());
 }
 
+// Synchronous CheckAndPut: issues the async call and waits up to operation_timeout() for the
+// result.
+bool Table::CheckAndPut(const std::string &row, const std::string &family,
+                        const std::string &qualifier, const std::string &value,
+                        const hbase::Put &put) {
+  auto future = async_table_->CheckAndPut(row, family, qualifier, value, put);
+  return future.get(operation_timeout());
+}
+
 void Table::Delete(const hbase::Delete &del) {
   auto future = async_table_->Delete(del);
   future.get(operation_timeout());
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 781c6f1..1c725d0 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -63,6 +63,23 @@ class Table {
   void Put(const hbase::Put &put);
 
   /**
+   * @brief - Atomically checks whether the cell at row/family/qualifier matches the expected
+   * value and, if it does, applies the put.
+   * NOTE(review): the earlier wording (apparently inherited from the Java API) said that a null
+   * value checks for a missing column (non-existence). A std::string reference cannot be null,
+   * so confirm how an empty value is treated before relying on that behaviour.
+   * @param - row row to check
+   * @param - family column family to check
+   * @param - qualifier column qualifier to check
+   * @param - value the expected value
+   * @param - put data to put if the check succeeds
+   * @throws - NOTE(review): presumably whatever the underlying future raises when the call
+   * fails or exceeds operation_timeout() - confirm
+   * @return - true if the put was executed, false otherwise
+   */
+  bool CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier,
+                   const std::string &value, const hbase::Put &put);
+
+  /**
    * @brief - Deletes some data in the table.
    * @param - del Delete object to perform HBase Delete operation.
    */