diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 870c63f..203e00d 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -36,6 +36,7 @@ cxx_library(
         # Once meta lookup works
         "meta-utils.h",
         "get.h",
+        "increment.h",
         "mutation.h",
         "put.h",
         "delete.h",
@@ -66,6 +67,7 @@ cxx_library(
         "keyvalue-codec.cc",
         "location-cache.cc",
         "meta-utils.cc",
+        "increment.cc",
         "get.cc",
         "mutation.cc",
         "put.cc",
@@ -158,6 +160,15 @@ cxx_test(
     deps=[":core",],
     run_test_separately=True,)
 cxx_test(
+    name="increment-test",
+    srcs=[
+        "increment-test.cc",
+    ],
+    deps=[
+        ":core",
+    ],
+    run_test_separately=True,)
+cxx_test(
     name="put-test",
     srcs=[
         "put-test.cc",
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index d0791b8..1d52733 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -24,12 +24,14 @@
 #include "core/configuration.h"
 #include "core/delete.h"
 #include "core/get.h"
+#include "core/increment.h"
 #include "core/hbase-configuration-loader.h"
 #include "core/put.h"
 #include "core/result.h"
 #include "core/table.h"
 #include "serde/table-name.h"
 #include "test-util/test-util.h"
+#include "utils/bytes-util.h"
 using hbase::Cell;
 using hbase::Configuration;
@@ -186,6 +188,43 @@ TEST_F(ClientTest, PutGetDelete) {
   client.Close();
 }
+// Increments column d:1 of row "test1" by +4 and then by -2, reading the
+// stored counter back with a Get after each step.
+TEST_F(ClientTest, Increment) {
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable("t", "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to("t");
+  auto row = "test1";
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+  int64_t incr1 = 4;
+  auto result = table->Increment(hbase::Increment{row}.AddColumn("d", "1", incr1));
+  EXPECT_EQ(row, result->Row());
+
+  hbase::Get get(row);
+  result = table->Get(get);
+
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ(row, result->Row());
+  long l = hbase::BytesUtil::ToLong(*(result->Value("d", "1")));
+  LOG(INFO) << "after incr " << l;
+  EXPECT_EQ(incr1, l);
+
+  int64_t incr2 = -2;
+  table->Increment(hbase::Increment{row}.AddColumn("d", "1", incr2));
+
+  result = table->Get(get);
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ(row, result->Row());
+  EXPECT_EQ(incr1+incr2, hbase::BytesUtil::ToLong(*(result->Value("d", "1"))));
+}
 TEST_F(ClientTest, PutGet) {
   // Using TestUtil to populate test data
   ClientTest::test_util->CreateTable("t", "d");
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 26967eb..b983f86 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -109,6 +109,24 @@ folly::Future RawAsyncTable::Delete(const hbase::Delete& del) {
   return caller->Call().then([caller](const auto r) { return r; });
 }
+// Builds a caller for incr.row() with the write RPC timeout; its action sends
+// incr as a mutate request and converts the reply with FromMutateResponse.
+// NOTE(review): the action lambda captures incr by reference, so incr
+// presumably has to stay alive until the returned future completes - confirm.
+folly::Future> RawAsyncTable::Increment(const hbase::Increment& incr) {
+  auto caller =
+      CreateCallerBuilder>(incr.row(), connection_conf_->write_rpc_timeout())
+          ->action([=, &incr](std::shared_ptr controller,
+                              std::shared_ptr loc,
+                              std::shared_ptr rpc_client) -> folly::Future> {
+            return Call>(
+                rpc_client, controller, loc, incr, &hbase::RequestConverter::IncrementToMutateRequest,
+                &hbase::ResponseConverter::FromMutateResponse);
+          })
+          ->Build();
+
+  return caller->Call().then([caller](const auto r) { return r; });
+}
 folly::Future>>> RawAsyncTable::Get(
     const std::vector& gets) {
   return this->Batch(gets);
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index d042d27..c8e9f2f 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -31,6 +31,7 @@
 #include "core/connection-configuration.h"
 #include "core/delete.h"
 #include "core/get.h"
+#include "core/increment.h"
 #include "core/put.h"
 #include "core/result.h"
@@ -53,6 +54,7 @@ class RawAsyncTable {
   folly::Future> Get(const hbase::Get& get);
   folly::Future Delete(const hbase::Delete& del);
+  folly::Future> Increment(const hbase::Increment& increment);
   folly::Future Put(const hbase::Put& put);
   void Close() {}
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 85b4d6d..a1e63fe 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -232,4 +232,18 @@ std::unique_ptr RequestConverter::DeleteToMutateRequest(const Delete &d
   VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
   return pb_req;
 }
+std::unique_ptr RequestConverter::IncrementToMutateRequest(const Increment &incr,
+                                                           const std::string &region_name) {
+  auto pb_req = Request::mutate();
+  auto pb_msg = std::static_pointer_cast(pb_req->req_msg());
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  // NOTE(review): the nonce is hard-coded to -1 - presumably "no nonce"; confirm against ToMutation.
+  pb_msg->set_allocated_mutation(
+      ToMutation(MutationType::MutationProto_MutationType_INCREMENT, incr, -1).release());
+
+  VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
+  return pb_req;
+}
+
 } /* namespace hbase */
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index 6db1962..6d57161 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -27,6 +27,7 @@
 #include "core/cell.h"
 #include "core/delete.h"
 #include "core/get.h"
+#include "core/increment.h"
 #include "core/mutation.h"
 #include "core/put.h"
 #include "core/region-request.h"
@@ -71,6 +72,15 @@ class RequestConverter {
   static std::unique_ptr ToMutateRequest(const Put &put, const std::string &region_name);
+  /**
+   * @brief Builds the mutate Request that carries an Increment.
+   * @param incr - Increment converted into an INCREMENT mutation
+   * @param region_name - name set as the region of the request
+   * @return Request whose message holds the region and the mutation
+   */
+  static std::unique_ptr IncrementToMutateRequest(const Increment &incr,
+                                                  const std::string &region_name);
+
   static std::unique_ptr ToMutation(const MutationType
type, const Mutation &mutation, const int64_t nonce);
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index b29a819..dce38bb 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -29,6 +29,7 @@
 #include "exceptions/exception.h"
 using hbase::pb::GetResponse;
+using hbase::pb::MutateResponse;
 using hbase::pb::ScanResponse;
 using hbase::pb::RegionLoadStats;
@@ -46,6 +47,16 @@ std::shared_ptr ResponseConverter::FromGetResponse(const Response& resp)
   return ToResult(get_resp->result(), resp.cell_scanner());
 }
+/**
+ * Builds a Result from the MutateResponse held by resp by handing its
+ * result message and the response's cell scanner to ToResult.
+ */
+std::shared_ptr ResponseConverter::FromMutateResponse(const Response& resp) {
+  auto mutate_resp = std::static_pointer_cast(resp.resp_msg());
+  VLOG(3) << "FromMutateResponse:" << mutate_resp->ShortDebugString();
+  return ToResult(mutate_resp->result(), resp.cell_scanner());
+}
+
 std::shared_ptr ResponseConverter::ToResult(
     const hbase::pb::Result& result, const std::unique_ptr& cell_scanner) {
   std::vector> vcells;
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 443527d..0fdde89 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -47,6 +47,12 @@ class ResponseConverter {
    */
   static std::shared_ptr FromGetResponse(const Response& resp);
+  /**
+   * @brief Builds a Result from the MutateResponse held by resp.
+   * @param resp - Response whose resp_msg() is a MutateResponse
+   */
+  static std::shared_ptr FromMutateResponse(const Response& resp);
+
   static std::vector> FromScanResponse(const Response& resp);
   static std::unique_ptr GetResults(std::shared_ptr req,
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index dd26adf..5051f6f 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -62,6 +62,12 @@ void Table::Delete(const hbase::Delete &del) {
   future.get(operation_timeout());
 }
+std::shared_ptr Table::Increment(const hbase::Increment &increment) {
+  auto future = async_table_->Increment(increment);
+  // Wait up to operation_timeout() and hand the Result to the caller.
+  return future.get(operation_timeout());
+}
+
 milliseconds Table::operation_timeout() const {
   return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout());
 }
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index c4217d3..81ddc8e 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -67,6 +67,12 @@ class Table {
    */
   void Delete(const hbase::Delete &del);
+  /**
+   * @brief - Increments some data in the table.
+   * @param - increment Increment object to perform HBase Increment operation.
+   * @return - Result built from the response to the Increment.
+   */
+  std::shared_ptr Increment(const hbase::Increment &increment);
   // TODO: Batch Puts
   /**
diff --git a/hbase-native-client/utils/bytes-util.cc b/hbase-native-client/utils/bytes-util.cc
index 5e4d728..187fcd8 100644
--- a/hbase-native-client/utils/bytes-util.cc
+++ b/hbase-native-client/utils/bytes-util.cc
@@ -21,6 +21,7 @@
 #include
 #include
+#include
 #include
@@ -28,6 +29,40 @@ namespace hbase {
 constexpr char BytesUtil::kHexChars[];
+/**
+ * Encodes val as 8 bytes in big-endian order (most significant byte first).
+ */
+std::string BytesUtil::ToString(int64_t val) {
+  std::string res;
+  for (int i = 7; i > 0; i--) {
+    res += (int8_t) (val & 0xffu);
+    val = val >> 8;
+  }
+  res += (int8_t) val;
+  std::reverse(res.begin(), res.end());
+  return res;
+}
+
+/**
+ * Decodes the first 8 bytes of str as a big-endian 64-bit integer.
+ * @throws std::runtime_error if str holds fewer than 8 bytes
+ */
+int64_t BytesUtil::ToLong(std::string str) {
+  if (str.length() < 8) {
+    throw std::runtime_error("There are not enough bytes. Expected: 8, actual: " +
+                             std::to_string(str.length()));
+  }
+  const char *bytes = str.c_str();
+  // Accumulate unsigned: a plain char byte >= 0x80 would otherwise be
+  // sign-extended and overwrite the bits already shifted in.
+  uint64_t l = 0;
+  for (int i = 0; i < 8; i++) {
+    l <<= 8;
+    l |= static_cast<uint8_t>(bytes[i]);
+  }
+  return static_cast<int64_t>(l);
+}
+
 std::string BytesUtil::ToStringBinary(const std::string& b, size_t off, size_t len) {
   std::string result;
   // Just in case we are passed a 'len' that is > buffer length...
diff --git a/hbase-native-client/utils/bytes-util.h b/hbase-native-client/utils/bytes-util.h
index 541b2d7..c5fe21e 100644
--- a/hbase-native-client/utils/bytes-util.h
+++ b/hbase-native-client/utils/bytes-util.h
@@ -41,5 +41,20 @@ class BytesUtil {
    * @return string output
    */
   static std::string ToStringBinary(const std::string& b, size_t off, size_t len);
+
+  /**
+   * Encodes a 64-bit integer as an 8-byte big-endian string.
+   * @param amt value to encode
+   * @return 8-byte string, most significant byte first
+   */
+  static std::string ToString(int64_t amt);
+
+  /**
+   * Decodes the first 8 bytes of str as a big-endian 64-bit integer.
+   * @param str string holding at least 8 bytes
+   * @return decoded value
+   * @throws std::runtime_error if str is shorter than 8 bytes
+   */
+  static int64_t ToLong(std::string str);
 };
 } /* namespace hbase */
diff --git a/hbase-native-client/core/increment.h b/hbase-native-client/core/increment.h
new file mode 100644
index 0000000..330f44a
--- /dev/null
+++ b/hbase-native-client/core/increment.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include "core/cell.h"
+#include "core/mutation.h"
+
+namespace hbase {
+
+class Increment : public Mutation {  // Mutation that carries the per-column amounts to add for one row
+ public:
+  /**
+   * Constructors
+   */
+  explicit Increment(const std::string& row) : Mutation(row) {}
+  Increment(const Increment& cincrement) : Mutation(cincrement) {}
+  Increment& operator=(const Increment& cincrement) {
+    Mutation::operator=(cincrement);
+    return *this;
+  }
+
+  ~Increment() = default;
+
+  /**
+   * @brief Increment the column from the specific family with the specified qualifier
+   * by the specified amount.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param amount amount to increment by
+   */
+  Increment& AddColumn(const std::string& family, const std::string& qualifier, int64_t amount);
+  Increment& Add(std::unique_ptr cell);  // appends a caller-built cell; the definition throws std::runtime_error when the cell's row differs from this Increment's row
+};
+
+}  // namespace hbase
diff --git a/hbase-native-client/core/increment.cc b/hbase-native-client/core/increment.cc
new file mode 100644
index 0000000..1a84266
--- /dev/null
+++ b/hbase-native-client/core/increment.cc
@@ -0,0 +1,57 @@
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/increment.h"
+#include
+#include
+#include
+#include
+#include
+
+#include "utils/bytes-util.h"
+
+namespace hbase {
+
+/**
+ * @brief Increment the column from the specific family with the specified qualifier
+ * by the specified amount.
+ * @param family family name
+ * @param qualifier column qualifier
+ * @param amount amount to increment by
+ */
+Increment& Increment::AddColumn(const std::string& family, const std::string& qualifier,
+                                int64_t amount) {
+  family_map_[family].push_back(std::move(  // the new cell's value is amount encoded by BytesUtil::ToString
+      std::make_unique(row_, family, qualifier, timestamp_, BytesUtil::ToString(amount),
+                       hbase::CellType::PUT)));
+  return *this;
+}
+Increment& Increment::Add(std::unique_ptr cell) {  // takes ownership of cell and appends it to the list for cell->Family()
+  if (cell->Row() != row_) {  // a cell for a different row is rejected
+    throw std::runtime_error("The row in " + cell->DebugString() +
+                             " doesn't match the original one " + row_);
+  }
+
+  family_map_[cell->Family()].push_back(std::move(cell));  // std::move is only a cast, so cell->Family() is still readable while the arguments are evaluated
+  return *this;
+}
+
+}  // namespace hbase
diff --git a/hbase-native-client/core/increment-test.cc b/hbase-native-client/core/increment-test.cc
new file mode 100644
index 0000000..3d7b1d4
--- /dev/null
+++ b/hbase-native-client/core/increment-test.cc
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include
+#include
+
+#include "core/mutation.h"
+#include "core/increment.h"
+#include "core/put.h"
+#include "utils/time-util.h"
+
+using hbase::Increment;
+using hbase::Increment;  // NOTE(review): duplicate of the using-declaration on the previous line
+using hbase::Cell;
+using hbase::CellType;
+using hbase::Mutation;
+using hbase::TimeUtil;
+
+const constexpr int64_t Mutation::kLatestTimestamp;  // out-of-class definition; presumably needed because EXPECT_EQ takes the constant by reference - confirm
+
+TEST(Increment, Row) {  // the row passed to the constructor is reported by row()
+  Increment incr{"foo"};
+  EXPECT_EQ("foo", incr.row());
+}
+
+TEST(Increment, Durability) {  // durability starts as USE_DEFAULT and follows SetDurability
+  Increment incr{"row"};
+  EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, incr.Durability());
+
+  auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL;
+  incr.SetDurability(skipWal);
+  EXPECT_EQ(skipWal, incr.Durability());
+}
+
+TEST(Increment, Timestamp) {  // a timestamp set on the Increment is stamped on cells added afterwards
+  Increment incr{"row"};
+
+  // test default timestamp
+  EXPECT_EQ(Mutation::kLatestTimestamp, incr.TimeStamp());
+
+  // set custom timestamp
+  auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos());
+  incr.SetTimeStamp(ts);
+  EXPECT_EQ(ts, incr.TimeStamp());
+
+  // Add a column with custom timestamp
+  incr.AddColumn("f", "q", 5l);
+  auto &cell = incr.FamilyMap().at("f")[0];
+  EXPECT_EQ(ts, cell->Timestamp());
+}
+
+TEST(Increment, HasFamilies) {  // HasFamilies() flips to true once a column has been added
+  Increment incr{"row"};
+
+  EXPECT_EQ(false, incr.HasFamilies());
+
+  incr.AddColumn("f", "q", 5l);
+  EXPECT_EQ(true, incr.HasFamilies());
+}
+
+TEST(Increment, Add) {  // Add() groups cells by family and throws for a cell whose row differs
+  CellType cell_type = CellType::PUT;
+  std::string row = "row";
+  std::string family = "family";
+  std::string column = "column";
+  std::string value = "value";
+  int64_t timestamp = std::numeric_limits::max();
+  auto cell = std::make_unique(row, family, column, timestamp, value, cell_type);
+
+  // add first cell
+  Increment incr{"row"};
+  incr.Add(std::move(cell));
+  EXPECT_EQ(1, incr.FamilyMap().size());
+  EXPECT_EQ(1, incr.FamilyMap().at(family).size());
+
+  // add a non-matching row
+  auto cell2 = std::make_unique(row, family, column, timestamp, value, cell_type);
+  Increment incr2{"foo"};
+  ASSERT_THROW(incr2.Add(std::move(cell2)), std::runtime_error);  // rows don't match
+
+  // add a second cell with same family
+  auto cell3 = std::make_unique(row, family, "column-2", timestamp, value, cell_type);
+  incr.Add(std::move(cell3));
+  EXPECT_EQ(1, incr.FamilyMap().size());
+  EXPECT_EQ(2, incr.FamilyMap().at(family).size());
+
+  // add a cell to a different family
+  auto cell4 = std::make_unique(row, "family-2", "column-2", timestamp, value, cell_type);
+  incr.Add(std::move(cell4));
+  EXPECT_EQ(2, incr.FamilyMap().size());
+  EXPECT_EQ(1, incr.FamilyMap().at("family-2").size());
+}
+
+TEST(Increment, AddColumn) {  // AddColumn() groups the generated cells by family
+  std::string row = "row";
+  std::string family = "family";
+  std::string column = "column";
+  std::string value = "value";
+
+  Increment incr{"row"};
+  incr.AddColumn(family, column, 5l);
+  EXPECT_EQ(1, incr.FamilyMap().size());
+  EXPECT_EQ(1, incr.FamilyMap().at(family).size());
+
+  // add a second cell with same family
+  incr.AddColumn(family, "column-2", 6l);
+  EXPECT_EQ(1, incr.FamilyMap().size());
+  EXPECT_EQ(2, incr.FamilyMap().at(family).size());
+
+  // add a cell to a different family
+  incr.AddColumn("family-2", column, 7l);
+  EXPECT_EQ(2, incr.FamilyMap().size());
+  EXPECT_EQ(1, incr.FamilyMap().at("family-2").size());
+}