From ea378f880bc3d251b87b21e803e3993976aaec51 Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Wed, 11 Jan 2017 22:57:15 +1100 Subject: [PATCH 2/2] Source for Table and Client implementation. This patch applies on top of RequestConverter and ResponseConverter patch diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c22cc89..19536d5 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -30,6 +30,7 @@ cxx_library( "rpc-connection.h", "response.h", "service.h", + "rpc-client.h", ], srcs=[ "client-dispatcher.cc", @@ -38,6 +39,7 @@ cxx_library( "connection-pool.cc", "pipeline.cc", "request.cc", + "rpc-client.cc", ], deps=[ "//if:if", diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index b7db41a..0d1bc93 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -35,6 +35,7 @@ cxx_library( "result.h", "request_converter.h", "response_converter.h", + "table.h", ], srcs=[ "cell.cc", @@ -49,6 +50,7 @@ cxx_library( "result.cc", "request_converter.cc", "response_converter.cc", + "table.cc", ], deps=[ "//connection:connection", @@ -107,6 +109,16 @@ cxx_test( "//if:if", ], run_test_separately=True,) +cxx_test( + name="client-test", + srcs=["client-test.cc",], + deps=[ + ":core", + "//if:if", + "//serde:serde", + "//test-util:test-util", + ], + run_test_separately=True,) cxx_binary( name="simple-client", srcs=["simple-client.cc",], diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc new file mode 100644 index 0000000..2512cb5 --- /dev/null +++ b/hbase-native-client/core/client-test.cc @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "core/client.h" +#include "core/configuration.h" +#include "core/get.h" +#include "core/hbase_configuration_loader.h" +#include "core/result.h" +#include "core/table.h" +#include "serde/table-name.h" +#include "test-util/test-util.h" + +class ClientTest { + public: + const static std::string kDefHBaseConfPath; + const static std::string kHBaseConfPath; + + const static std::string kHBaseDefaultXml; + const static std::string kHBaseSiteXml; + + const static std::string kHBaseXmlData; + + static void WriteDataToFile(const std::string &file, const std::string &xml_data) { + std::ofstream hbase_conf; + hbase_conf.open(file.c_str()); + hbase_conf << xml_data; + hbase_conf.close(); + } + + static void CreateHBaseConf(const std::string &dir, const std::string &file, + const std::string xml_data) { + // Directory will be created if not present + if (!boost::filesystem::exists(dir)) { + boost::filesystem::create_directories(dir); + } + // Remove temp file always + boost::filesystem::remove((dir + file).c_str()); + WriteDataToFile((dir + file), xml_data); + } + + static void CreateHBaseConfWithEnv() { + // Creating Empty Config Files so that we dont get a Configuration exception @Client + CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData); + CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData); + setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1); + } +}; + +const std::string ClientTest::kDefHBaseConfPath("./build/test-data/hbase-configuration-test/conf/"); +const std::string ClientTest::kHBaseConfPath( + "./build/test-data/hbase-configuration-test/custom-conf/"); + +const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml"); +const std::string ClientTest::kHBaseSiteXml("hbase-site.xml"); + +const std::string ClientTest::kHBaseXmlData( + "\n\n\n\n\n"); + +TEST(Client, EmptyConfigurationPassedToClient) { + hbase::optional conf; + // Create a client + ASSERT_ANY_THROW(hbase::Client client(conf)); +} + +TEST(Client, ConfigurationPassedToClient) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + // Create a client + hbase::Client client(conf); + client.Close(); +} + +TEST(Client, DefaultConfiguration) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Create Configuration + hbase::Client client; + client.Close(); +} + +TEST(Client, Get) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + // Perform the Get + auto result = table->Get(get); + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + + table->Close(); + client.Close(); +} + +TEST(Client, GetForNonExistentTable) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t_not_exists"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + // Perform the Get + ASSERT_ANY_THROW(table->Get(get)) << "Table does not exist. We should get an exception"; + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + table->Close(); + client.Close(); +} + +TEST(Client, GetForNonExistentRow) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + ClientTest::CreateHBaseConfWithEnv(); + + // Using TestUtil to populate test data + hbase::TestUtil *test_util = new hbase::TestUtil(); + test_util->RunShellCmd("create 't_exists', 'd'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to("t_exists"); + auto row = "row_not_exists"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + + // Create a client + hbase::Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + // Perform the Get + auto result = table->Get(get); + ASSERT_TRUE(result->IsEmpty()) << "Result should be empty."; + + // Stopping the connection as we are getting segfault due to some folly issue + // The connection stays open and we don't want that. + // So we are stopping the connection. + // We can remove this once we have fixed the folly part + delete test_util; + + table->Close(); + client.Close(); +} diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 0389b24..8c220c7 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -20,27 +20,53 @@ #include "core/client.h" #include - -#include -#include - -using namespace folly; -using namespace std; -using namespace hbase::pb; +#include +#include namespace hbase { -Client::Client(std::string zk_quorum) - : cpu_executor_(std::make_shared(4)), - io_executor_(std::make_shared( - sysconf(_SC_NPROCESSORS_ONLN))), - location_cache_(zk_quorum, cpu_executor_, io_executor_) {} +Client::Client() { + HBaseConfigurationLoader loader; + auto conf = loader.LoadDefaultResources(); + if (!conf) { + LOG(ERROR) << "Unable to create default Configuration object. Either hbase-default.xml or " + "hbase-site.xml is absent in the search path or problems in XML parsing"; + throw std::runtime_error("Configuration object not present."); + } + conf_ = std::make_shared(conf.value()); + auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_); + location_cache_ = std::make_shared(zk_quorum, cpu_executor_, io_executor_); +} + +Client::Client(const hbase::optional &conf) { + if (!conf) { + LOG(ERROR) << "Empty Configuration object passed to Client."; + throw std::runtime_error("Configuration object not present."); + } + conf_ = std::make_shared(conf.value()); + auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_); + location_cache_ = std::make_shared(zk_quorum, cpu_executor_, io_executor_); +} // We can't have the threads continue running after everything is done // that leads to an error. Client::~Client() { cpu_executor_->stop(); io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); +} + +std::unique_ptr Client::Table(const TableName &table_name) { + return std::make_unique(table_name, location_cache_, rpc_client_, conf_); +} + +void Client::Close() { + if (is_closed_) return; + + cpu_executor_->stop(); + io_executor_->stop(); + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; } } // namespace hbase diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 0ba1276..de1fc60 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -27,11 +27,18 @@ #include #include +#include "core/configuration.h" +#include "core/hbase_configuration_loader.h" #include "core/location-cache.h" +#include "connection/rpc-client.h" +#include "core/table.h" +#include "serde/table-name.h" #include "if/Cell.pb.h" -namespace hbase { +using hbase::pb::TableName; +namespace hbase { +class Table; /** * Client. * @@ -42,16 +49,34 @@ namespace hbase { class Client { public: /** - * Create a new client. + * @brief Create a new client. * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ - explicit Client(std::string quorum_spec); + Client(); + explicit Client(const hbase::optional &conf); ~Client(); + /** + * @brief Retrieve a Table implementation for accessing a table. + * @param - table_name + */ + std::unique_ptr Table(const TableName &table_name); + + /** + * @brief Close the Client connection. + */ + void Close(); private: - std::shared_ptr cpu_executor_; - std::shared_ptr io_executor_; - LocationCache location_cache_; + const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; + const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181"; + std::shared_ptr cpu_executor_ = + std::make_shared(4); + std::shared_ptr io_executor_ = + std::make_shared(sysconf(_SC_NPROCESSORS_ONLN)); + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_ = std::make_shared(); + std::shared_ptr conf_; + bool is_closed_ = false; }; } // namespace hbase diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc new file mode 100644 index 0000000..58125f9 --- /dev/null +++ b/hbase-native-client/core/table.cc @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/table.h" + +#include +#include +#include +#include +#include + +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "if/Client.pb.h" +#include "security/user.h" +#include "serde/server-name.h" + +using folly::Future; +using hbase::pb::TableName; +using hbase::security::User; +using std::chrono::milliseconds; + +namespace hbase { + +Table::Table(const TableName &table_name, + const std::shared_ptr &location_cache, + const std::shared_ptr &rpc_client, + const std::shared_ptr &conf) + : table_name_(std::make_shared(table_name)), + location_cache_(location_cache), + rpc_client_(rpc_client), + conf_(conf) { + client_retries_ = (conf_) ? conf_->GetInt("hbase.client.retries", client_retries_) : 5; +} + +Table::~Table() {} + +std::unique_ptr Table::Get(const hbase::Get &get) { + auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000)); + auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name()); + auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil + + Future f = + rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), + std::move(req), user, "ClientService"); + auto resp = f.get(); + + return hbase::ResponseConverter::FromGetResponse(resp); +} + +void Table::Close() { + if (is_closed_) return; + + if (rpc_client_.get()) rpc_client_->Close(); + is_closed_ = true; +} + +} /* namespace hbase */ diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h new file mode 100644 index 0000000..0e98cd2 --- /dev/null +++ b/hbase-native-client/core/table.h @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "connection/rpc-client.h" +#include "core/client.h" +#include "core/configuration.h" +#include "core/get.h" +#include "core/location-cache.h" +#include "core/result.h" +#include "serde/table-name.h" + +using hbase::pb::TableName; + +namespace hbase { +class Client; + +class Table { + public: + /** + * Constructors + */ + Table(const TableName &table_name, const std::shared_ptr &location_cache, + const std::shared_ptr &rpc_client, + const std::shared_ptr &conf); + ~Table(); + + /** + * @brief - Returns a Result object for the constructed Get. + * @param - get Get object to perform HBase Get operation. + */ + std::unique_ptr Get(const hbase::Get &get); + + /** + * @brief - Close the client connection. + */ + void Close(); + + private: + std::shared_ptr table_name_; + std::shared_ptr location_cache_; + std::shared_ptr rpc_client_; + std::shared_ptr conf_; + bool is_closed_ = false; + // default 5 retries. over-ridden in constructor. + int client_retries_ = 5; +}; +} /* namespace hbase */ -- 1.8.3.1