From 0aa988a36714e5335b15742d98c4c694e4daa0a4 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Mon, 9 Jan 2017 17:39:30 +1100 Subject: [PATCH] 1) Removed extra parameter for zookeeper quorum and unneceesary commented code. 2) Addressed issue to return Table and Result as values instead of pointers. 3) Fixed TimeRange allocattion in ProtobufRequestBuilder class. diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c22cc89..19536d5 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -30,6 +30,7 @@ cxx_library( "rpc-connection.h", "response.h", "service.h", + "rpc-client.h", ], srcs=[ "client-dispatcher.cc", @@ -38,6 +39,7 @@ cxx_library( "connection-pool.cc", "pipeline.cc", "request.cc", + "rpc-client.cc", ], deps=[ "//if:if", diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index a5ea5c0..1bb4a02 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -33,6 +33,8 @@ cxx_library( "hbase_configuration_loader.h", "scan.h", "result.h", + "protobuf_request_builder.h", + "table.h", ], srcs=[ "cell.cc", @@ -45,6 +47,8 @@ cxx_library( "hbase_configuration_loader.cc", "scan.cc", "result.cc", + "protobuf_request_builder.cc", + "table.cc", ], deps=[ "//connection:connection", @@ -94,6 +98,25 @@ cxx_test( srcs=["result-test.cc",], deps=[":core",], run_test_separately=True,) +cxx_test( + name="protobuf_request_builder-test", + srcs=["protobuf_request_builder-test.cc",], + deps=[":core",], + run_test_separately=True,) +cxx_test( + name="client-test", + srcs=["client-test.cc",], + deps=[ + ":core", + "//connection:connection", + "//if:if", + "//serde:serde", + "//third-party:folly", + "//third-party:wangle", + "//third-party:zookeeper_mt", + "//test-util:test-util", + ], + run_test_separately=True,) cxx_binary( name="simple-client", srcs=["simple-client.cc",], diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc new file mode 100644 index 0000000..199b3d9 --- /dev/null +++ b/hbase-native-client/core/client-test.cc @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "core/client.h" +#include "core/configuration.h" +#include "core/get.h" +#include "core/hbase_configuration_loader.h" +#include "core/result.h" +#include "core/table.h" +#include "test-util/test-util.h" +#include "serde/table-name.h" + +class ClientTest { + public: + const static std::string kDefHBaseConfPath; + const static std::string kHBaseConfPath; + + const static std::string kHBaseDefaultXml; + const static std::string kHBaseSiteXml; + + const static std::string kHBaseXmlData; + + static void WriteDataToFile(const std::string &file, const std::string &xml_data) { + std::ofstream hbase_conf; + hbase_conf.open(file.c_str()); + hbase_conf << xml_data; + hbase_conf.close(); + } + + static void CreateHBaseConf(const std::string &dir, const std::string &file, + const std::string xml_data) { + // Directory will be created if not present + if (!boost::filesystem::exists(dir)) { + boost::filesystem::create_directories(dir); + } + // Remove temp file always + boost::filesystem::remove((dir + file).c_str()); + WriteDataToFile((dir + file), xml_data); + } + + static void CreateHBaseConfWithEnv() { + // Creating Empty Config Files so that we dont get a Configuration exception @Client + CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData); + CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData); + setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1); + } +}; + +const std::string ClientTest::kDefHBaseConfPath("./build/test-data/hbase-configuration-test/conf/"); +const std::string ClientTest::kHBaseConfPath( + "./build/test-data/hbase-configuration-test/custom-conf/"); + +const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml"); +const std::string ClientTest::kHBaseSiteXml("hbase-site.xml"); + +const std::string ClientTest::kHBaseXmlData( + "\n\n\n\n\n"); + +TEST(Client, EmptyConfigurationPassedToClient) { + hbase::optional conf; + // Create a client + ASSERT_ANY_THROW(hbase::Client client(conf)); +} + +TEST(Client, ConfigurationPassedToClient) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + // Create a client + hbase::Client client(conf); + client.Close(); +} + +TEST(Client, DefaultConfiguration) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Create Configuration + hbase::Client client; + client.Close(); +} + +TEST(Client, Get) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + // ASSERT_TRUE(table != nullptr) << "Unable to get connection to Table."; + + // Perform the Get + hbase::Result result = table.Get(get); + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result.Row()); + EXPECT_EQ("value2", *(result.Value("d", "2"))); + EXPECT_EQ("value for extra", *(result.Value("d", "extra"))); + + table.Close(); + client.Close(); +} + +TEST(Client, GetForNonExistentTable) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t_not_exists"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + + // Perform the Get + ASSERT_ANY_THROW(table.Get(get)) << "Table does not exist. We should get an exception"; + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + table.Close(); + client.Close(); +} + +TEST(Client, GetForNonExistentRow) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't_exists', 'd'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t_exists"); + auto row = "row_not_exists"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + + // Perform the Get + hbase::Result result = table.Get(get); + ASSERT_TRUE(result.IsEmpty()) << "Result should be empty."; + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + table.Close(); + client.Close(); +} diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 0389b24..2a93c55 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -20,27 +20,54 @@ #include "core/client.h" #include - -#include -#include - -using namespace folly; -using namespace std; -using namespace hbase::pb; +#include +#include namespace hbase { -Client::Client(std::string zk_quorum) - : cpu_executor_(std::make_shared(4)), - io_executor_(std::make_shared( - sysconf(_SC_NPROCESSORS_ONLN))), - location_cache_(zk_quorum, cpu_executor_, io_executor_) {} +Client::Client() { + HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + if (!conf) { + LOG(ERROR) << "Unable to create default Configuration object. Either hbase-default.xml or " + "hbase-site.xml is absent in the search path or problems in XML parsing"; + throw std::runtime_error("Configuration object not present."); + } + conf_ = std::make_shared>(conf); + auto zk_quorum = conf_->value().Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_); + location_cache_ = std::make_shared(zk_quorum, cpu_executor_, io_executor_); +} + +Client::Client(const hbase::optional &conf) { + if (!conf) { + LOG(ERROR) << "Empty Configuration object passed to Client."; + throw std::runtime_error("Configuration object not present."); + } + conf_ = std::make_shared>(conf); + auto zk_quorum = conf_->value().Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_); + location_cache_ = std::make_shared(zk_quorum, cpu_executor_, io_executor_); +} // We can't have the threads continue running after everything is done // that leads to an error. Client::~Client() { cpu_executor_->stop(); io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); +} + +hbase::Table Client::Table(const TableName &table_name) { + hbase::Table table(table_name, location_cache_, rpc_client_, conf_); + return table; +} + +void Client::Close() { + if (is_closed_) return; + + cpu_executor_->stop(); + io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; } } // namespace hbase diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 0ba1276..fc15ab1 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -27,11 +27,18 @@ #include #include +#include "core/configuration.h" +#include "core/hbase_configuration_loader.h" #include "core/location-cache.h" +#include "connection/rpc-client.h" +#include "core/table.h" +#include "serde/table-name.h" #include "if/Cell.pb.h" -namespace hbase { +using hbase::pb::TableName; +namespace hbase { +class Table; /** * Client. * @@ -42,16 +49,34 @@ namespace hbase { class Client { public: /** - * Create a new client. + * @brief Create a new client. * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ - explicit Client(std::string quorum_spec); + Client(); + explicit Client(const hbase::optional &conf); ~Client(); + /** + * @brief Retrieve a Table implementation for accessing a table. + * @param - table_name + */ + hbase::Table Table(const TableName &table_name); + + /** + * @brief Close the Client connection. + */ + void Close(); private: - std::shared_ptr cpu_executor_; - std::shared_ptr io_executor_; - LocationCache location_cache_; + const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; + const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181"; + std::shared_ptr cpu_executor_ = + std::make_shared(4); + std::shared_ptr io_executor_ = + std::make_shared(sysconf(_SC_NPROCESSORS_ONLN)); + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_ = std::make_shared(); + std::shared_ptr> conf_; + bool is_closed_ = false; }; } // namespace hbase diff --git a/hbase-native-client/core/protobuf_request_builder-test.cc b/hbase-native-client/core/protobuf_request_builder-test.cc new file mode 100644 index 0000000..b103edb --- /dev/null +++ b/hbase-native-client/core/protobuf_request_builder-test.cc @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/protobuf_request_builder.h" + +#include +#include +#include "connection/request.h" +#include "core/get.h" +#include "core/scan.h" + +using hbase::Get; +using hbase::Scan; + +using hbase::pb::GetRequest; +using hbase::pb::RegionSpecifier; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::pb::ScanRequest; + +TEST(ProtobufRequestBuilder, Get) { + std::string row_str = "row-test"; + Get get(row_str); + get.AddFamily("family-1"); + get.AddFamily("family-2"); + get.AddFamily("family-3"); + get.AddColumn("family-2", "qualifier-1"); + get.AddColumn("family-2", "qualifier-2"); + get.AddColumn("family-2", "qualifier-3"); + get.SetCacheBlocks(false); + get.SetConsistency(hbase::pb::Consistency::TIMELINE); + get.SetMaxVersions(2); + get.SetTimeRange(10000, 20000); + std::string region_name("RegionName"); + + auto req = hbase::ProtobufRequestBuilder::BuildGetRequest(get, region_name); + auto msg = std::static_pointer_cast(req->req_msg()); + + // Tests whether the PB object is properly set or not. + ASSERT_TRUE(msg->has_region()); + ASSERT_TRUE(msg->region().has_value()); + EXPECT_EQ(msg->region().value(), region_name); + + ASSERT_TRUE(msg->has_get()); + EXPECT_EQ(msg->get().row(), row_str); + EXPECT_FALSE(msg->get().cache_blocks()); + EXPECT_EQ(msg->get().consistency(), hbase::pb::Consistency::TIMELINE); + EXPECT_EQ(msg->get().max_versions(), 2); + EXPECT_EQ(msg->get().column_size(), 3); + for (int i = 0; i < msg->get().column_size(); ++i) { + EXPECT_EQ(msg->get().column(i).family(), "family-" + std::to_string(i + 1)); + for (int j = 0; j < msg->get().column(i).qualifier_size(); ++j) { + EXPECT_EQ(msg->get().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); + } + } +} + +TEST(ProtobufRequestBuilder, Scan) { + std::string start_row("start-row"); + std::string stop_row("stop-row"); + hbase::Scan scan; + scan.AddFamily("family-1"); + scan.AddFamily("family-2"); + scan.AddFamily("family-3"); + scan.AddColumn("family-2", "qualifier-1"); + scan.AddColumn("family-2", "qualifier-2"); + scan.AddColumn("family-2", "qualifier-3"); + scan.SetReversed(true); + scan.SetStartRow(start_row); + scan.SetStopRow(stop_row); + scan.SetSmall(true); + scan.SetCaching(3); + scan.SetConsistency(hbase::pb::Consistency::TIMELINE); + scan.SetCacheBlocks(true); + scan.SetAllowPartialResults(true); + scan.SetLoadColumnFamiliesOnDemand(true); + scan.SetMaxVersions(5); + scan.SetTimeRange(10000, 20000); + std::string region_name("RegionName"); + + auto req = hbase::ProtobufRequestBuilder::BuildScanRequest(scan, region_name); + auto msg = std::static_pointer_cast(req->req_msg()); + + // Tests whether the PB object is properly set or not. + ASSERT_TRUE(msg->has_region()); + ASSERT_TRUE(msg->region().has_value()); + EXPECT_EQ(msg->region().value(), region_name); + + ASSERT_TRUE(msg->has_scan()); + EXPECT_TRUE(msg->scan().reversed()); + EXPECT_EQ(msg->scan().start_row(), start_row); + EXPECT_EQ(msg->scan().stop_row(), stop_row); + EXPECT_TRUE(msg->scan().small()); + EXPECT_EQ(msg->scan().caching(), 3); + EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE); + EXPECT_TRUE(msg->scan().cache_blocks()); + EXPECT_TRUE(msg->scan().allow_partial_results()); + EXPECT_TRUE(msg->scan().load_column_families_on_demand()); + EXPECT_EQ(msg->scan().max_versions(), 5); + EXPECT_EQ(msg->scan().max_result_size(), std::numeric_limits::max()); + + EXPECT_EQ(msg->scan().column_size(), 3); + for (int i = 0; i < msg->scan().column_size(); ++i) { + EXPECT_EQ(msg->scan().column(i).family(), "family-" + std::to_string(i + 1)); + for (int j = 0; j < msg->scan().column(i).qualifier_size(); ++j) { + EXPECT_EQ(msg->scan().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); + } + } + ASSERT_TRUE(msg->client_handles_partials()); + ASSERT_TRUE(msg->client_handles_heartbeats()); + ASSERT_FALSE(msg->track_scan_metrics()); +} diff --git a/hbase-native-client/core/protobuf_request_builder.cc b/hbase-native-client/core/protobuf_request_builder.cc new file mode 100644 index 0000000..b529942 --- /dev/null +++ b/hbase-native-client/core/protobuf_request_builder.cc @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/protobuf_request_builder.h" + +#include + +using hbase::pb::GetRequest; +using hbase::pb::RegionSpecifier; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::Request; +using hbase::pb::ScanRequest; + +namespace hbase { + +ProtobufRequestBuilder::~ProtobufRequestBuilder() {} + +ProtobufRequestBuilder::ProtobufRequestBuilder() {} + +void ProtobufRequestBuilder::SetRegion(const std::string ®ion_name, + RegionSpecifier *region_specifier) { + region_specifier->set_type( + RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_REGION_NAME); + region_specifier->set_value(region_name); +} + +std::unique_ptr ProtobufRequestBuilder::BuildGetRequest(const Get &get, + const std::string ®ion_name) { + std::unique_ptr pb_req = std::move(Request::get()); + + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + ProtobufRequestBuilder::SetRegion(region_name, pb_msg->mutable_region()); + + auto pb_get = pb_msg->mutable_get(); + pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (!get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.Row()); + if (get.HasFamilies()) { + for (const auto &family : get.Family()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + return pb_req; +} + +std::unique_ptr ProtobufRequestBuilder::BuildScanRequest(const Scan &scan, + const std::string ®ion_name) { + std::unique_ptr pb_req = std::move(Request::scan()); + + auto pb_msg = std::static_pointer_cast(pb_req->req_msg()); + + ProtobufRequestBuilder::SetRegion(region_name, pb_msg->mutable_region()); + + auto pb_scan = pb_msg->mutable_scan(); + pb_scan->set_max_versions(scan.MaxVersions()); + pb_scan->set_cache_blocks(scan.CacheBlocks()); + pb_scan->set_reversed(scan.IsReversed()); + pb_scan->set_small(scan.IsSmall()); + pb_scan->set_caching(scan.Caching()); + pb_scan->set_start_row(scan.StartRow()); + pb_scan->set_stop_row(scan.StopRow()); + pb_scan->set_consistency(scan.Consistency()); + pb_scan->set_max_result_size(scan.MaxResultSize()); + pb_scan->set_allow_partial_results(scan.AllowPartialResults()); + pb_scan->set_load_column_families_on_demand(scan.LoadColumnFamiliesOnDemand()); + + if (!scan.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_scan->mutable_time_range(); + pb_time_range->set_from(scan.Timerange().MinTimeStamp()); + pb_time_range->set_to(scan.Timerange().MaxTimeStamp()); + } + + if (scan.HasFamilies()) { + for (const auto &family : scan.Family()) { + auto column = pb_scan->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + // TODO hardcoding below calues as of now. Should they be overridden? + pb_msg->set_client_handles_partials(true); + pb_msg->set_client_handles_heartbeats(true); + pb_msg->set_track_scan_metrics(false); + + return pb_req; +} +} /* namespace hbase */ diff --git a/hbase-native-client/core/protobuf_request_builder.h b/hbase-native-client/core/protobuf_request_builder.h new file mode 100644 index 0000000..7b3ae84 --- /dev/null +++ b/hbase-native-client/core/protobuf_request_builder.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include + +#include "connection/request.h" +#include "core/get.h" +#include "core/scan.h" + +using hbase::pb::RegionSpecifier; + +namespace hbase { + +class ProtobufRequestBuilder { + public: + ~ProtobufRequestBuilder(); + + /** + * @brief Returns a Request object comprising of PB GetRequest created using + * passed 'get' + * @param get - Get object used for creating GetRequest + * @param region_name - table region + */ + static std::unique_ptr BuildGetRequest(const Get &get, const std::string ®ion_name); + + /** + * @brief Returns a Request object comprising of PB ScanRequest created using + * passed 'scan' + * @param scan - Scan object used for creating ScanRequest + * @param region_name - table region + */ + static std::unique_ptr BuildScanRequest(const Scan &scan, + const std::string ®ion_name); + + private: + // Constructor not rquired. We will have all static methods to create PB + // requests + ProtobufRequestBuilder(); + + /** + * @brief fills region_specifier with region values. + * @param region_name - table region + * @param region_specifier - RegionSpecifier to be filled and passed in PB + * Request. + */ + static void SetRegion(const std::string ®ion_name, RegionSpecifier *region_specifier); +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc new file mode 100644 index 0000000..d1773b7 --- /dev/null +++ b/hbase-native-client/core/table.cc @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/table.h" + +#include + +#include +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/cell.h" +#include "core/configuration.h" +#include "core/location-cache.h" +#include "core/protobuf_request_builder.h" +#include "security/user.h" +#include "serde/server-name.h" + +using folly::Future; +using hbase::pb::GetResponse; +using hbase::pb::TableName; +using hbase::security::User; +using std::chrono::milliseconds; + +namespace hbase { + +Table::Table(const TableName &table_name, + const std::shared_ptr &location_cache, + const std::shared_ptr &rpc_client, + const std::shared_ptr> &conf) + : table_name_(std::make_shared(table_name)), + location_cache_(location_cache), + rpc_client_(rpc_client), + conf_(conf) { + client_retries_ = (conf_) ? (conf_->value()).GetInt("hbase.client.retries", client_retries_) : 5; +} + +Table::~Table() {} + +hbase::Result Table::Get(const hbase::Get &get) { + auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000)); + auto req = hbase::ProtobufRequestBuilder::BuildGetRequest(get, loc->region_name()); + auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil + + Future f = + rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), + std::move(req), user, "ClientService"); + Response resp = f.get(); + std::shared_ptr get_resp = + std::static_pointer_cast(resp.resp_msg()); + + std::vector> vcells; + for (auto cell : get_resp->result().cell()) { + std::shared_ptr pcell = + std::make_shared(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), + cell.value(), static_cast(cell.cell_type())); + vcells.push_back(pcell); + } + + hbase::Result result(vcells, get_resp->result().exists(), get_resp->result().stale(), + get_resp->result().partial()); + return result; +} + +void Table::Close() { + if (is_closed_) return; + + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; +} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h new file mode 100644 index 0000000..e950499 --- /dev/null +++ b/hbase-native-client/core/table.h @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/client.h" +#include "core/configuration.h" +#include "core/get.h" +#include "core/location-cache.h" +#include "core/result.h" +#include "serde/table-name.h" + +using hbase::pb::TableName; + +namespace hbase { +class Client; + +class Table { + public: + /** + * Constructors + */ + Table(const TableName &table_name, const std::shared_ptr &location_cache, + const std::shared_ptr &rpc_client, + const std::shared_ptr> &conf); + ~Table(); + + /** + * @brief - Returns a Result object for the constructed Get. + * @param - get Get object to perform HBase Get operation. + */ + hbase::Result Get(const hbase::Get &get); + + /** + * @brief - Close the client connection. + */ + void Close(); + + private: + std::shared_ptr table_name_; + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_; + std::shared_ptr> conf_; + bool is_closed_ = false; + // default 5 retries. over-ridden in constructor. + int client_retries_ = 5; +}; +} /* namespace hbase */ -- 1.8.3.1