diff --git hbase-native-client/connection/client-dispatcher.cc hbase-native-client/connection/client-dispatcher.cc index 35a1f7d..4b42159 100644 --- hbase-native-client/connection/client-dispatcher.cc +++ hbase-native-client/connection/client-dispatcher.cc @@ -25,15 +25,13 @@ using std::unique_ptr; namespace hbase { -ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {} +ClientDispatcher::ClientDispatcher() : current_call_id_(9), requests_(5000) {} void ClientDispatcher::read(Context *ctx, unique_ptr in) { auto call_id = in->call_id(); - auto search = requests_.find(call_id); CHECK(search != requests_.end()); auto p = std::move(search->second); - requests_.erase(call_id); if (in->exception()) { @@ -46,8 +44,10 @@ void ClientDispatcher::read(Context *ctx, unique_ptr in) { folly::Future> ClientDispatcher::operator()(unique_ptr arg) { auto call_id = current_call_id_++; arg->set_call_id(call_id); - requests_.insert(call_id, folly::Promise>{}); + + requests_[call_id] = folly::Promise>{}; auto &p = requests_.find(call_id)->second; + auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { LOG(ERROR) << "e = " << call_id; diff --git hbase-native-client/connection/client-dispatcher.h hbase-native-client/connection/client-dispatcher.h index 857042c..1f8e6b3 100644 --- hbase-native-client/connection/client-dispatcher.h +++ hbase-native-client/connection/client-dispatcher.h @@ -19,16 +19,18 @@ #pragma once -#include #include #include #include +#include #include +#include #include "connection/pipeline.h" #include "connection/request.h" #include "connection/response.h" +#include "utils/concurrent-map.h" namespace hbase { /** @@ -51,7 +53,7 @@ class ClientDispatcher folly::Future close() override; private: - folly::AtomicHashMap>> requests_; + concurrent_map>> requests_; // Start at some number way above what could // be there for un-initialized call id counters. // diff --git hbase-native-client/connection/client-handler.cc hbase-native-client/connection/client-handler.cc index 775df68..c7005ad 100644 --- hbase-native-client/connection/client-handler.cc +++ hbase-native-client/connection/client-handler.cc @@ -40,7 +40,7 @@ ClientHandler::ClientHandler(std::string user_name, std::shared_ptr codec serde_(codec), server_(server), once_flag_(std::make_unique()), - resp_msgs_(std::make_unique>>(5000)) { + resp_msgs_(std::make_unique>>(5000)) { } void ClientHandler::read(Context *ctx, std::unique_ptr buf) { @@ -133,7 +133,7 @@ folly::Future ClientHandler::write(Context *ctx, std::unique_ptrDebugString() << ", server: " << server_; // Now store the call id to response. - resp_msgs_->insert(r->call_id(), r->resp_msg()); + (*resp_msgs_)[r->call_id()] = r->resp_msg(); // Send the data down the pipeline. return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); diff --git hbase-native-client/connection/client-handler.h hbase-native-client/connection/client-handler.h index 4c106e0..8de3a8b 100644 --- hbase-native-client/connection/client-handler.h +++ hbase-native-client/connection/client-handler.h @@ -18,7 +18,6 @@ */ #pragma once -#include #include #include @@ -30,6 +29,7 @@ #include "exceptions/exception.h" #include "serde/codec.h" #include "serde/rpc.h" +#include "utils/concurrent-map.h" // Forward decs. namespace hbase { @@ -81,7 +81,6 @@ class ClientHandler std::string server_; // for logging // in flight requests - std::unique_ptr>> - resp_msgs_; + std::unique_ptr>> resp_msgs_; }; } // namespace hbase diff --git hbase-native-client/core/simple-client.cc hbase-native-client/core/simple-client.cc index 2fd7108..5d31af4 100644 --- hbase-native-client/core/simple-client.cc +++ hbase-native-client/core/simple-client.cc @@ -53,6 +53,8 @@ DEFINE_string(row, "row_", "row prefix"); DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); DEFINE_string(conf, "", "Conf directory to read the config from (optional)"); DEFINE_uint64(num_rows, 10000, "How many rows to write and read"); +DEFINE_uint64(batch_num_rows, 10000, "How many rows batch for multi-gets and multi-puts"); +DEFINE_uint64(report_num_rows, 10000, "How frequent we should report the progress"); DEFINE_bool(puts, true, "Whether to perform puts"); DEFINE_bool(gets, true, "Whether to perform gets"); DEFINE_bool(multigets, true, "Whether to perform multi-gets"); @@ -106,6 +108,10 @@ int main(int argc, char *argv[]) { LOG(INFO) << "Sending put requests"; for (uint64_t i = 0; i < num_puts; i++) { table->Put(*MakePut(Row(FLAGS_row, i))); + if (i != 0 && i % FLAGS_report_num_rows == 0) { + LOG(INFO) << "Sent " << i << " Put requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } } LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " @@ -120,6 +126,9 @@ int main(int argc, char *argv[]) { auto result = table->Get(Get{Row(FLAGS_row, i)}); if (FLAGS_display_results) { LOG(INFO) << result->DebugString(); + } else if (i != 0 && i % FLAGS_report_num_rows == 0) { + LOG(INFO) << "Sent " << i << " Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; } } @@ -129,21 +138,29 @@ int main(int argc, char *argv[]) { // Do the Multi-Gets if (FLAGS_multigets) { + LOG(INFO) << "Sending multi-get requests"; + start_ns = TimeUtil::GetNowNanos(); std::vector gets; - for (uint64_t i = 0; i < num_puts; ++i) { - hbase::Get get(Row(FLAGS_row, i)); - gets.push_back(get); - } - LOG(INFO) << "Sending multi-get requests"; - start_ns = TimeUtil::GetNowNanos(); - auto results = table->Get(gets); + for (uint64_t i = 0; i < num_puts;) { + gets.clear(); + // accumulate batch_num_rows at a time + for (uint64_t j = 0; j < FLAGS_batch_num_rows && i < num_puts; ++j) { + hbase::Get get(Row(FLAGS_row, i)); + gets.push_back(get); + i++; + } + auto results = table->Get(gets); - if (FLAGS_display_results) { - for (const auto &result : results) LOG(INFO) << result->DebugString(); + if (FLAGS_display_results) { + for (const auto &result : results) LOG(INFO) << result->DebugString(); + } else if (i != 0 && i % FLAGS_report_num_rows == 0) { + LOG(INFO) << "Sent " << i << " Multi-Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } } - LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in " + LOG(INFO) << "Successfully sent " << num_puts << " Multi-Get requests in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; } @@ -162,6 +179,10 @@ int main(int argc, char *argv[]) { } r = scanner->Next(); i++; + if (!FLAGS_display_results && i != 0 && i % FLAGS_report_num_rows == 0) { + LOG(INFO) << "Scan iterated over " << i << " results " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } } LOG(INFO) << "Successfully iterated over " << i << " Scan results in " diff --git hbase-native-client/utils/BUCK hbase-native-client/utils/BUCK index 788056b..482b8c5 100644 --- hbase-native-client/utils/BUCK +++ hbase-native-client/utils/BUCK @@ -20,6 +20,7 @@ cxx_library( exported_headers=[ "bytes-util.h", "connection-util.h", + "concurrent-map.h", "optional.h", "sys-util.h", "time-util.h", diff --git hbase-native-client/utils/concurrent-map.h hbase-native-client/utils/concurrent-map.h new file mode 100644 index 0000000..a42bed6 --- /dev/null +++ hbase-native-client/utils/concurrent-map.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +namespace hbase { + +/** + * A concurrent version of std::unordered_map where we acquire a shared or exclusive + * lock for operations. This is NOT a highly-concurrent and scalable implementation + * since there is only one lock object. + */ +template +class concurrent_map { + public: + typedef typename std::unordered_map::iterator iterator; + + + explicit concurrent_map(int32_t n): map_(n), mutex_() {} + + V& operator[](const K& key) { + return map_[key]; + } + + // TODO +// void insert(const std::pair& entry) { +// std::unique_lock lock(mutex_); +// map_.insert(entry); +// } + + void erase(const K& key) { + std::unique_lock lock(mutex_); + map_.erase(key); + } + + iterator begin() { + return map_.begin(); + } + + iterator end() { + return map_.end(); + } + + iterator find(const K &key) { + std::shared_lock lock(mutex_); + return map_.find(key); + } + + private: + std::shared_timed_mutex mutex_; + std::unordered_map map_; +}; +} /* namespace hbase */