diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index e9fc716..51b28d0 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -201,3 +201,7 @@ cxx_binary( name="simple-client", srcs=["simple-client.cc",], deps=[":core", "//connection:connection"],) +cxx_binary( + name="load-client", + srcs=["load-client.cc",], + deps=[":core", "//connection:connection"],) diff --git a/hbase-native-client/core/load-client.cc b/hbase-native-client/core/load-client.cc new file mode 100644 index 0000000..bb25b32 --- /dev/null +++ b/hbase-native-client/core/load-client.cc @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+#include <folly/Conv.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <chrono>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "core/client.h"
+#include "core/get.h"
+#include "core/put.h"
+#include "core/table.h"
+#include "serde/table-name.h"
+#include "utils/time-util.h"
+
+using hbase::Client;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Table;
+using hbase::pb::TableName;
+using hbase::TimeUtil;
+
+DEFINE_string(table, "t", "What table to do the reads and writes");
+DEFINE_string(row_prefix, "", "row prefix");
+DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_uint64(num_rows, 1'000'000, "How many rows to write and read");
+DEFINE_uint64(num_cols, 10000, "How many columns there are in a row");
+DEFINE_int32(sleep_duration, 20, "Duration of sleep in case RetriesExhaustedException is thrown");
+DEFINE_int32(threads, 10, "How many client threads");
+
+// Builds a Put for `row` holding a single cell d:q whose value is the row key itself.
+std::unique_ptr<Put> MakePut(const std::string &row) {
+  auto put = std::make_unique<Put>(row);
+  put->AddColumn("d", "q", row);
+  // Return the local directly: it is moved implicitly, and std::move would only
+  // suppress copy elision.
+  return put;
+}
+
+// Returns the row key for index `i`: `prefix` followed by the decimal form of `i`.
+std::string Row(const std::string &prefix, uint64_t i) {
+  return prefix + folly::to<std::string>(i);
+}
+
+// Returns the half-open range [begin, end) of row indices assigned to worker `i` when
+// `num_rows` rows are split across `num_workers` workers. Range sizes differ by at most
+// one, so together the ranges cover [0, num_rows) exactly once even when num_rows is not
+// a multiple of num_workers. Requires num_workers > 0 and i < num_workers.
+static std::pair<uint64_t, uint64_t> RowRange(uint64_t num_rows, uint64_t num_workers,
+                                              uint64_t i) {
+  const uint64_t base = num_rows / num_workers;
+  const uint64_t extra = num_rows % num_workers;
+  const uint64_t begin = i * base + std::min(i, extra);
+  return {begin, begin + base + (i < extra ? 1 : 0)};
+}
+
+// Load generator. `--threads` workers first write `--num_rows` rows to `--table` (one Put
+// per cell, qualifiers d:1..d:<num_cols>, value "value1"), then read every row back and
+// check its key and its d:1 cell. The elapsed time of each phase is logged.
+int main(int argc, char *argv[]) {
+  google::SetUsageMessage("Load client: writes rows to an HBase table and reads them back");
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  google::InstallFailureSignalHandler();
+  FLAGS_logtostderr = 1;
+  FLAGS_stderrthreshold = 1;
+
+  // Rows are divided among the workers, so a zero or negative worker count would divide
+  // by zero / request a negative number of threads.
+  if (FLAGS_threads <= 0) {
+    LOG(ERROR) << "--threads must be positive, got " << FLAGS_threads;
+    return 1;
+  }
+
+  // Configuration
+  auto conf = std::make_shared<Configuration>();
+  conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
+
+  auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table));
+  auto client = std::make_unique<Client>(*conf);
+
+  const uint64_t num_workers = static_cast<uint64_t>(FLAGS_threads);
+  const uint64_t num_rows = FLAGS_num_rows;
+  const uint64_t cols = FLAGS_num_cols;
+  // Every cell is written with its own Put request.
+  const uint64_t num_puts = num_rows * cols;
+  const std::chrono::seconds retry_sleep(FLAGS_sleep_duration);
+  const std::string prefix = FLAGS_row_prefix;
+  const std::string val = "value1";
+
+  // Workers capture locals by reference; this is safe because every worker is joined
+  // before those locals go out of scope.
+  std::vector<std::thread> workers;
+  workers.reserve(num_workers);
+
+  // Do the Put requests
+  auto start_ns = TimeUtil::GetNowNanos();
+  for (uint64_t i = 0; i < num_workers; i++) {
+    workers.emplace_back([&client, &tn, &val, &prefix, i, num_rows, num_workers, cols,
+                          retry_sleep] {
+      uint64_t num_retries_exhausted = 0;
+      // Get connection to HBase Table
+      auto table = client->Table(*tn);
+
+      const auto range = RowRange(num_rows, num_workers, i);
+      for (uint64_t j = range.first; j < range.second; j++) {
+        const std::string row = Row(prefix, j);
+        for (uint64_t k = 1; k <= cols; k++) {
+          // Keep retrying this cell; on RetriesExhaustedException back off before the
+          // next attempt.
+          while (true) {
+            try {
+              table->Put(Put{row}.AddColumn("d", std::to_string(k), val));
+              break;
+            } catch (const hbase::RetriesExhaustedException &) {
+              num_retries_exhausted++;
+              std::this_thread::sleep_for(retry_sleep);
+            }
+          }
+        }
+      }
+      table->Close();
+      if (num_retries_exhausted > 0) {
+        LOG(ERROR) << "There were " << num_retries_exhausted << " RetriesExhaustedExceptions";
+      }
+    });
+  }
+  for (auto &worker : workers) {
+    worker.join();
+  }
+  workers.clear();
+
+  LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
+            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+
+  // Do the Get requests
+  start_ns = TimeUtil::GetNowNanos();
+  for (uint64_t i = 0; i < num_workers; i++) {
+    workers.emplace_back([&client, &tn, &val, &prefix, i, num_rows, num_workers] {
+      // Get connection to HBase Table
+      auto table1 = client->Table(*tn);
+
+      const auto range = RowRange(num_rows, num_workers, i);
+      for (uint64_t k = range.first; k < range.second; k++) {
+        const std::string row = Row(prefix, k);
+        hbase::Get get(row);
+        auto result = table1->Get(get);
+        // Test the values: they should match what the Put phase above wrote.
+        if (!result || row.compare(result->Row()) != 0) {
+          LOG(ERROR) << "row is not " << row;
+          continue;
+        }
+        // The d:1 cell may be absent (e.g. --num_cols=0), so check before dereferencing.
+        auto value = result->Value("d", "1");
+        if (!value || val.compare(*value) != 0) {
+          LOG(ERROR) << "value is not " << val;
+        }
+      }
+      table1->Close();
+    });
+  }
+  for (auto &worker : workers) {
+    worker.join();
+  }
+
+  LOG(INFO) << "Successfully sent " << num_rows << " Get requests in "
+            << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+
+  client->Close();
+
+  return 0;
+}