diff --git hbase-native-client/connection/client-handler.cc hbase-native-client/connection/client-handler.cc index 5a6dce2..6778ff5 100644 --- hbase-native-client/connection/client-handler.cc +++ hbase-native-client/connection/client-handler.cc @@ -51,7 +51,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { ResponseHeader header; int used_bytes = serde_.ParseDelimited(buf.get(), &header); - LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() + DLOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() << " has_exception=" << header.has_exception(); // Get the response protobuf from the map @@ -73,10 +73,21 @@ void ClientHandler::read(Context *ctx, std::unique_ptr buf) { // data left on the wire. if (header.has_exception() == false) { buf->trimStart(used_bytes); + VLOG(INFO) << "used bytes:-" << used_bytes; + VLOG(INFO) << "buf->length():-" << buf->length(); + used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get()); + int cell_block_meta_length = header.cell_block_meta().length(); // Make sure that bytes were parsed. - CHECK(used_bytes == buf->length()); - received.set_resp_msg(resp_msg); + LOG(INFO) << "used_bytes:-" << used_bytes; + LOG(INFO) << "cell_block_meta_length:-" << cell_block_meta_length; + LOG(INFO) << "used_bytes + cell_block_meta_length:-" << used_bytes + cell_block_meta_length; + CHECK((used_bytes + cell_block_meta_length) == buf->length()); + if (cell_block_meta_length > 0) { + received.set_cell_block_meta(std::move(buf), used_bytes, cell_block_meta_length); + } else { + received.set_resp_msg(resp_msg); + } } ctx->fireRead(std::move(received)); } diff --git hbase-native-client/connection/response.h hbase-native-client/connection/response.h index 560387c..c6c479b 100644 --- hbase-native-client/connection/response.h +++ hbase-native-client/connection/response.h @@ -62,8 +62,26 @@ class Response { resp_msg_ = std::move(response); } + uint32_t cell_block_start_offset() const { return cell_block_start_offset_; } + + uint32_t cell_block_length() const { return cell_block_length_; } + + std::shared_ptr cell_block() const { + return cell_block_; + } + + void set_cell_block_meta(std::shared_ptr cell_block, + uint32_t cell_block_start_offset, uint32_t cell_block_length) { + cell_block_start_offset_ = cell_block_start_offset; + cell_block_length_ = cell_block_length; + cell_block_ = std::move(cell_block); + } + private: uint32_t call_id_; std::shared_ptr resp_msg_; + std::shared_ptr cell_block_ = nullptr; + uint32_t cell_block_start_offset_ = 0; + uint32_t cell_block_length_ = 0; }; } // namespace hbase diff --git hbase-native-client/core/location-cache.cc hbase-native-client/core/location-cache.cc index dab5deb..05046e6 100644 --- hbase-native-client/core/location-cache.cc +++ hbase-native-client/core/location-cache.cc @@ -167,19 +167,79 @@ Future> LocationCache::LocateRegion(const hbase::pb:: } std::shared_ptr LocationCache::CreateLocation(const Response &resp) { - auto resp_msg = static_pointer_cast(resp.resp_msg()); - auto &results = resp_msg->results().Get(0); - auto &cells = results.cell(); - - // TODO(eclark): There should probably be some better error - // handling around this. - auto cell_zero = cells.Get(0).value(); - auto cell_one = cells.Get(1).value(); - auto row = cells.Get(0).row(); - - auto region_info = folly::to(cell_zero); - auto server_name = folly::to(cell_one); - return std::make_shared(row, std::move(region_info), server_name, nullptr); + std::string row(""); + std::shared_ptr region_info; + std::shared_ptr server_name; + + auto cell_block_meta = resp.cell_block_meta(); + if (cell_block_meta) { + std::unique_ptr cell_scanner = std::unique_ptr( + KeyValueCodec::Decoder(resp.cell_block_meta(), resp.cell_block_meta_start_offset(), + resp.cell_block_meta_length())); + + while (cell_scanner->Advance()) { + auto cell = cell_scanner->Current(); + row = cell->Row(); + auto qualifier = cell->Qualifier(); + if (qualifier == "regioninfo") { + auto value = cell->Value(); + char *pValue = const_cast(value.c_str()); + char PB_MAGIC[4] = {'P', 'B', 'U', 'F'}; + for (int i = 0; i < 4; i++) { + if ((*pValue++) == PB_MAGIC[i]) { + continue; + } else { + throw std::runtime_error("Expecting a protobuf Region here"); + } + } + + google::protobuf::io::ArrayInputStream arr(pValue, value.size() - 4); + google::protobuf::io::CodedInputStream input(&arr); + + RegionInfo pb_region_info; + bool success = pb_region_info.ParseFromCodedStream(&input); + region_info = std::make_shared(pb_region_info); + } + + if (qualifier == "server") { + auto value = cell->Value(); + ServerName pb_servername; + size_t pos = value.find(":"); + if (std::string::npos != pos) { + auto server = value.substr(0, pos); + auto port = atol((value.substr(++pos)).c_str()); + pb_servername.set_host_name(server); + pb_servername.set_port(port); + } + server_name = std::make_shared(pb_servername); + } + } + } else { + auto resp_msg = static_pointer_cast(resp.resp_msg()); + auto &results = resp_msg->results().Get(0); + auto &cells = results.cell(); + + // TODO(eclark): There should probably be some better error + // handling around this. + auto cell_zero = cells.Get(0).value(); + auto cell_one = cells.Get(1).value(); + row = cells.Get(0).row(); + region_info = std::make_shared(folly::to(cell_zero)); + server_name = std::make_shared(folly::to(cell_one)); + } + LOG(INFO) << "row=> " << row; + LOG(INFO) << "region_info->region_id()=> " << region_info->region_id(); + LOG(INFO) << "region_info->start_key()=> " << region_info->start_key(); + LOG(INFO) << "region_info->end_key()=> " << region_info->end_key(); + LOG(INFO) << "region_info->offline()=> " << region_info->offline(); + LOG(INFO) << "region_info->split()=> " << region_info->split(); + LOG(INFO) << "region_info->replica_id()=> " << region_info->replica_id(); + LOG(INFO) << "table_name=> " << region_info->table_name().namespace_() << ":" + << region_info->table_name().qualifier(); + LOG(INFO) << "server=> " << server_name->host_name() << ":" << server_name->port(); + + return std::make_shared(row, std::move(*region_info.get()), *server_name.get(), + nullptr); } // must hold shared lock on locations_lock_ diff --git hbase-native-client/serde/rpc.cc hbase-native-client/serde/rpc.cc index 3bdb489..4ebb2d9 100644 --- hbase-native-client/serde/rpc.cc +++ hbase-native-client/serde/rpc.cc @@ -48,6 +48,7 @@ static const std::string PREAMBLE = "HBas"; static const std::string INTERFACE = "ClientService"; static const uint8_t RPC_VERSION = 0; static const uint8_t DEFAULT_AUTH_TYPE = 80; +static const std::string KEYVALUE_CODEC_JAVA_CLASS = "org.apache.hadoop.hbase.codec.KeyValueCodec"; int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { if (buf == nullptr || msg == nullptr) { @@ -110,6 +111,7 @@ unique_ptr RpcSerde::Header(const string &user) { // didn't. // TODO: send the service name and user from the RpcClient h.set_service_name(INTERFACE); + h.set_cell_block_codec_class(KEYVALUE_CODEC_JAVA_CLASS); return PrependLength(SerializeMessage(h)); }