diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 464c010..a24b513 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -329,3 +329,9 @@ cxx_binary(
         "simple-client.cc",
     ],
     deps=[":core", "//connection:connection"],)
+cxx_binary(
+    name="load-client",
+    srcs=[
+        "load-client.cc",
+    ],
+    deps=[":core", "//connection:connection"],)
diff --git a/hbase-native-client/core/load-client.cc b/hbase-native-client/core/load-client.cc
new file mode 100644
index 0000000..b538444
--- /dev/null
+++ b/hbase-native-client/core/load-client.cc
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Multi-threaded load generator for the HBase native client. Even-numbered
+// workers write their row range with Put and odd-numbered workers with
+// Append/Increment; unless --skip_get is set the same ranges are then read
+// back (Scan for even workers, multi-Get for odd ones) and verified.
+
+// NOTE(review): the angle-bracket include targets were not legible in the
+// submitted patch; the list below is derived from the names this file uses
+// (folly::to, LOG, DEFINE_*, std::thread, ...) -- confirm against the build.
+#include <folly/Conv.h>
+#include <folly/Logging.h>
+#include <gflags/gflags.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "core/client.h"
+#include "core/get.h"
+#include "core/put.h"
+#include "core/table.h"
+#include "serde/table-name.h"
+#include "utils/time-util.h"
+
+using hbase::Client;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Table;
+using hbase::pb::TableName;
+using hbase::TimeUtil;
+
+DEFINE_string(table, "t", "What table to do the reads and writes");
+DEFINE_string(families, "d", "comma separated list of column family names");
+DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_uint64(num_rows, 1'000'000, "How many rows to write and read");
+DEFINE_uint64(num_cols, 10000, "How many columns there are in a row");
+DEFINE_int32(threads, 10, "How many client threads");
+DEFINE_int32(multi_get_size, 5, "number of gets in multi-get");
+DEFINE_bool(skip_get, false, "skip get / scan");
+
+// Returns `prefix` followed by the decimal representation of `i`.
+std::string Row(const std::string &prefix, uint64_t i) {
+  auto suf = folly::to<std::string>(i);
+  return prefix + suf;
+}
+
+// total width of row key
+static const int total_width = 8;
+// the worker whose index i satisfies i % incr_mod == 1 exercises Increment
+// instead of Append
+static const int incr_mod = 10;
+
+// Formats `num` in decimal, left-padded with '0' to total_width characters.
+// A number that already has total_width or more digits is returned as is;
+// the pad length is an unsigned subtraction and would otherwise wrap around.
+std::string PrefixZero(uint64_t num) {
+  std::string str = std::to_string(num);
+  const auto width = static_cast<std::string::size_type>(total_width);
+  if (str.length() >= width) {
+    return str;
+  }
+  return std::string(width - str.length(), '0') + str;
+}
+
+// Runs work(i) for i in [0, num_threads), each on its own thread, and returns
+// once all of them have been joined.
+template <typename Work>
+void RunOnThreads(int num_threads, Work work) {
+  std::vector<std::thread> threads;
+  threads.reserve(num_threads);
+  for (int i = 0; i < num_threads; i++) {
+    threads.emplace_back(work, i);
+  }
+  for (auto &thread : threads) {
+    thread.join();
+  }
+}
+
+// Scans rows [iteration * rows, (iteration + 1) * rows) and checks that they
+// arrive in consecutive key order, that each holds families.size() * cols
+// cells, and that every cell value equals the row key (what DoPut writes).
+// Every mismatch is logged and sets *succeeded to false.
+void DoScan(int iteration, uint64_t rows, int cols, Table &table,
+            const std::vector<std::string> &families, std::atomic<bool> *succeeded) {
+  hbase::Scan scan{};
+  auto start = iteration * rows;
+  scan.SetStartRow(PrefixZero(start));
+  scan.SetStopRow(PrefixZero((iteration + 1) * rows));
+  auto scanner = table.Scan(scan);
+
+  auto cnt = start;
+  auto r = scanner->Next();
+  auto row_size = families.size() * cols;
+  while (r != nullptr) {
+    if (r->Cells().size() != row_size) {
+      LOG(ERROR) << "#cells in row (" << std::to_string(r->Cells().size()) << ") != " << row_size;
+      *succeeded = false;
+    }
+    auto row = PrefixZero(cnt);
+    if (r->Row().compare(row) != 0) {
+      LOG(ERROR) << "row " << r->Row() << " is not the expected: " << row;
+      *succeeded = false;
+    }
+    for (const auto &family : families) {
+      for (int m = 1; m <= cols; m++) {
+        auto col = std::to_string(m);
+        auto val_read = r->Value(family, col);
+        // A missing column is a verification failure, not something to dereference.
+        if (!val_read) {
+          LOG(ERROR) << "No value for " << r->Row() << " at " << family << "@" << col;
+          *succeeded = false;
+          continue;
+        }
+        if (*val_read != row) {
+          LOG(ERROR) << "value " << *val_read << " != " << row;
+          *succeeded = false;
+        }
+      }
+    }
+    cnt++;
+    r = scanner->Next();
+  }
+  // An empty or truncated scan must not pass as a success.
+  if (cnt - start != rows) {
+    LOG(ERROR) << "Iteration " << iteration << " expected " << rows << " rows";
+    *succeeded = false;
+  }
+  LOG(INFO) << "Iteration " << iteration << " scanned " << std::to_string(cnt - start) << " rows";
+}
+
+// Reads rows [iteration * rows, (iteration + 1) * rows) in batches of
+// multi_get_size Gets (must be positive; main() checks the flag) and verifies
+// what DoAppendIncrement writes: the int64 column number when
+// iteration % incr_mod == 1, otherwise the reversed row key.
+void DoGet(int iteration, uint64_t rows, int cols, Table &table,
+           const std::vector<std::string> &families, int multi_get_size,
+           std::atomic<bool> *succeeded) {
+  const auto batch = static_cast<uint64_t>(multi_get_size);
+  for (uint64_t k = 0; k < rows; k += batch) {
+    std::vector<hbase::Get> gets;
+    for (uint64_t i = k; i < k + batch && i < rows; ++i) {
+      gets.emplace_back(PrefixZero(iteration * rows + i));
+    }
+    auto results = table.Get(gets);
+    for (const auto &result : results) {
+      if (result == nullptr) {
+        LOG(ERROR) << "didn't get result";
+        *succeeded = false;
+        continue;
+      }
+      // Test the values
+      std::string rev(result->Row());
+      std::reverse(rev.begin(), rev.end());
+      for (const auto &family : families) {
+        for (int m = 1; m <= cols; m++) {
+          auto col = std::to_string(m);
+          auto value = result->Value(family, col);
+          if (!value) {
+            LOG(ERROR) << "No value for " << result->Row() << " at " << family << "@" << col;
+            *succeeded = false;
+            continue;
+          }
+          if (iteration % incr_mod == 1) {
+            auto l = hbase::BytesUtil::ToInt64(*value);
+            if (l != m) {
+              LOG(ERROR) << "value is not " << col << " for " << result->Row();
+              *succeeded = false;
+            }
+          } else if (rev.compare(*value) != 0) {
+            // Report the column that was actually compared.
+            LOG(ERROR) << "value " << *value << " is not " << rev;
+            *succeeded = false;
+          }
+        }
+      }
+    }
+  }
+  LOG(INFO) << "Sent " << rows << " gets";
+}
+
+// Writes rows [iteration * rows, (iteration + 1) * rows): one Put per
+// (family, column) for columns "1".."cols", using the row key as the value.
+// `succeeded` is never written here; the parameter keeps the signature
+// parallel to the other workers.
+void DoPut(int iteration, uint64_t rows, int cols, Table &table,
+           const std::vector<std::string> &families, std::atomic<bool> *succeeded) {
+  for (uint64_t j = 0; j < rows; j++) {
+    std::string row = PrefixZero(iteration * rows + j);
+    for (const auto &family : families) {
+      for (int k = 1; k <= cols; k++) {
+        table.Put(Put{row}.AddColumn(family, std::to_string(k), row));
+      }
+    }
+  }
+}
+
+// Deletes rows [iteration * rows, (iteration + 1) * rows) and then rebuilds
+// every (family, column) cell: with Increment by the column number when
+// iteration % incr_mod == 1, otherwise with Append of the reversed row key.
+// An Append that returns no result is logged and sets *succeeded to false.
+void DoAppendIncrement(int iteration, uint64_t rows, int cols, Table &table,
+                       const std::vector<std::string> &families, std::atomic<bool> *succeeded) {
+  for (uint64_t j = 0; j < rows; j++) {
+    std::string row = PrefixZero(iteration * rows + j);
+    table.Delete(hbase::Delete{row});
+  }
+  for (uint64_t j = 0; j < rows; j++) {
+    std::string row = PrefixZero(iteration * rows + j);
+    std::string rev(row);
+    std::reverse(rev.begin(), rev.end());
+    for (const auto &family : families) {
+      for (int k = 1; k <= cols; k++) {
+        if (iteration % incr_mod == 1) {
+          table.Increment(hbase::Increment{row}.AddColumn(family, std::to_string(k), k));
+        } else {
+          if (!table.Append(hbase::Append{row}.Add(family, std::to_string(k), rev))) {
+            LOG(ERROR) << "append for " << row << " family: " << family << " failed";
+            *succeeded = false;
+          }
+        }
+      }
+    }
+  }
+}
+
+// Parses the flags, runs the write phase and (unless --skip_get) the
+// read-back phase on FLAGS_threads threads, and returns 0 when every check
+// passed or -1 otherwise.
+int main(int argc, char *argv[]) {
+  google::SetUsageMessage("Load client to manipulate multiple rows from HBase on the command line");
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  google::InstallFailureSignalHandler();
+  FLAGS_logtostderr = 1;
+  FLAGS_stderrthreshold = 1;
+
+  if (FLAGS_multi_get_size < 1) {
+    LOG(ERROR) << "size of multi get should be positive";
+    return -1;
+  }
+  // The row count is divided by the thread count below.
+  if (FLAGS_threads < 1) {
+    LOG(ERROR) << "number of threads should be positive";
+    return -1;
+  }
+  // Configuration
+  auto conf = std::make_shared<Configuration>();
+  conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
+
+  auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table));
+  auto num_puts = FLAGS_num_rows;
+
+  auto client = std::make_unique<Client>(*conf);
+
+  // Do the Put requests
+  auto start_ns = TimeUtil::GetNowNanos();
+
+  std::vector<std::string> families;
+  std::size_t pos = 0, found;
+  while ((found = FLAGS_families.find_first_of(',', pos)) != std::string::npos) {
+    families.push_back(FLAGS_families.substr(pos, found - pos));
+    pos = found + 1;
+  }
+  families.push_back(FLAGS_families.substr(pos));
+
+  const uint64_t rows = FLAGS_num_rows / FLAGS_threads;
+  const int cols = static_cast<int>(FLAGS_num_cols);
+  // Written from every worker thread, hence atomic.
+  std::atomic<bool> succeeded{true};
+  RunOnThreads(FLAGS_threads, [rows, cols, &client, &tn, &families, &succeeded](int i) {
+    // Get connection to HBase Table
+    auto table = client->Table(*tn);
+
+    // The workers only borrow the table so that it can still be closed here.
+    if ((i % 2) == 0) {
+      DoPut(i, rows, cols, *table, families, &succeeded);
+    } else {
+      DoAppendIncrement(i, rows, cols, *table, families, &succeeded);
+    }
+    table->Close();
+  });
+
+  LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
+            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+
+  start_ns = TimeUtil::GetNowNanos();
+  if (!FLAGS_skip_get) {
+    RunOnThreads(FLAGS_threads, [rows, cols, &client, &tn, &families, &succeeded](int i) {
+      // Get connection to HBase Table
+      auto table = client->Table(*tn);
+
+      if ((i % 2) == 0) {
+        DoScan(i, rows, cols, *table, families, &succeeded);
+      } else {
+        DoGet(i, rows, cols, *table, families, FLAGS_multi_get_size, &succeeded);
+      }
+      table->Close();
+    });
+  }
+
+  LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent "
+            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+
+  client->Close();
+
+  return succeeded ? 0 : -1;
+}