diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 464c010..0398786 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -329,3 +329,7 @@ cxx_binary( "simple-client.cc", ], deps=[":core", "//connection:connection"],) +cxx_binary( + name="load-client", + srcs=["load-client.cc",], + deps=[":core", "//connection:connection"],) diff --git a/hbase-native-client/core/load-client.cc b/hbase-native-client/core/load-client.cc new file mode 100644 index 0000000..9095810 --- /dev/null +++ b/hbase-native-client/core/load-client.cc @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#include +#include +#include +#include + +#include "core/client.h" +#include "core/get.h" +#include "core/put.h" +#include "core/table.h" +#include "serde/table-name.h" +#include "utils/time-util.h" + +using hbase::Client; +using hbase::Configuration; +using hbase::Get; +using hbase::Put; +using hbase::Table; +using hbase::pb::TableName; +using hbase::TimeUtil; + +DEFINE_string(table, "t", "What table to do the reads and writes"); +DEFINE_string(row_prefix, "", "row prefix"); +DEFINE_string(families, "d", "comma separated list of column family names"); +DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); +DEFINE_uint64(num_rows, 1'000'000, "How many rows to write and read"); +DEFINE_uint64(num_cols, 10000, "How many columns there are in a row"); +DEFINE_int32(sleep_duration, 20, "Duration of sleep in case RetriesExhaustedException is thrown"); +DEFINE_int32(threads, 10, "How many client threads"); + +std::unique_ptr MakePut(const std::string &row) { + auto put = std::make_unique(row); + put->AddColumn("d", "q", row); + return std::move(put); +} + +std::string Row(const std::string &prefix, uint64_t i) { + auto suf = folly::to(i); + return prefix + suf; +} + +static int n_zero = 7; + +std::string PrefixZero(int num) { + std::string str = std::to_string(num); + return std::string(n_zero - str.length(), '0') + str; +} +int main(int argc, char *argv[]) { + google::SetUsageMessage("Client to get a single row from HBase on the comamnd line"); + google::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + google::InstallFailureSignalHandler(); + FLAGS_logtostderr = 1; + FLAGS_stderrthreshold = 1; + + // Configuration + auto conf = std::make_shared(); + conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper); + + auto tn = std::make_shared(folly::to(FLAGS_table)); + auto num_puts = FLAGS_num_rows; + + auto client = std::make_unique(*conf); + + // Do the Put requests + auto start_ns = TimeUtil::GetNowNanos(); + + std::vector families; + std::size_t pos = 0, found; + while ((found = FLAGS_families.find_first_of(',', pos)) != std::string::npos) { + families.push_back(FLAGS_families.substr(pos, found - pos)); + pos = found + 1; + } + families.push_back(FLAGS_families.substr(pos)); + + int rows = FLAGS_num_rows / FLAGS_threads; + int cols = FLAGS_num_cols; + int duration = FLAGS_sleep_duration; + std::thread *threads = new std::thread[FLAGS_threads]; + std::string val = "value1"; + for (int i = 0; i < FLAGS_threads; i++) { + threads[i] = std::thread([=, &client, &tn, &families] { + // Get connection to HBase Table + auto table = client->Table(*tn); + + for (int j = 0; j < rows; j++) { + std::string row = PrefixZero(i * rows + j); + for (auto family : families) { + for (int k = 1; k <= cols; k++) { + table->Put(Put{row}.AddColumn(family, std::to_string(k), val)); + } + } + } + table->Close(); + }); + } + for (int i = 0; i < FLAGS_threads; i++) { + threads[i].join(); + } + + LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + + // Do the Get requests + start_ns = TimeUtil::GetNowNanos(); + for (int i = 0; i < FLAGS_threads; i++) { + threads[i] = std::thread([=, &client, &tn, &families] { + // Get connection to HBase Table + auto table1 = client->Table(*tn); + + if ((i % 2) == 0) { + hbase::Scan scan{}; + auto start = i * rows; + scan.SetStartRow(PrefixZero(start)); + scan.SetStopRow(PrefixZero((i+1) * rows)); + auto scanner = table1->Scan(scan); + + auto cnt = start; + auto r = scanner->Next(); + auto row_size = families.size() * cols; + while (r != nullptr) { + if (r->Cells().size() != row_size) { + LOG(ERROR) << "#cells in row (" << std::to_string(r->Cells().size()) << ") != " + << row_size; + } + auto row = PrefixZero(cnt); + if (r->Row().compare(row) != 0) { + LOG(ERROR) << "row " << r->Row() << " is not the expected: " << row; + } + for (auto family : families) { + auto val_read = *r->Value(family, "1"); + if (val_read != val) { + LOG(ERROR) << "value " << val_read << " != " << val; + } + } + cnt++; + r = scanner->Next(); + } + LOG(INFO) << i << " scanned " << std::to_string(cnt-start) << " rows"; + } else { + for (int k = 0; k < rows; k++) { + std::string row = PrefixZero(i * rows + k); + hbase::Get get(row); + auto result = table1->Get(get); + // Test the values, should be same as in put executed on hbase shell + if (row.compare(result->Row()) != 0) { + LOG(ERROR) << "row is not " << row; + } + for (auto family : families) { + if (val.compare(*(result->Value(family, "1"))) != 0) { + LOG(ERROR) << "value is not " << val; + } + } + } + LOG(INFO) << "Sent " << rows << " gets"; + } + table1->Close(); + }); + } + for (int i = 0; i < FLAGS_threads; i++) { + threads[i].join(); + } + delete[] threads; + + LOG(INFO) << "Successful in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + + client->Close(); + + return 0; +}