diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c3119eb..d238073 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -33,6 +33,7 @@ cxx_library( "service.h", "rpc-client.h", "sasl-util.h", + "map-util.h", ], srcs=[ "client-dispatcher.cc", diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 857042c..eb0f788 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -19,7 +19,6 @@ #pragma once -#include #include #include @@ -29,6 +28,7 @@ #include "connection/pipeline.h" #include "connection/request.h" #include "connection/response.h" +#include "connection/map-util.h" namespace hbase { /** @@ -51,7 +51,7 @@ class ClientDispatcher folly::Future close() override; private: - folly::AtomicHashMap>> requests_; + MapUtil>> requests_; // Start at some number way above what could // be there for un-initialized call id counters. // diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 775df68..91ba47c 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -40,7 +40,7 @@ ClientHandler::ClientHandler(std::string user_name, std::shared_ptr codec serde_(codec), server_(server), once_flag_(std::make_unique()), - resp_msgs_(std::make_unique>>(5000)) { + resp_msgs_(std::make_unique>>(5000)) { } void ClientHandler::read(Context *ctx, std::unique_ptr buf) { @@ -135,6 +135,9 @@ folly::Future ClientHandler::write(Context *ctx, std::unique_ptrinsert(r->call_id(), r->resp_msg()); + VLOG(3) << "INSERTED RPC Request with call_id:" + << r->call_id(); // TODO: more logging for RPC Header + // Send the data down the pipeline. 
return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 4c106e0..1854413 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -18,18 +18,19 @@ */ #pragma once -#include #include #include #include #include +#include #include #include #include "exceptions/exception.h" #include "serde/codec.h" #include "serde/rpc.h" +#include "connection/map-util.h" // Forward decs. namespace hbase { @@ -81,7 +82,7 @@ class ClientHandler std::string server_; // for logging // in flight requests - std::unique_ptr>> + std::unique_ptr>> resp_msgs_; }; } // namespace hbase diff --git a/hbase-native-client/connection/map-util.h b/hbase-native-client/connection/map-util.h new file mode 100644 index 0000000..125a47f --- /dev/null +++ b/hbase-native-client/connection/map-util.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
#pragma once

#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <utility>

namespace hbase {

/**
 * A std::unordered_map guarded by a single reader/writer mutex.
 *
 * insert() and erase() take the mutex exclusively; find() and end() take it
 * in shared mode, so lookups may run concurrently with each other but never
 * with a mutation.
 *
 * NOTE: the lock is only held for the duration of each call. An iterator
 * returned by find() or end() is therefore NOT protected once the call
 * returns: a concurrent insert() may rehash and a concurrent erase() may
 * remove the element it refers to. Callers that share an instance between
 * threads must not dereference such an iterator while another thread may be
 * mutating the map.
 *
 * @tparam Key key type of the underlying map
 * @tparam T   mapped type of the underlying map
 */
template <typename Key, typename T>
class MapUtil {
 public:
  /**
   * Creates an empty map.
   *
   * @param bucket_count minimum number of buckets to pre-allocate. Zero or
   *        negative values fall back to the standard library's default
   *        instead of wrapping around to a huge unsigned count.
   */
  explicit MapUtil(int bucket_count)
      // Initializers are listed in member declaration order.
      : mutex_(std::make_unique<std::shared_timed_mutex>()),
        map_(std::make_unique<std::unordered_map<Key, T>>(
            bucket_count > 0 ? static_cast<std::size_t>(bucket_count) : 0)) {}

  /**
   * Inserts a copy of val under id. If id is already present the existing
   * entry is kept and val is ignored.
   */
  void insert(const Key& id, const T& val) {
    std::lock_guard<std::shared_timed_mutex> lock(*mutex_);
    map_->emplace(id, val);
  }

  /**
   * Inserts val under id by moving it, which lets the map hold move-only
   * values. If id is already present the existing entry is kept; val may
   * still have been moved from in that case.
   */
  void insert(const Key& id, T&& val) {
    std::lock_guard<std::shared_timed_mutex> lock(*mutex_);
    map_->emplace(id, std::move(val));
  }

  /** Removes the entry stored under id; does nothing if there is none. */
  void erase(const Key& id) {
    std::lock_guard<std::shared_timed_mutex> lock(*mutex_);
    map_->erase(id);
  }

  /**
   * Looks up id.
   *
   * @return an iterator to the entry, or a value equal to end() when id is
   *         absent. See the class comment for the lifetime caveat.
   */
  typename std::unordered_map<Key, T>::const_iterator find(const Key& id) {
    std::shared_lock<std::shared_timed_mutex> lock(*mutex_);
    return map_->find(id);
  }

  /** @return the past-the-end iterator to compare find() results against. */
  typename std::unordered_map<Key, T>::iterator end() {
    // Reading the container while another thread mutates it is a data race,
    // so take the shared lock here as well.
    std::shared_lock<std::shared_timed_mutex> lock(*mutex_);
    return map_->end();
  }

 private:
  // Both members are heap-allocated so that the wrapper itself stays movable.
  std::unique_ptr<std::shared_timed_mutex> mutex_;
  std::unique_ptr<std::unordered_map<Key, T>> map_;
};

}  // namespace hbase