diff --git hbase-native-client/core/async-rpc-retrying-test.cc hbase-native-client/core/async-rpc-retrying-test.cc index d0c7921..5086286 100644 --- hbase-native-client/core/async-rpc-retrying-test.cc +++ hbase-native-client/core/async-rpc-retrying-test.cc @@ -137,15 +137,15 @@ class MockRawAsyncTableImpl { /* in real RawAsyncTableImpl, this should be private. */ template - folly::Future Call( - std::shared_ptr rpc_client, std::shared_ptr controller, - std::shared_ptr loc, const REQ& req, - const ReqConverter, REQ, std::string>& req_converter, - const hbase::RpcCall& rpc_call, - const RespConverter& resp_converter) { + folly::Future Call(std::shared_ptr rpc_client, + std::shared_ptr controller, + std::shared_ptr loc, const REQ& req, + ReqConverter, REQ, std::string> req_converter, + const hbase::RpcCall& rpc_call, + RespConverter resp_converter) { rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) - .then([&, this](std::unique_ptr presp) { - std::shared_ptr result = hbase::ResponseConverter::FromGetResponse(*presp); + .then([&, this, resp_converter](std::unique_ptr presp) { + RESP result = resp_converter(*presp); promise_->setValue(result); }) .onError([this](const std::exception& e) { promise_->setException(e); }); @@ -210,31 +210,27 @@ TEST(AsyncRpcRetryTest, TestGetBasic) { auto builder = conn->caller_factory()->Single>(); /* call with retry to get result */ - try { - auto async_caller = - builder->table(std::make_shared(tn)) - ->row(row) - ->rpc_timeout(conn->connection_conf()->read_rpc_timeout()) - ->operation_timeout(conn->connection_conf()->operation_timeout()) - ->action([=, &get](std::shared_ptr controller, - std::shared_ptr loc, - std::shared_ptr rpc_client) - -> folly::Future> { - return tableImpl->GetCall(rpc_client, controller, loc, get); - }) - ->Build(); - - auto result = async_caller->Call().get(); - - // Test the values, should be same as in put executed on hbase shell - ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; - EXPECT_EQ("test2", result->Row()); - EXPECT_EQ("value2", *(result->Value("d", "2"))); - EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - throw e; - } + + auto async_caller = + builder->table(std::make_shared(tn)) + ->row(row) + ->rpc_timeout(conn->connection_conf()->read_rpc_timeout()) + ->operation_timeout(conn->connection_conf()->operation_timeout()) + ->action([=, &get](std::shared_ptr controller, + std::shared_ptr loc, + std::shared_ptr rpc_client) + -> folly::Future> { + return tableImpl->GetCall(rpc_client, controller, loc, get); + }) + ->Build(); + + auto result = async_caller->Call().get(); + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); table->Close(); client.Close(); diff --git hbase-native-client/core/raw-async-table.cc hbase-native-client/core/raw-async-table.cc index 641f3c8..88a3382 100644 --- hbase-native-client/core/raw-async-table.cc +++ hbase-native-client/core/raw-async-table.cc @@ -16,6 +16,7 @@ * limitations under the License. * */ +#include #include "core/raw-async-table.h" #include "core/request-converter.h" @@ -41,18 +42,16 @@ template folly::Future RawAsyncTable::Call( std::shared_ptr rpc_client, std::shared_ptr controller, std::shared_ptr loc, const REQ& req, - const ReqConverter, REQ, std::string>& req_converter, - const RespConverter& resp_converter) { + const ReqConverter, REQ, std::string> req_converter, + const RespConverter resp_converter) { std::unique_ptr preq = req_converter(req, loc->region_name()); // No need to make take a callable argument, it is always the same return rpc_client ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), User::defaultUser(), "ClientService") - .then([&](const std::unique_ptr& presp) { - return ResponseConverter::FromGetResponse(*presp); - // return resp_converter(*presp); // TODO this is causing SEGFAULT, figure out why - }); + .then( + [resp_converter](const std::unique_ptr& presp) { return resp_converter(*presp); }); } Future> RawAsyncTable::Get(const hbase::Get& get) { diff --git hbase-native-client/core/raw-async-table.h hbase-native-client/core/raw-async-table.h index 527c7be..bbdc6bd 100644 --- hbase-native-client/core/raw-async-table.h +++ hbase-native-client/core/raw-async-table.h @@ -18,12 +18,12 @@ */ #pragma once +#include + #include #include #include -#include - #include "core/async-connection.h" #include "core/async-rpc-retrying-caller-factory.h" #include "core/async-rpc-retrying-caller.h" @@ -66,8 +66,8 @@ class RawAsyncTable { folly::Future Call( std::shared_ptr rpc_client, std::shared_ptr controller, std::shared_ptr loc, const REQ& req, - const ReqConverter, REQ, std::string>& req_converter, - const RespConverter& resp_converter); + const ReqConverter, REQ, std::string> req_converter, + const RespConverter resp_converter); template std::shared_ptr> CreateCallerBuilder(std::string row, diff --git hbase-native-client/core/response-converter.cc hbase-native-client/core/response-converter.cc index b11856c..b2fff34 100644 --- hbase-native-client/core/response-converter.cc +++ hbase-native-client/core/response-converter.cc @@ -36,14 +36,12 @@ ResponseConverter::~ResponseConverter() {} // impl note: we are returning shared_ptr's instead of unique_ptr's because these // go inside folly::Future's, making the move semantics extremely tricky. std::shared_ptr ResponseConverter::FromGetResponse(const Response& resp) { - LOG(INFO) << "FromGetResponse"; auto get_resp = std::static_pointer_cast(resp.resp_msg()); return ToResult(get_resp->result(), resp.cell_scanner()); } std::shared_ptr ResponseConverter::ToResult( const hbase::pb::Result& result, const std::unique_ptr& cell_scanner) { - LOG(INFO) << "ToResult"; std::vector> vcells; for (auto cell : result.cell()) { std::shared_ptr pcell = @@ -59,13 +57,11 @@ std::shared_ptr ResponseConverter::ToResult( } // TODO: check associated cell count? } - LOG(INFO) << "Returning Result"; return std::make_shared(vcells, result.exists(), result.stale(), result.partial()); } std::vector> ResponseConverter::FromScanResponse(const Response& resp) { auto scan_resp = std::static_pointer_cast(resp.resp_msg()); - LOG(INFO) << "FromScanResponse:" << scan_resp->ShortDebugString(); int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();