diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 464c010..a24b513 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -329,3 +329,9 @@ cxx_binary( "simple-client.cc", ], deps=[":core", "//connection:connection"],) +cxx_binary( + name="load-client", + srcs=[ + "load-client.cc", + ], + deps=[":core", "//connection:connection"],) diff --git a/hbase-native-client/core/load-client.cc b/hbase-native-client/core/load-client.cc new file mode 100644 index 0000000..69619de --- /dev/null +++ b/hbase-native-client/core/load-client.cc @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+#include <folly/Conv.h>
+#include <folly/Logging.h>
+#include <folly/Random.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <iostream>
+#include <limits>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "core/client.h"
+#include "core/get.h"
+#include "core/put.h"
+#include "core/table.h"
+#include "serde/table-name.h"
+#include "utils/time-util.h"
+
+using hbase::Client;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Table;
+using hbase::pb::TableName;
+using hbase::TimeUtil;
+using folly::Random;
+
+DEFINE_string(table, "t", "What table to do the reads and writes with");
+DEFINE_string(families, "d", "comma separated list of column family names");
+DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_uint64(num_rows, 1'000'000, "How many rows to write and read");
+// Upper bound: each writer thread picks a random column count in [1, num_cols].
+DEFINE_uint64(num_cols, 1000, "How many columns there are in a row");
+DEFINE_int32(threads, 10, "How many client threads");
+DEFINE_int32(multi_get_size, 1, "number of gets in one multi-get");
+DEFINE_bool(skip_get, false, "skip get / scan");
+DEFINE_bool(skip_put, false, "skip put's");
+
+std::string Row(const std::string &prefix, uint64_t i) {
+  auto suf = folly::to<std::string>(i);
+  return prefix + suf;
+}
+
+// Row keys are zero-padded to this many decimal digits so that they sort
+// lexicographically in numeric order.
+static constexpr int total_width = 8;
+// Append/increment writer threads whose index satisfies
+// (index % incr_mod == 1) use Increment; the others use Append.
+static constexpr int incr_mod = 10;
+// Qualifier holding the number of data columns written for a row.
+static constexpr const char *kNumColumn = "num";
+
+// Returns |num| left-padded with '0' to total_width characters. Numbers that
+// already have total_width or more digits are returned unpadded instead of
+// underflowing the pad length.
+std::string PrefixZero(uint64_t num) {
+  std::string str = std::to_string(num);
+  if (str.length() >= static_cast<std::size_t>(total_width)) {
+    return str;
+  }
+  return std::string(total_width - str.length(), '0') + str;
+}
+
+// Scans the rows written by writer thread |iteration| (row keys
+// [iteration * rows, (iteration + 1) * rows)) and verifies that they come back
+// in order, that none are missing, and that every data column 1..num holds the
+// row key itself (the value DoPut writes). Takes ownership of |table| and
+// closes it when done. Sets *succeeded to false on any mismatch; never sets it
+// to true.
+void DoScan(int iteration, uint64_t rows, std::unique_ptr<Table> table,
+            const std::vector<std::string> &families, bool *succeeded) {
+  const uint64_t start = iteration * rows;
+  uint64_t cnt = start;
+  {
+    hbase::Scan scan{};
+    scan.SetStartRow(PrefixZero(start));
+    scan.SetStopRow(PrefixZero((iteration + 1) * rows));
+    auto scanner = table->Scan(scan);
+
+    auto r = scanner->Next();
+    while (r != nullptr) {
+      const std::string row = PrefixZero(cnt);
+      if (r->Row() != row) {
+        LOG(ERROR) << "row " << r->Row() << " is not the expected: " << row;
+        *succeeded = false;
+      }
+      for (const auto &family : families) {
+        auto num = r->Value(family, kNumColumn);
+        if (!num) {
+          LOG(ERROR) << "No value for " << r->Row() << " at " << family << "@" << kNumColumn;
+          *succeeded = false;
+          continue;
+        }
+        auto cols = std::stoi(*num);
+        VLOG(3) << "scan gets " << std::to_string(cols) << " columns";
+        for (int m = 1; m <= cols; m++) {
+          auto col = std::to_string(m);
+          auto val_read = r->Value(family, col);
+          if (!val_read) {
+            LOG(ERROR) << "No value for " << r->Row() << " at " << family << "@" << col;
+            *succeeded = false;
+          } else if (*val_read != row) {
+            LOG(ERROR) << "value " << *val_read << " != " << row;
+            *succeeded = false;
+          }
+        }
+      }
+      cnt++;
+      r = scanner->Next();
+    }
+  }  // release the scanner before closing the table it came from
+  const uint64_t scanned = cnt - start;
+  LOG(INFO) << "Iteration " << iteration << " scanned " << std::to_string(scanned) << " rows";
+  if (scanned != rows) {
+    LOG(ERROR) << "Iteration " << iteration << " expected " << rows << " rows but scanned "
+               << scanned;
+    *succeeded = false;
+  }
+  table->Close();
+}
+
+// Reads back the rows written by DoAppendIncrement for thread |iteration| using
+// multi-gets of up to |multi_get_size| rows each. When
+// (iteration % incr_mod == 1), column m is expected to decode (via
+// BytesUtil::ToInt64) to m; otherwise it is expected to hold the reversed row
+// key. Takes ownership of |table| and closes it when done. Sets *succeeded to
+// false on any mismatch; never sets it to true.
+void DoGet(int iteration, uint64_t rows, std::unique_ptr<Table> table,
+           const std::vector<std::string> &families, int multi_get_size, bool *succeeded) {
+  // main() rejects multi_get_size < 1; clamp anyway so the loop always advances.
+  const uint64_t batch = static_cast<uint64_t>(std::max(multi_get_size, 1));
+  for (uint64_t k = 0; k < rows; k += batch) {
+    const uint64_t end = std::min(k + batch, rows);
+    std::vector<Get> gets;
+    gets.reserve(end - k);
+    for (uint64_t i = k; i < end; ++i) {
+      gets.emplace_back(PrefixZero(iteration * rows + i));
+    }
+    VLOG(3) << "getting for " << gets.size() << " rows";
+    auto results = table->Get(gets);
+    // The final batch can be smaller than multi_get_size, so walk what came
+    // back instead of indexing up to multi_get_size.
+    if (results.size() != gets.size()) {
+      LOG(ERROR) << "expected " << gets.size() << " results but got " << results.size();
+      *succeeded = false;
+    }
+    for (const auto &result : results) {
+      if (result == nullptr) {
+        LOG(ERROR) << "didn't get result";
+        *succeeded = false;
+        continue;
+      }
+      // Test the values
+      std::string rev(result->Row());
+      std::reverse(rev.begin(), rev.end());
+      for (const auto &family : families) {
+        auto num = result->Value(family, kNumColumn);
+        if (!num) {
+          LOG(ERROR) << "No value for " << result->Row() << " at " << family << "@" << kNumColumn;
+          *succeeded = false;
+          continue;
+        }
+        auto cols = std::stoi(*num);
+        VLOG(3) << "gets " << std::to_string(cols) << " columns";
+        for (int m = 1; m <= cols; m++) {
+          auto col = std::to_string(m);
+          auto value = result->Value(family, col);
+          if (!value) {
+            LOG(ERROR) << "No value for " << result->Row() << " at " << family << "@" << col;
+            *succeeded = false;
+            continue;
+          }
+          if (iteration % incr_mod == 1) {
+            auto l = hbase::BytesUtil::ToInt64(*value);
+            if (l != m) {
+              LOG(ERROR) << "value is not " << col << " for " << result->Row();
+              *succeeded = false;
+            }
+          } else if (*value != rev) {
+            LOG(ERROR) << "value " << *value << " is not " << rev;
+            *succeeded = false;
+          }
+        }
+      }
+    }
+  }
+  LOG(INFO) << "Sent " << rows << " gets";
+  table->Close();
+}
+
+// Writes |rows| rows for thread |iteration|. For each family it stores the
+// column count under kNumColumn and the row key itself in columns 1..n_cols,
+// where n_cols is drawn once per call from [1, cols]. Takes ownership of
+// |table| and closes it when done.
+void DoPut(int iteration, uint64_t rows, int cols, std::unique_ptr<Table> table,
+           const std::vector<std::string> &families) {
+  // folly::Random::rand32(min, max) draws from [min, max); add 1 so that
+  // |cols| itself is reachable and cols == 1 does not yield 0.
+  const uint32_t n_cols = Random::rand32(1, static_cast<uint32_t>(cols) + 1);
+  for (uint64_t j = 0; j < rows; j++) {
+    const std::string row = PrefixZero(iteration * rows + j);
+    for (const auto &family : families) {
+      table->Put(Put{row}.AddColumn(family, kNumColumn, std::to_string(n_cols)));
+      for (uint32_t k = 1; k <= n_cols; k++) {
+        table->Put(Put{row}.AddColumn(family, std::to_string(k), row));
+      }
+    }
+    if ((j + 1) % 5000 == 0) LOG(INFO) << "Written " << std::to_string(j + 1) << " rows";
+  }
+  table->Close();
+}
+
+// Deletes, then rewrites, the |rows| rows of thread |iteration|. Each family
+// gets the column count under kNumColumn; columns 1..n_cols are then filled by
+// Increment (value k) when (iteration % incr_mod == 1), or otherwise by
+// Append of the reversed row key. n_cols is drawn once per call from
+// [1, cols]. Takes ownership of |table| and closes it when done. Sets
+// *succeeded to false when an Append returns no result.
+void DoAppendIncrement(int iteration, uint64_t rows, int cols, std::unique_ptr<Table> table,
+                       const std::vector<std::string> &families, bool *succeeded) {
+  for (uint64_t j = 0; j < rows; j++) {
+    std::string row = PrefixZero(iteration * rows + j);
+    table->Delete(hbase::Delete{row});
+  }
+  // See DoPut: rand32's upper bound is exclusive.
+  const uint32_t n_cols = Random::rand32(1, static_cast<uint32_t>(cols) + 1);
+  for (uint64_t j = 0; j < rows; j++) {
+    const std::string row = PrefixZero(iteration * rows + j);
+    std::string rev(row);
+    std::reverse(rev.begin(), rev.end());
+    for (const auto &family : families) {
+      table->Put(Put{row}.AddColumn(family, kNumColumn, std::to_string(n_cols)));
+      for (uint32_t k = 1; k <= n_cols; k++) {
+        if (iteration % incr_mod == 1) {
+          table->Increment(hbase::Increment{row}.AddColumn(family, std::to_string(k), k));
+        } else if (!table->Append(hbase::Append{row}.Add(family, std::to_string(k), rev))) {
+          LOG(ERROR) << "append for " << row << " family: " << family << " failed";
+          *succeeded = false;
+        }
+      }
+    }
+    if ((j + 1) % 5000 == 0) LOG(INFO) << "Written " << std::to_string(j + 1) << " increments";
+  }
+  table->Close();
+}
+
+int main(int argc, char *argv[]) {
+  google::SetUsageMessage("Load client to manipulate multiple rows from HBase on the command line");
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  google::InstallFailureSignalHandler();
+  FLAGS_logtostderr = 1;
+  FLAGS_stderrthreshold = 1;
+
+  if (FLAGS_multi_get_size < 1) {
+    LOG(ERROR) << "size of multi get should be positive";
+    return -1;
+  }
+  if (FLAGS_threads < 1) {
+    LOG(ERROR) << "number of threads should be positive";
+    return -1;
+  }
+  if (FLAGS_num_cols < 1 ||
+      FLAGS_num_cols > static_cast<uint64_t>(std::numeric_limits<int>::max())) {
+    LOG(ERROR) << "number of columns should be between 1 and "
+               << std::numeric_limits<int>::max();
+    return -1;
+  }
+  if (FLAGS_skip_get && FLAGS_skip_put) {
+    LOG(ERROR) << "Must perform at least Get or Put operations";
+    return -1;
+  }
+
+  // Every thread handles the same number of rows, rounded up so that
+  // rows * threads >= num_rows.
+  const uint64_t threads = static_cast<uint64_t>(FLAGS_threads);
+  const uint64_t rows = FLAGS_num_rows / threads + (FLAGS_num_rows % threads != 0 ? 1 : 0);
+  // Row keys are padded to total_width digits; indices needing more digits
+  // would no longer sort numerically and the per-thread scan ranges would break.
+  uint64_t key_space = 1;
+  for (int d = 0; d < total_width; ++d) key_space *= 10;
+  if (rows > key_space / threads) {
+    LOG(ERROR) << "num_rows is too large: at most " << key_space << " rows are supported";
+    return -1;
+  }
+  const int cols = static_cast<int>(FLAGS_num_cols);
+
+  // Configuration
+  auto conf = std::make_shared<Configuration>();
+  conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
+
+  auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table));
+  auto num_puts = FLAGS_num_rows;
+
+  auto client = std::make_unique<Client>(*conf);
+
+  // Split the comma separated --families flag.
+  std::vector<std::string> families;
+  std::size_t pos = 0, found;
+  while ((found = FLAGS_families.find_first_of(',', pos)) != std::string::npos) {
+    families.push_back(FLAGS_families.substr(pos, found - pos));
+    pos = found + 1;
+  }
+  families.push_back(FLAGS_families.substr(pos));
+
+  // Written from every worker thread, so it must be atomic. Threads record
+  // their own outcome locally and only ever clear this flag.
+  std::atomic<bool> succeeded{true};
+  if (!FLAGS_skip_put) {
+    LOG(INFO) << "starting to write";
+    auto start_ns = TimeUtil::GetNowNanos();
+    std::vector<std::thread> writer_threads;
+    writer_threads.reserve(threads);
+    for (int i = 0; i < FLAGS_threads; i++) {
+      writer_threads.emplace_back([=, &client, &tn, &families, &succeeded] {
+        // Each thread opens its own Table; the Do* helper takes ownership of
+        // it and closes it, so |table| must not be touched after the move.
+        auto table = client->Table(*tn);
+        bool ok = true;
+        if ((i % 2) == 0) {
+          DoPut(i, rows, cols, std::move(table), families);
+        } else {
+          DoAppendIncrement(i, rows, cols, std::move(table), families, &ok);
+        }
+        if (!ok) succeeded = false;
+      });
+    }
+    for (auto &writer : writer_threads) {
+      writer.join();
+    }
+    LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
+
+  if (!FLAGS_skip_get) {
+    auto start_ns = TimeUtil::GetNowNanos();
+    std::vector<std::thread> reader_threads;
+    reader_threads.reserve(threads);
+    for (int i = 0; i < FLAGS_threads; i++) {
+      reader_threads.emplace_back([=, &client, &tn, &families, &succeeded] {
+        // Even threads verify DoPut's rows by scanning; odd threads verify
+        // DoAppendIncrement's rows with (multi-)gets.
+        auto table1 = client->Table(*tn);
+        bool ok = true;
+        if ((i % 2) == 0) {
+          DoScan(i, rows, std::move(table1), families, &ok);
+        } else {
+          DoGet(i, rows, std::move(table1), families, FLAGS_multi_get_size, &ok);
+        }
+        if (!ok) succeeded = false;
+      });
+    }
+    for (auto &reader : reader_threads) {
+      reader.join();
+    }
+
+    LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
+
+  client->Close();
+
+  return succeeded ? 0 : -1;
+}