diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index 1ee0a83..aa229d4 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -247,6 +247,36 @@ TEST_F(ClientTest, Increment) { EXPECT_EQ(incr1+incr2, hbase::BytesUtil::ToInt64(*(result->Value("d", "1")))); } +TEST_F(ClientTest, CheckAndPut) { + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable("check", "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("check"); + auto row = "test1"; + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + // Perform Puts + table->Put(Put{row}.AddColumn("d", "1", "value1")); + auto result = table->CheckAndPut(row, "d", "1", "value1", Put{row}.AddColumn("d", "1", "value2")); + ASSERT_TRUE(result) << "CheckAndPut didn't replace value"; + + result = table->CheckAndPut(row, "d", "1", "value1", Put{row}.AddColumn("d", "1", "value3")); + + // Perform the Get + hbase::Get get(row); + auto result1 = table->Get(get); + LOG(INFO) << "value " << *(result1->Value("d", "1")); + EXPECT_EQ("value2", *(result1->Value("d", "1"))); + //ASSERT_FALSE(result) << "CheckAndPut shouldn't replace value"; +} + TEST_F(ClientTest, PutGet) { // Using TestUtil to populate test data ClientTest::test_util->CreateTable("t", "d"); diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index 2a98d54..018f1a4 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -107,6 +107,33 @@ folly::Future RawAsyncTable::Put(const hbase::Put& put) { return caller->Call().then([caller](const auto r) { return r; }); } +folly::Future RawAsyncTable::CheckAndPut(const std::string &row, + const std::string &family, const std::string &qualifier, const std::string &value, + const hbase::Put &put, const pb::CompareType &compare_op) { + auto caller = + CreateCallerBuilder(row, connection_conf_->write_rpc_timeout()) + ->action([=, &put]( + std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) -> folly::Future { + return Call( + rpc_client, controller, loc, put, + // request conversion + [=, &put](const hbase::Put& put, const std::string ®ion_name) + -> std::unique_ptr { + auto checkReq = RequestConverter::CheckAndPutToMutateRequest(row, family, + qualifier, value, compare_op, put, region_name); + return checkReq; }, + // response conversion + [](const Response& resp) -> folly::Unit { + auto mutate_resp = std::static_pointer_cast(resp.resp_msg()); + return folly::unit; //mutate_resp->processed() + } ); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} folly::Future RawAsyncTable::Delete(const hbase::Delete& del) { auto caller = diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index 6088d1b..d70c7e2 100644 --- a/hbase-native-client/core/raw-async-table.h +++ b/hbase-native-client/core/raw-async-table.h @@ -67,6 +67,10 @@ class RawAsyncTable { folly::Future Put(const hbase::Put& put); + folly::Future CheckAndPut(const std::string &row, const std::string &family, + const std::string &qualifier, const std::string &value, const hbase::Put &put, + const pb::CompareType &compare_op = pb::CompareType::EQUAL); + void Scan(const hbase::Scan& scan, std::shared_ptr consumer); void Close() {} diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 54fdfc5..7348926 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -276,6 +276,30 @@ std::unique_ptr RequestConverter::ToMutateRequest(const Put &put, return pb_req; } +std::unique_ptr RequestConverter::CheckAndPutToMutateRequest(const std::string &row, + const std::string &family, const std::string &qualifier, const std::string &value, + const pb::CompareType compare_op, + const hbase::Put &put, const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); + ::hbase::pb::Condition* cond = pb_msg->mutable_condition(); + cond->set_row(row); + cond->set_family(family); + cond->set_qualifier(qualifier); + cond->set_allocated_comparator(Comparator::ToProto( + *(ComparatorFactory::BinaryComparator(value).get())).release()); + cond->set_compare_type(compare_op); + + pb::RegionSpecifier *region_specifier = pb_msg->mutable_region(); + region_specifier->set_type( + pb::RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_REGION_NAME); + region_specifier->set_value(region_name); + return pb_req; +} + std::unique_ptr RequestConverter::DeleteToMutateRequest(const Delete &del, const std::string ®ion_name) { auto pb_req = Request::mutate(); diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index a9d65d6..5f7909c 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -84,6 +84,11 @@ class RequestConverter { static std::unique_ptr ToMutateRequest(const Put &put, const std::string ®ion_name); + static std::unique_ptr CheckAndPutToMutateRequest(const std::string &row, + const std::string &family, const std::string &qualifier, const std::string &value, + const pb::CompareType compare_op, + const hbase::Put &put, const std::string ®ion_name); + static std::unique_ptr IncrementToMutateRequest(const Increment &incr, const std::string ®ion_name); diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index d2d719b..9bc4892 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -53,6 +53,11 @@ std::shared_ptr ResponseConverter::FromMutateResponse(const Response& re return ToResult(mutate_resp->result(), resp.cell_scanner()); } +bool ResponseConverter::BoolFromMutateResponse(const Response& resp) { + auto mutate_resp = std::static_pointer_cast(resp.resp_msg()); + return mutate_resp->processed(); +} + std::shared_ptr ResponseConverter::ToResult( const hbase::pb::Result& result, const std::shared_ptr cell_scanner) { std::vector> vcells; diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h index b518d1c..2f8f279 100644 --- a/hbase-native-client/core/response-converter.h +++ b/hbase-native-client/core/response-converter.h @@ -49,6 +49,8 @@ class ResponseConverter { static std::shared_ptr FromMutateResponse(const Response& resp); + static bool BoolFromMutateResponse(const Response& resp); + static std::vector> FromScanResponse(const Response& resp); static std::vector> FromScanResponse( diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index bf22169..f6979af 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -73,6 +73,13 @@ void Table::Put(const hbase::Put &put) { future.get(operation_timeout()); } +bool Table::CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const hbase::Put &put, const pb::CompareType &compare_op) { + auto context = async_table_->CheckAndPut(row, family, qualifier, value, put, compare_op); + auto ret = context.get(operation_timeout()); + return true; +} + void Table::Delete(const hbase::Delete &del) { auto future = async_table_->Delete(del); future.get(operation_timeout()); diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 781c6f1..fe07849 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -63,6 +63,22 @@ class Table { void Put(const hbase::Put &put); /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the put. If the passed value is null, the check + * is for the lack of column (ie: non-existance) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param put data to put if check succeeds + * @throws IOException e + * @return true if the new put was executed, false otherwise + */ + bool CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const hbase::Put &put, + const pb::CompareType &compare_op = pb::CompareType::EQUAL); + /** * @brief - Deletes some data in the table. * @param - del Delete object to perform HBase Delete operation. */