diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 203e00d..7518318 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -41,6 +41,7 @@ cxx_library( "put.h", "delete.h", "scan.h", + "append.h", "result.h", "request-converter.h", "response-converter.h", @@ -73,6 +74,7 @@ cxx_library( "put.cc", "delete.cc", "scan.cc", + "append.cc", "raw-async-table.cc", "result.cc", "request-converter.cc", diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index f3c5150..b3bcd1e 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -19,6 +19,7 @@ #include +#include "core/append.h" #include "core/cell.h" #include "core/client.h" #include "core/configuration.h" @@ -118,6 +119,35 @@ TEST_F(ClientTest, DefaultConfiguration) { client.Close(); } +TEST_F(ClientTest, Append) { + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable("t", "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t"); + auto row = "test1"; + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + std::string val1 = "a"; + auto result = table->Append(hbase::Append{row}.Add("d", "1", val1)); + + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ(row, result->Row()); + EXPECT_EQ(val1, *(result->Value("d", "1"))); + + std::string val2 = "b"; + result = table->Append(hbase::Append{row}.Add("d", "1", val2)); + + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ(row, result->Row()); + EXPECT_EQ("ab", *(result->Value("d", "1"))); +} + TEST_F(ClientTest, PutGetDelete) { // Using TestUtil to populate test data std::string tableName = "t1"; diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index f71fbba..63f41e5 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -123,6 +123,20 @@ folly::Future RawAsyncTable::Delete(const hbase::Delete& del) { return caller->Call().then([caller](const auto r) { return r; }); } +folly::Future> RawAsyncTable::Append(const hbase::Append& append) { + auto caller = + CreateCallerBuilder>(append.row(), connection_conf_->write_rpc_timeout()) + ->action([=, &append](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) -> folly::Future> { + return Call>( + rpc_client, controller, loc, append, &hbase::RequestConverter::AppendToMutateRequest, + &hbase::ResponseConverter::FromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} folly::Future>>> RawAsyncTable::Get( const std::vector& gets) { return this->Batch(gets); diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index c8e9f2f..0b29536 100644 --- a/hbase-native-client/core/raw-async-table.h +++ b/hbase-native-client/core/raw-async-table.h @@ -54,6 +54,7 @@ class RawAsyncTable { folly::Future> Get(const hbase::Get& get); folly::Future Delete(const hbase::Delete& del); + folly::Future> Append(const hbase::Append& append); folly::Future> Increment(const hbase::Increment& increment); folly::Future Put(const hbase::Put& put); void Close() {} diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index a1e63fe..938fff4 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -245,4 +245,16 @@ std::unique_ptr RequestConverter::IncrementToMutateRequest(const Increm return pb_req; } +std::unique_ptr RequestConverter::AppendToMutateRequest(const Append &append, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_APPEND, append, -1).release()); + + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 6d57161..4844557 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -25,6 +25,7 @@ #include "connection/request.h" #include "core/action.h" #include "core/cell.h" +#include "core/append.h" #include "core/delete.h" #include "core/get.h" #include "core/increment.h" @@ -79,6 +80,8 @@ class RequestConverter { const Mutation &mutation, const int64_t nonce); + static std::unique_ptr AppendToMutateRequest(const Append &append, + const std::string ®ion_name); private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index aa51989..757d4af 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -67,6 +67,11 @@ std::shared_ptr Table::Increment(const hbase::Increment &incremen return context.get(operation_timeout()); } +std::shared_ptr Table::Append(const hbase::Append &append) { + auto context = async_table_->Append(append); + return context.get(operation_timeout()); +} + milliseconds Table::operation_timeout() const { return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout()); } diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 81ddc8e..6abb8cc 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -72,6 +72,12 @@ class Table { * @param - increment Increment object to perform HBase Increment operation. */ std::shared_ptr Increment(const hbase::Increment &increment); + + /** + * @brief - Appends some data in the table. + * @param - append Append object to perform HBase Append operation. + */ + std::shared_ptr Append(const hbase::Append &append); // TODO: Batch Puts /** diff --git a/hbase-native-client/core/append.h b/hbase-native-client/core/append.h new file mode 100644 index 0000000..cab7b76 --- /dev/null +++ b/hbase-native-client/core/append.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "core/cell.h" +#include "core/mutation.h" + +namespace hbase { + +class Append : public Mutation { + public: + /** + * Constructors + */ + explicit Append(const std::string& row) : Mutation(row) {} + Append(const Append& cappend) : Mutation(cappend) {} + Append& operator=(const Append& cappend) { + Mutation::operator=(cappend); + return *this; + } + + ~Append() = default; + + /** + * @brief Add the specified column and value to this Append operation. + * @param family family name + * @param qualifier column qualifier + * @param value value to append + */ + Append& Add(const std::string& family, const std::string& qualifier, std::string& value); + Append& Add(std::unique_ptr cell); +}; + +} // namespace hbase diff --git a/hbase-native-client/core/append.cc b/hbase-native-client/core/append.cc new file mode 100644 index 0000000..b9eaf88 --- /dev/null +++ b/hbase-native-client/core/append.cc @@ -0,0 +1,54 @@ + + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/append.h" +#include +#include +#include +#include +#include + +namespace hbase { + +/** + * @brief Append to the column from the specific family with the specified qualifier + * @param family family name + * @param qualifier column qualifier + * @param value value to append + */ +Append& Append::Add(const std::string& family, const std::string& qualifier, + std::string& value) { + family_map_[family].push_back(std::move( + std::make_unique(row_, family, qualifier, timestamp_, value, + hbase::CellType::PUT))); + return *this; +} +Append& Append::Add(std::unique_ptr cell) { + if (cell->Row() != row_) { + throw std::runtime_error("The row in " + cell->DebugString() + + " doesn't match the original one " + row_); + } + + family_map_[cell->Family()].push_back(std::move(cell)); + return *this; +} + +} // namespace hbase