From a7a9fe8813bab871dc8a0565df90f8bc6d7319db Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Fri, 6 Jan 2017 13:09:36 +1100 Subject: [PATCH] 1) Removed ResultScanner and Scan dpendecies from Table 2) Changes in Method names 3) Table will be passed, LocationCache, Configuration and RpcClient during construction 4) Unit tests hooked up to client 5) rpc-client.(cc/h) was not included in connection/BUCK, due to which compilation with BUCK was failing on a clean checkout. Have fixed that as well. 6) Client can be constructed with or without Configuration. diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c22cc89..19536d5 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -30,6 +30,7 @@ cxx_library( "rpc-connection.h", "response.h", "service.h", + "rpc-client.h", ], srcs=[ "client-dispatcher.cc", @@ -38,6 +39,7 @@ cxx_library( "connection-pool.cc", "pipeline.cc", "request.cc", + "rpc-client.cc", ], deps=[ "//if:if", diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index a5ea5c0..1bb4a02 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -33,6 +33,8 @@ cxx_library( "hbase_configuration_loader.h", "scan.h", "result.h", + "protobuf_request_builder.h", + "table.h", ], srcs=[ "cell.cc", @@ -45,6 +47,8 @@ cxx_library( "hbase_configuration_loader.cc", "scan.cc", "result.cc", + "protobuf_request_builder.cc", + "table.cc", ], deps=[ "//connection:connection", @@ -94,6 +98,25 @@ cxx_test( srcs=["result-test.cc",], deps=[":core",], run_test_separately=True,) +cxx_test( + name="protobuf_request_builder-test", + srcs=["protobuf_request_builder-test.cc",], + deps=[":core",], + run_test_separately=True,) +cxx_test( + name="client-test", + srcs=["client-test.cc",], + deps=[ + ":core", + "//connection:connection", + "//if:if", + "//serde:serde", + "//third-party:folly", + "//third-party:wangle", + "//third-party:zookeeper_mt", + "//test-util:test-util", + ], + run_test_separately=True,) cxx_binary( name="simple-client", srcs=["simple-client.cc",], diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc new file mode 100644 index 0000000..ed61322 --- /dev/null +++ b/hbase-native-client/core/client-test.cc @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "core/client.h" +#include "core/configuration.h" +#include "core/get.h" +#include "core/hbase_configuration_loader.h" +#include "core/result.h" +#include "core/table.h" +#include "test-util/test-util.h" +#include "serde/table-name.h" + +TEST(Client, EmptyConfigurationPassedToClient) { + hbase::optional conf; + std::shared_ptr> conf_ptr; + + // Create a client + hbase::Client client("localhost:2181", conf_ptr); + client.Close(); +} + +TEST(Client, ConfigurationPassedToClient) { + // Fetch zookeeper quorum + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + std::shared_ptr> conf_ptr = + (conf) ? std::make_shared>(conf) : nullptr; + auto zk_quorum = (conf_ptr.get()) + ? conf_ptr->value().Get("hbase.zookeeper.quorum", "localhost:2181") + : "127.0.0.1:2181"; + + // Create a client + hbase::Client client(zk_quorum, conf_ptr); + client.Close(); +} + +TEST(Client, DefaultConfiguration) { + // Fetch zookeeper quorum + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + auto zk_quorum = + (conf) ? conf->Get("hbase.zookeeper.quorum", "localhost:2181") : "127.0.0.1:2181"; + + // Create a client + hbase::Client client(zk_quorum); + client.Close(); +} + +TEST(Client, Get) { + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Fetch zookeeper quorum + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + auto zk_quorum = + (conf) ? conf->Get("hbase.zookeeper.quorum", "localhost:2181") : "127.0.0.1:2181"; + + // Create a client + hbase::Client client(zk_quorum); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table != nullptr) << "Unable to get connection to Table."; + + // Perform the Get + std::unique_ptr result = table->Get(get); + ASSERT_TRUE(result != nullptr) << "Result can't be null"; + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + + table->Close(); + client.Close(); +} + +TEST(Client, GetForNonExistentTable) { + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t_not_exists"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Fetch zookeeper quorum + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + auto zk_quorum = + (conf) ? conf->Get("hbase.zookeeper.quorum", "localhost:2181") : "127.0.0.1:2181"; + + // Create a client + hbase::Client client(zk_quorum); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table != nullptr) << "Unable to get connection to Table."; + + // Perform the Get + std::unique_ptr result = table->Get(get); + ASSERT_TRUE(result == nullptr) << "Table does not exist. We should get an exception"; + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + + table->Close(); + client.Close(); +} + +TEST(Client, GetForNonExistentRow) { + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't_exists', 'd'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t_exists"); + auto row = "row_not_exists"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Fetch zookeeper quorum + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + auto zk_quorum = + (conf) ? conf->Get("hbase.zookeeper.quorum", "localhost:2181") : "127.0.0.1:2181"; + + // Create a client + hbase::Client client(zk_quorum); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table != nullptr) << "Unable to get connection to Table."; + + // Perform the Get + std::unique_ptr result = table->Get(get); + ASSERT_TRUE(result != nullptr) << "Result can't be null. It will be empty"; + ASSERT_TRUE(result->IsEmpty()) << "Result should be empty."; + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + table->Close(); + client.Close(); +} diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 0389b24..ee2f676 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -1,5 +1,5 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one + w * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -20,27 +20,51 @@ #include "core/client.h" #include - -#include -#include - -using namespace folly; -using namespace std; -using namespace hbase::pb; +#include +#include namespace hbase { -Client::Client(std::string zk_quorum) +Client::Client(const std::string &zk_quorum) + : cpu_executor_(std::make_shared(4)), + io_executor_(std::make_shared(sysconf(_SC_NPROCESSORS_ONLN))), + location_cache_( + std::make_shared(zk_quorum, cpu_executor_, io_executor_)) { + HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + std::shared_ptr> conf_ = + (conf) ? std::make_shared>(conf) : nullptr; +} + +Client::Client(const std::string &zk_quorum, + const std::shared_ptr> &conf) : cpu_executor_(std::make_shared(4)), - io_executor_(std::make_shared( - sysconf(_SC_NPROCESSORS_ONLN))), - location_cache_(zk_quorum, cpu_executor_, io_executor_) {} + io_executor_(std::make_shared(sysconf(_SC_NPROCESSORS_ONLN))), + location_cache_( + std::make_shared(zk_quorum, cpu_executor_, io_executor_)), + conf_(conf) {} // We can't have the threads continue running after everything is done // that leads to an error. Client::~Client() { cpu_executor_->stop(); io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); +} + +std::unique_ptr Client::Table(const TableName &table_name) { + std::unique_ptr table = + std::make_unique(table_name, location_cache_, rpc_client_, conf_); + return std::move(table); +} + +void Client::Close() { + if (is_closed_) return; + + cpu_executor_->stop(); + io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; } } // namespace hbase diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 0ba1276..ad0ea85 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -27,11 +27,18 @@ #include #include +#include "core/configuration.h" +#include "core/hbase_configuration_loader.h" #include "core/location-cache.h" +#include "connection/rpc-client.h" +#include "core/table.h" +#include "serde/table-name.h" #include "if/Cell.pb.h" -namespace hbase { +using hbase::pb::TableName; +namespace hbase { +class Table; /** * Client. * @@ -42,16 +49,31 @@ namespace hbase { class Client { public: /** - * Create a new client. + * @brief Create a new client. * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ - explicit Client(std::string quorum_spec); + explicit Client(const std::string &zk_quorum); + Client(const std::string &zk_quorum, + const std::shared_ptr> &conf); ~Client(); + /** + * @brief Retrieve a Table implementation for accessing a table. + * @param - table_name + */ + std::unique_ptr Table(const TableName &table_name); + + /** + * @brief Close the Client connection. + */ + void Close(); private: std::shared_ptr cpu_executor_; std::shared_ptr io_executor_; - LocationCache location_cache_; + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_ = std::make_shared(); + std::shared_ptr> conf_; + bool is_closed_ = false; }; } // namespace hbase diff --git a/hbase-native-client/core/protobuf_request_builder-test.cc b/hbase-native-client/core/protobuf_request_builder-test.cc new file mode 100644 index 0000000..b103edb --- /dev/null +++ b/hbase-native-client/core/protobuf_request_builder-test.cc @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/protobuf_request_builder.h" + +#include +#include +#include "connection/request.h" +#include "core/get.h" +#include "core/scan.h" + +using hbase::Get; +using hbase::Scan; + +using hbase::pb::GetRequest; +using hbase::pb::RegionSpecifier; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::pb::ScanRequest; + +TEST(ProtobufRequestBuilder, Get) { + std::string row_str = "row-test"; + Get get(row_str); + get.AddFamily("family-1"); + get.AddFamily("family-2"); + get.AddFamily("family-3"); + get.AddColumn("family-2", "qualifier-1"); + get.AddColumn("family-2", "qualifier-2"); + get.AddColumn("family-2", "qualifier-3"); + get.SetCacheBlocks(false); + get.SetConsistency(hbase::pb::Consistency::TIMELINE); + get.SetMaxVersions(2); + get.SetTimeRange(10000, 20000); + std::string region_name("RegionName"); + + auto req = hbase::ProtobufRequestBuilder::BuildGetRequest(get, region_name); + auto msg = std::static_pointer_cast(req->req_msg()); + + // Tests whether the PB object is properly set or not. + ASSERT_TRUE(msg->has_region()); + ASSERT_TRUE(msg->region().has_value()); + EXPECT_EQ(msg->region().value(), region_name); + + ASSERT_TRUE(msg->has_get()); + EXPECT_EQ(msg->get().row(), row_str); + EXPECT_FALSE(msg->get().cache_blocks()); + EXPECT_EQ(msg->get().consistency(), hbase::pb::Consistency::TIMELINE); + EXPECT_EQ(msg->get().max_versions(), 2); + EXPECT_EQ(msg->get().column_size(), 3); + for (int i = 0; i < msg->get().column_size(); ++i) { + EXPECT_EQ(msg->get().column(i).family(), "family-" + std::to_string(i + 1)); + for (int j = 0; j < msg->get().column(i).qualifier_size(); ++j) { + EXPECT_EQ(msg->get().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); + } + } +} + +TEST(ProtobufRequestBuilder, Scan) { + std::string start_row("start-row"); + std::string stop_row("stop-row"); + hbase::Scan scan; + scan.AddFamily("family-1"); + scan.AddFamily("family-2"); + scan.AddFamily("family-3"); + scan.AddColumn("family-2", "qualifier-1"); + scan.AddColumn("family-2", "qualifier-2"); + scan.AddColumn("family-2", "qualifier-3"); + scan.SetReversed(true); + scan.SetStartRow(start_row); + scan.SetStopRow(stop_row); + scan.SetSmall(true); + scan.SetCaching(3); + scan.SetConsistency(hbase::pb::Consistency::TIMELINE); + scan.SetCacheBlocks(true); + scan.SetAllowPartialResults(true); + scan.SetLoadColumnFamiliesOnDemand(true); + scan.SetMaxVersions(5); + scan.SetTimeRange(10000, 20000); + std::string region_name("RegionName"); + + auto req = hbase::ProtobufRequestBuilder::BuildScanRequest(scan, region_name); + auto msg = std::static_pointer_cast(req->req_msg()); + + // Tests whether the PB object is properly set or not. + ASSERT_TRUE(msg->has_region()); + ASSERT_TRUE(msg->region().has_value()); + EXPECT_EQ(msg->region().value(), region_name); + + ASSERT_TRUE(msg->has_scan()); + EXPECT_TRUE(msg->scan().reversed()); + EXPECT_EQ(msg->scan().start_row(), start_row); + EXPECT_EQ(msg->scan().stop_row(), stop_row); + EXPECT_TRUE(msg->scan().small()); + EXPECT_EQ(msg->scan().caching(), 3); + EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE); + EXPECT_TRUE(msg->scan().cache_blocks()); + EXPECT_TRUE(msg->scan().allow_partial_results()); + EXPECT_TRUE(msg->scan().load_column_families_on_demand()); + EXPECT_EQ(msg->scan().max_versions(), 5); + EXPECT_EQ(msg->scan().max_result_size(), std::numeric_limits::max()); + + EXPECT_EQ(msg->scan().column_size(), 3); + for (int i = 0; i < msg->scan().column_size(); ++i) { + EXPECT_EQ(msg->scan().column(i).family(), "family-" + std::to_string(i + 1)); + for (int j = 0; j < msg->scan().column(i).qualifier_size(); ++j) { + EXPECT_EQ(msg->scan().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); + } + } + ASSERT_TRUE(msg->client_handles_partials()); + ASSERT_TRUE(msg->client_handles_heartbeats()); + ASSERT_FALSE(msg->track_scan_metrics()); +} diff --git a/hbase-native-client/core/protobuf_request_builder.cc b/hbase-native-client/core/protobuf_request_builder.cc new file mode 100644 index 0000000..3610325 --- /dev/null +++ b/hbase-native-client/core/protobuf_request_builder.cc @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/protobuf_request_builder.h" + +#include + +using hbase::pb::GetRequest; +using hbase::pb::RegionSpecifier; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::Request; +using hbase::pb::ScanRequest; + +namespace hbase { + +ProtobufRequestBuilder::~ProtobufRequestBuilder() {} + +ProtobufRequestBuilder::ProtobufRequestBuilder() {} + +void ProtobufRequestBuilder::SetRegion(const std::string ®ion_name, + RegionSpecifier *region_specifier) { + region_specifier->set_type( + RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_REGION_NAME); + region_specifier->set_value(region_name); +} + +std::unique_ptr ProtobufRequestBuilder::BuildGetRequest(const Get &get, + const std::string ®ion_name) { + std::unique_ptr pb_req = std::move(Request::get()); + + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + ProtobufRequestBuilder::SetRegion(region_name, pb_msg->mutable_region()); + + auto pb_get = pb_msg->mutable_get(); + pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.Row()); + if (get.HasFamilies()) { + for (const auto &family : get.Family()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + return std::move(pb_req); +} + +std::unique_ptr ProtobufRequestBuilder::BuildScanRequest(const Scan &scan, + const std::string ®ion_name) { + std::unique_ptr pb_req = std::move(Request::scan()); + + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + ProtobufRequestBuilder::SetRegion(region_name, pb_msg->mutable_region()); + + auto pb_scan = pb_msg->mutable_scan(); + pb_scan->set_max_versions(scan.MaxVersions()); + pb_scan->set_cache_blocks(scan.CacheBlocks()); + pb_scan->set_reversed(scan.IsReversed()); + pb_scan->set_small(scan.IsSmall()); + pb_scan->set_caching(scan.Caching()); + pb_scan->set_start_row(scan.StartRow()); + pb_scan->set_stop_row(scan.StopRow()); + pb_scan->set_consistency(scan.Consistency()); + pb_scan->set_max_result_size(scan.MaxResultSize()); + pb_scan->set_allow_partial_results(scan.AllowPartialResults()); + pb_scan->set_load_column_families_on_demand(scan.LoadColumnFamiliesOnDemand()); + + if (scan.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_scan->mutable_time_range(); + pb_time_range->set_from(scan.Timerange().MinTimeStamp()); + pb_time_range->set_to(scan.Timerange().MaxTimeStamp()); + } + + if (scan.HasFamilies()) { + for (const auto &family : scan.Family()) { + auto column = pb_scan->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + // TODO hardcoding below calues as of now. Should they be overridden? + pb_msg->set_client_handles_partials(true); + pb_msg->set_client_handles_heartbeats(true); + pb_msg->set_track_scan_metrics(false); + + return std::move(pb_req); +} +} /* namespace hbase */ diff --git a/hbase-native-client/core/protobuf_request_builder.h b/hbase-native-client/core/protobuf_request_builder.h new file mode 100644 index 0000000..7b3ae84 --- /dev/null +++ b/hbase-native-client/core/protobuf_request_builder.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include + +#include "connection/request.h" +#include "core/get.h" +#include "core/scan.h" + +using hbase::pb::RegionSpecifier; + +namespace hbase { + +class ProtobufRequestBuilder { + public: + ~ProtobufRequestBuilder(); + + /** + * @brief Returns a Request object comprising of PB GetRequest created using + * passed 'get' + * @param get - Get object used for creating GetRequest + * @param region_name - table region + */ + static std::unique_ptr BuildGetRequest(const Get &get, const std::string ®ion_name); + + /** + * @brief Returns a Request object comprising of PB ScanRequest created using + * passed 'scan' + * @param scan - Scan object used for creating ScanRequest + * @param region_name - table region + */ + static std::unique_ptr BuildScanRequest(const Scan &scan, + const std::string ®ion_name); + + private: + // Constructor not rquired. We will have all static methods to create PB + // requests + ProtobufRequestBuilder(); + + /** + * @brief fills region_specifier with region values. + * @param region_name - table region + * @param region_specifier - RegionSpecifier to be filled and passed in PB + * Request. + */ + static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc new file mode 100644 index 0000000..05d10a5 --- /dev/null +++ b/hbase-native-client/core/table.cc @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/table.h" + +#include +#include + +#include +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/cell.h" +#include "core/configuration.h" +#include "core/location-cache.h" +#include "core/protobuf_request_builder.h" +#include "security/user.h" +#include "serde/server-name.h" + +using folly::Future; +using hbase::pb::GetResponse; +using hbase::pb::TableName; +using hbase::security::User; +using std::chrono::milliseconds; + +namespace hbase { + +Table::Table(const TableName &table_name, + const std::shared_ptr &location_cache, + const std::shared_ptr &rpc_client, + const std::shared_ptr> &conf) + : table_name_(std::make_shared(table_name)), + location_cache_(location_cache), + rpc_client_(rpc_client), + conf_(conf) { + client_retries_ = (conf_) ? (conf_->value()).GetInt("hbase.client.retries", client_retries_) : 5; +} + +Table::~Table() {} + +Table::Table(const Table &table) { + table_name_ = table.table_name_; + location_cache_ = table.location_cache_; + rpc_client_ = table.rpc_client_; + conf_ = table.conf_; + is_closed_ = false; + client_retries_ = table.client_retries_; +} + +Table &Table::operator=(const Table &table) { + table_name_ = table.table_name_; + location_cache_ = table.location_cache_; + rpc_client_ = table.rpc_client_; + conf_ = table.conf_; + is_closed_ = false; + client_retries_ = table.client_retries_; + return *this; +} + +std::unique_ptr Table::Get(const hbase::Get &get) { + try { + auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000)); + auto req = hbase::ProtobufRequestBuilder::BuildGetRequest(get, loc->region_name()); + auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil + + Future f = + rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), + std::move(req), user, "ClientService"); + Response resp = f.get(); + std::shared_ptr get_resp = + std::static_pointer_cast(resp.resp_msg()); + + std::vector> vcells; + for (auto cell : get_resp->result().cell()) { + std::shared_ptr pcell = + std::make_shared(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), + cell.value(), static_cast(cell.cell_type())); + vcells.push_back(pcell); + } + + std::unique_ptr result = + std::make_unique(vcells, get_resp->result().exists(), get_resp->result().stale(), + get_resp->result().partial()); + return std::move(result); + } + catch (const std::runtime_error &rex) { + LOG(ERROR) << "Caught exception while performing Table::Get() :-" << rex.what(); + return nullptr; + } +} + +void Table::Close() { + if (is_closed_) return; + // client_->Close(); + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; +} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h new file mode 100644 index 0000000..3201450 --- /dev/null +++ b/hbase-native-client/core/table.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/client.h" +#include "core/configuration.h" +#include "core/get.h" +#include "core/location-cache.h" +#include "core/result.h" +#include "serde/table-name.h" + +using hbase::pb::TableName; + +namespace hbase { +class Client; + +class Table { + public: + /** + * Constructors + */ + Table(const TableName &table_name, const std::shared_ptr &location_cache, + const std::shared_ptr &rpc_client, + const std::shared_ptr> &conf); + Table(const Table &table); + Table &operator=(const Table &table); + ~Table(); + + /** + * @brief - Returns a Result object for the constructed Get. + * @param - get Get object to perform HBase Get operation. + */ + std::unique_ptr Get(const hbase::Get &get); + + /** + * @brief - Close the client connection. + */ + void Close(); + + private: + std::shared_ptr table_name_; + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_; + std::shared_ptr> conf_; + bool is_closed_ = false; + // default 5 retries. over-ridden in constructor. + int client_retries_ = 5; +}; +} /* namespace hbase */ -- 1.8.3.1