diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index f71fbba..dd73b7c 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -107,6 +107,22 @@ folly::Future RawAsyncTable::Put(const hbase::Put& put) { return caller->Call().then([caller](const auto r) { return r; }); } +folly::Future RawAsyncTable::CheckAndPut(const std::string &row, const std::string &family, + const std::string &qualifier, const std::string &value, const hbase::Put &put) { + auto checkReq = RequestConverter::CheckToMutateRequest(row, family, qualifier, value, put); + auto caller = + CreateCallerBuilder>(row, connection_conf_->write_rpc_timeout()) + ->action([=, &(*checkReq)](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) -> folly::Future> { + return Call>( + rpc_client, controller, loc, checkReq, &hbase::RequestConverter::SetRegionToMutateRequest, + &hbase::ResponseConverter::FromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} folly::Future RawAsyncTable::Delete(const hbase::Delete& del) { auto caller = diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index c8e9f2f..bfde0bb 100644 --- a/hbase-native-client/core/raw-async-table.h +++ b/hbase-native-client/core/raw-async-table.h @@ -56,6 +56,8 @@ class RawAsyncTable { folly::Future Delete(const hbase::Delete& del); folly::Future> Increment(const hbase::Increment& increment); folly::Future Put(const hbase::Put& put); + folly::Future CheckAndPut(const std::string &row, const std::string &family, + const std::string &qualifier, const std::string &value, const hbase::Put &put); void Close() {} folly::Future>>> Get( diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index a1e63fe..a869ac9 100644 --- a/hbase-native-client/core/request-converter.cc 
+++ b/hbase-native-client/core/request-converter.cc @@ -220,6 +220,32 @@ std::unique_ptr RequestConverter::ToMutateRequest(const Put &put, return pb_req; } +std::unique_ptr RequestConverter::CheckToMutateRequest(const std::string &row, + const std::string &family, const std::string &qualifier, const std::string &value, + const hbase::Put &put) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); + ::hbase::pb::Condition* cond = pb_msg->mutable_condition(); + cond->set_row(row); + cond->set_family(family); + cond->set_qualifier(qualifier); + cond->set_allocated_comparator(ComparatorFactory::BinaryComparator(value)); + cond->set_compare_type(::hbase::pb::CompareType::EQUAL); + /* cond came from mutable_condition(), so pb_msg already owns it; handing it back through set_allocated_condition() would free it and leave a dangling pointer. */ + return pb_req; +} +std::unique_ptr RequestConverter::SetRegionToMutateRequest(const Request &pb_req, + const std::string &region_name) { + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} + std::unique_ptr RequestConverter::DeleteToMutateRequest(const Delete &del, const std::string &region_name) { auto pb_req = Request::mutate(); diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 6d57161..b08ed41 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -72,6 +72,12 @@ class RequestConverter { static std::unique_ptr ToMutateRequest(const Put &put, const std::string &region_name); + static std::unique_ptr CheckToMutateRequest(const std::string &row, + const std::string &family, const std::string &qualifier, const std::string &value, + const hbase::Put &put); + static std::unique_ptr SetRegionToMutateRequest(const Request &pb_req, 
+ const std::string &region_name); + static std::unique_ptr IncrementToMutateRequest(const Increment &incr, const std::string &region_name); diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index aa51989..3b6d550 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -57,6 +57,12 @@ void Table::Put(const hbase::Put &put) { future.get(operation_timeout()); } +bool Table::CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const hbase::Put &put) { + auto context = async_table_->CheckAndPut(row, family, qualifier, value, put); + return context.get(operation_timeout()); +} + void Table::Delete(const hbase::Delete &del) { auto future = async_table_->Delete(del); future.get(operation_timeout()); diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 81ddc8e..4209874 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -62,6 +62,21 @@ class Table { void Put(const hbase::Put &put); /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the put. If the passed value is null, the check + * is for the lack of column (ie: non-existence) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param put data to put if check succeeds + * @throws IOException e + * @return true if the new put was executed, false otherwise + */ + bool CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const hbase::Put &put); + /** * @brief - Deletes some data in the table. * @param - del Delete object to perform HBase Delete operation. */