From dcc716d21aaceb8eaaf210a18373477b4d94478e Mon Sep 17 00:00:00 2001 From: Sudeep Sunthankar Date: Thu, 24 Nov 2016 03:57:26 +1100 Subject: [PATCH 2/2] Adding sources for Scan object diff --git a/hbase-native-client/core/scan-test.cc b/hbase-native-client/core/scan-test.cc new file mode 100644 index 0000000..510bc6d --- /dev/null +++ b/hbase-native-client/core/scan-test.cc @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include "core/scan.h" + +#include + +#include + +using namespace hbase; + +void ScanMethods(Scan &scan) { + scan.SetReversed(true); + EXPECT_EQ(true, scan.IsReversed()); + scan.SetReversed(false); + EXPECT_EQ(false, scan.IsReversed()); + + std::string start_row("start-row"); + std::string stop_row("stop-row"); + scan.SetStartRow(start_row); + EXPECT_EQ(start_row, scan.StartRow()); + scan.SetStopRow(stop_row); + EXPECT_EQ(stop_row, scan.StopRow()); + + scan.SetSmall(true); + EXPECT_EQ(true, scan.IsSmall()); + scan.SetSmall(false); + EXPECT_EQ(false, scan.IsSmall()); + + scan.SetCaching(3); + EXPECT_EQ(3, scan.Caching()); + + scan.SetConsistency(hbase::pb::Consistency::STRONG); + EXPECT_EQ(hbase::pb::Consistency::STRONG, scan.Consistency()); + scan.SetConsistency(hbase::pb::Consistency::TIMELINE); + EXPECT_EQ(hbase::pb::Consistency::TIMELINE, scan.Consistency()); + + scan.SetCacheBlocks(true); + EXPECT_EQ(true, scan.CacheBlocks()); + scan.SetCacheBlocks(false); + EXPECT_EQ(false, scan.CacheBlocks()); + + scan.SetAllowPartialResults(true); + EXPECT_EQ(true, scan.AllowPartialResults()); + scan.SetAllowPartialResults(false); + EXPECT_EQ(false, scan.AllowPartialResults()); + + scan.SetLoadColumnFamiliesOnDemand(true); + EXPECT_EQ(true, scan.LoadColumnFamiliesOnDemand()); + scan.SetLoadColumnFamiliesOnDemand(false); + EXPECT_EQ(false, scan.LoadColumnFamiliesOnDemand()); + + scan.SetMaxVersions(); + EXPECT_EQ(1, scan.MaxVersions()); + scan.SetMaxVersions(20); + EXPECT_EQ(20, scan.MaxVersions()); + + scan.SetBatch(2); + EXPECT_EQ(2, scan.Batch()); + + scan.SetMaxResultSize(1024); + EXPECT_EQ(1024, scan.MaxResultSize()); + + scan.SetMaxResultsPerColumnFamily(256); + EXPECT_EQ(256, scan.MaxResultsPerColumnFamily()); + + + // Test initial values + EXPECT_EQ(0, scan.Timerange().MinTimeStamp()); + EXPECT_EQ(std::numeric_limits::max(), scan.Timerange().MaxTimeStamp()); + + // Set & Test new values using TimeRange and TimeStamp + scan.SetTimeRange(1000, 2000); + 
EXPECT_EQ(1000, scan.Timerange().MinTimeStamp()); + EXPECT_EQ(2000, scan.Timerange().MaxTimeStamp()); + scan.SetTimeStamp(0); + EXPECT_EQ(0, scan.Timerange().MinTimeStamp()); + EXPECT_EQ(1, scan.Timerange().MaxTimeStamp()); + + // Test some exceptions + ASSERT_THROW(scan.SetTimeRange(-1000, 2000), std::runtime_error); + ASSERT_THROW(scan.SetTimeRange(1000, -2000), std::runtime_error); + ASSERT_THROW(scan.SetTimeRange(1000, 200), std::runtime_error); + ASSERT_THROW(scan.SetTimeStamp(std::numeric_limits::max()), + std::runtime_error); + + scan.SetRowOffsetPerColumnFamily(32); + EXPECT_EQ(32, scan.RowOffsetPerColumnFamily()); + + EXPECT_EQ(false, scan.IsStartRowAndEqualsStopRow()); + start_row = "test-row"; + stop_row = "test-row"; + scan.SetStartRow(start_row); + scan.SetStopRow(stop_row); + EXPECT_EQ(true, scan.IsStartRowAndEqualsStopRow()); + start_row = "start-row"; + stop_row = "stop-row"; + scan.SetStartRow(start_row); + scan.SetStopRow(stop_row); +} + +TEST (Scan, Object) { + Scan(); + Scan scan; + ScanMethods(scan); + EXPECT_EQ(false, scan.IsGetScan()); +} + +TEST (Scan, WithStartRow) { + Scan("row-test"); + Scan scan("row-test"); + ScanMethods(scan); + EXPECT_EQ(false, scan.IsGetScan()); +} + +TEST (Scan, WithStartAndStopRow) { + Scan("start-row", "stop-row"); + Scan scan("start-row", "stop-row"); + ScanMethods(scan); + EXPECT_EQ(false, scan.IsGetScan()); +} + +TEST (Scan,FromGet) { + + std::string row_str = "row-test"; + Get get = Get(row_str); + + get.SetCacheBlocks(true); + get.SetMaxVersions(5); + get.SetMaxResultsPerColumnFamily(1); + get.AddFamily("family-1"); + get.AddColumn("family-1", "column-3"); + + get.SetCacheBlocks(true); + get.SetConsistency(hbase::pb::Consistency::STRONG); + get.SetMaxResultsPerColumnFamily(1); + + get.AddFamily("family-1"); + EXPECT_EQ(1, get.FamilyMap().size()); + for (const auto &family : get.FamilyMap()) { + EXPECT_STREQ("family-1", family.first.c_str()); + EXPECT_EQ(0, family.second.size()); + } + + 
get.AddColumn("family-1", "column-1"); + get.AddColumn("family-1", "column-2"); + get.AddColumn("family-1", ""); + get.AddColumn("family-1", "column-3"); + EXPECT_EQ(1, get.FamilyMap().size()); + for (const auto &family : get.FamilyMap()) { + EXPECT_STREQ("family-1", family.first.c_str()); + EXPECT_EQ(4, family.second.size()); + EXPECT_STREQ("column-1", family.second[0].c_str()); + EXPECT_STREQ("column-2", family.second[1].c_str()); + EXPECT_STREQ("", family.second[2].c_str()); + EXPECT_STREQ("column-3", family.second[3].c_str()); + } + + get.AddFamily("family-1"); + EXPECT_EQ(1, get.FamilyMap().size()); + get.AddFamily("family-2"); + EXPECT_EQ(2, get.FamilyMap().size()); + get.AddFamily("family-3"); + EXPECT_EQ(3, get.FamilyMap().size()); + int i = 1; + for (const auto &family : get.FamilyMap()) { + std::string family_name = "family-" + std::to_string(i); + EXPECT_STREQ(family_name.c_str(), family.first.c_str()); + EXPECT_EQ(0, family.second.size()); + i += 1; + } + + Scan scan(get); + ScanMethods(scan); + EXPECT_EQ(true, scan.IsGetScan()); +} + +TEST (Scan, Exception) { + std::string row(std::numeric_limits::max() + 1, 'X'); + ASSERT_THROW(Scan tmp(row), std::runtime_error); + ASSERT_THROW(Scan tmp(""), std::runtime_error); +} diff --git a/hbase-native-client/core/scan.cc b/hbase-native-client/core/scan.cc new file mode 100644 index 0000000..34773df --- /dev/null +++ b/hbase-native-client/core/scan.cc @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/scan.h" + +#include +#include +#include +#include + +namespace hbase { + +Scan::Scan() { +} + +Scan::~Scan() { + if (nullptr != tr_) { + delete tr_; + tr_ = nullptr; + } +} + +Scan::Scan(const std::string &start_row) { + CheckRow(start_row); + start_row_ = start_row; +} + +Scan::Scan(const std::string &start_row, const std::string &stop_row) { + CheckRow(start_row); + CheckRow(stop_row); + start_row_ = start_row; + stop_row_ = stop_row; + get_scan_ = IsStartRowAndEqualsStopRow(); +} + +Scan::Scan(const Get &get) { + cache_blocks_ = get.CacheBlocks(); + max_versions_ = get.MaxVersions(); + store_limit_ = get.MaxResultsPerColumnFamily(); + if (nullptr != tr_) { + delete tr_; + tr_ = nullptr; + } + tr_ = new TimeRange(get.Timerange().MinTimeStamp(), + get.Timerange().MaxTimeStamp()); + get_scan_ = true; + family_map_.insert(get.FamilyMap().begin(), get.FamilyMap().end()); + +} + +Scan::Scan(const Scan &scan) { + start_row_ = scan.start_row_; + stop_row_ = scan.stop_row_; + max_versions_ = scan.max_versions_; + batch_ = scan.batch_; + store_limit_ = scan.store_limit_; + store_offset_ = scan.store_offset_; + caching_ = scan.caching_; + max_result_size_ = scan.max_result_size_; + cache_blocks_ = scan.cache_blocks_; + load_column_families_on_demand_ = scan.load_column_families_on_demand_; + reversed_ = scan.reversed_; + small_ = scan.small_; + allow_partial_results_ = scan.allow_partial_results_; + get_scan_ = scan.IsGetScan(); + consistency_ = scan.consistency_; + if (nullptr != tr_) { + delete tr_; + tr_ = nullptr; + } + tr_ = new 
TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()); + family_map_.insert(scan.family_map_.begin(), scan.family_map_.end()); +} + +/** + * Selects every column of the given family and returns this Scan for chaining. + * Qualifiers previously added for the family are discarded; the family is + * created with an empty qualifier list if it is not in the map yet. + */ +Scan& Scan::AddFamily(const std::string &family) { + /** + * operator[] inserts an empty qualifier list when the family is missing, so one + * lookup followed by clear() covers both the new and the already-present case. + */ + family_map_[family].clear(); + return *this; +} + +/** + * Adds a single qualifier to the given family and returns this Scan for chaining. + * The family is created if needed; a qualifier already listed for it is not added twice. + */ +Scan& Scan::AddColumn(const std::string &family, const std::string &qualifier) { + /** + * Resolve the family's qualifier list once instead of repeating the map lookup + * for every begin()/end()/push_back() call. + */ + auto &qualifiers = family_map_[family]; + if (qualifiers.end() == std::find(qualifiers.begin(), qualifiers.end(), qualifier)) { + qualifiers.push_back(qualifier); + } + return *this; +} + +void Scan::SetReversed(bool reversed) { + reversed_ = reversed; +} + +bool Scan::IsReversed() const { + return reversed_; +} + +void Scan::SetStartRow(std::string &start_row) { + CheckRow(start_row); + start_row_ = start_row; +} + +const std::string& Scan::StartRow() const { + return start_row_; +} + +void Scan::SetStopRow(std::string &stop_row) { + CheckRow(stop_row); + stop_row_ = stop_row; +} + +const std::string& Scan::StopRow() const { + return stop_row_; +} + +void Scan::SetSmall(bool small) { + small_ = small; +} + +bool Scan::IsSmall() const { + return small_; +} + +void Scan::SetCaching(int caching) { + caching_ = caching; +} + +int Scan::Caching() const { + return caching_; +} + +Scan& Scan::SetConsistency(const hbase::pb::Consistency consistency) { + consistency_ = consistency; + return *this; +} + +hbase::pb::Consistency Scan::Consistency() const { + return consistency_; +} + +void Scan::SetCacheBlocks(bool cache_blocks) { + cache_blocks_ = cache_blocks; +} + +bool Scan::CacheBlocks() const { + return cache_blocks_; +} + +void
Scan::SetAllowPartialResults(bool allow_partial_results) { + allow_partial_results_ = allow_partial_results; +} + +bool Scan::AllowPartialResults() const { + return allow_partial_results_; +} + +void Scan::SetLoadColumnFamiliesOnDemand(bool load_column_families_on_demand) { + load_column_families_on_demand_ = load_column_families_on_demand; +} + +bool Scan::LoadColumnFamiliesOnDemand() const { + return load_column_families_on_demand_; +} + +Scan& Scan::SetMaxVersions(uint32_t max_versions) { + max_versions_ = max_versions; + return *this; +} + +int Scan::MaxVersions() const { + return max_versions_; +} + +Scan& Scan::SetBatch(int batch) { + batch_ = batch; + return *this; +} + +int Scan::Batch() const { + return batch_; +} + +void Scan::SetMaxResultSize(long max_result_size) { + max_result_size_ = max_result_size; +} + +long Scan::MaxResultSize() const { + return max_result_size_; +} + +Scan &Scan::SetMaxResultsPerColumnFamily(int store_limit) { + store_limit_ = store_limit; + return *this; +} +int Scan::MaxResultsPerColumnFamily() const { + return store_limit_; +} + +Scan &Scan::SetTimeRange(long min_stamp, long max_stamp) { + if (nullptr != tr_) { + delete tr_; + tr_ = nullptr; + } + tr_ = new TimeRange(min_stamp, max_stamp); + return *this; +} + +Scan& Scan::SetTimeStamp(long timestamp) { + if (nullptr != tr_) { + delete tr_; + tr_ = nullptr; + } + tr_ = new TimeRange(timestamp, timestamp + 1); + return *this; +} + +const TimeRange& Scan::Timerange() const { + return *tr_; +} + +Scan& Scan::SetRowOffsetPerColumnFamily(int store_offset) { + store_offset_ = store_offset; + return *this; +} + +int Scan::RowOffsetPerColumnFamily() const { + return store_offset_; +} + +bool Scan::IsGetScan() const { + return (get_scan_ || IsStartRowAndEqualsStopRow()); +} + +bool Scan::IsStartRowAndEqualsStopRow() const { + return (((start_row_.size() > 0)) && (start_row_ == stop_row_)) ? 
true : false; +} + +void Scan::CheckRow(const std::string &row) { + const int kMaxRowLength = std::numeric_limits::max(); + int row_length = row.size(); + if (0 == row_length) { + throw std::runtime_error("Row length can't be 0"); + } + if (row_length > kMaxRowLength) { + throw std::runtime_error( + "Length of " + row + " is greater than max row size: " + + std::to_string(kMaxRowLength)); + } +} +} +/* namespace hbase */ diff --git a/hbase-native-client/core/scan.h b/hbase-native-client/core/scan.h new file mode 100644 index 0000000..dca1f97 --- /dev/null +++ b/hbase-native-client/core/scan.h @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "core/get.h" +#include "core/time_range.h" +#include "if/Client.pb.h" + +namespace hbase { + +/** + * @brief Map consisting of column families and qualifiers to be used for Get operation + */ +using FamilyMap = std::map>; + +class Scan { + public: + /** + * @brief Create a Scan operation across all rows. + */ + Scan(); + ~Scan(); + + /** + * @brief Create a Scan operation starting at the specified row. 
If the specified row does not exist, + * the Scanner will start from the next closest row after the specified row. + * @param start_row - row to start scanner at or after + */ + Scan(const std::string &start_row); + + /** + * @brief Create a Scan operation for the range of rows specified. + * @param start_row - row to start scanner at or after (inclusive). + * @param stop_row - row to stop scanner before (exclusive). + */ + Scan(const std::string &start_row, const std::string &stop_row); + + /** + * @brief Builds a scan object with the same specs as get. + * @param get - get to model scan after + */ + Scan(const Get &get); + + /** + * @brief Scan Copy constructor + * @param scan object to be used for creating + */ + Scan(const Scan &scan); + + /** + * @brief Get all columns from the specified family.Overrides previous calls to AddColumn for this family. + * @param family - family name + */ + Scan& AddFamily(const std::string &family); + + /** + * @brief Get the column from the specified family with the specified qualifier.Overrides previous calls to AddFamily for this family. + * @param family - family name. + * @param qualifier - column qualifier. + */ + Scan& AddColumn(const std::string &family, const std::string &qualifier); + + /** + * @brief Set whether this scan is a reversed one. This is false by default which means forward(normal) scan. + * @param reversed - if true, scan will be backward order + */ + void SetReversed(bool reversed); + + /** + * @brief Get whether this scan is a reversed one. Returns true if backward scan, false if forward(default) scan + */ + bool IsReversed() const; + + /** + * @brief Set the start row of the scan.If the specified row does not exist, the Scanner will start from the next closest row after the specified row. 
+ * @param start_row - row to start scanner at or after + * @throws std::runtime_error if start_row length is 0 or greater than MAX_ROW_LENGTH + */ + void SetStartRow(std::string &start_row); + + /** + * @brief returns start_row of the Scan. + */ + const std::string& StartRow() const; + + /** + * @brief Set the stop row of the scan. The scan will include rows that are lexicographically less than the provided stop_row. + * @param stop_row - row to end at (exclusive) + * @throws std::runtime_error if stop_row length is 0 or greater than MAX_ROW_LENGTH + */ + void SetStopRow(std::string &stop_row); + + /** + * @brief returns stop_row of the Scan. + */ + const std::string& StopRow() const; + + /** + * @brief Set whether this scan is a small scan. + */ + void SetSmall(bool small); + + /** + * @brief Returns if the scan is a small scan. + */ + bool IsSmall() const; + + /** + * @brief Set the number of rows for caching that will be passed to scanners. Higher caching values will enable faster scanners but will use more memory. + * @param caching - the number of rows for caching. + */ + void SetCaching(int caching); + + /** + * @brief caching the number of rows fetched when calling next on a scanner. + */ + int Caching() const; + + /** + * @brief Sets the consistency level for this operation. + * @param consistency - the consistency level + */ + Scan& SetConsistency(const hbase::pb::Consistency consistency); + + /** + * @brief Returns the consistency level for this operation. + */ + hbase::pb::Consistency Consistency() const; + + /** + * @brief Set whether blocks should be cached for this Scan.This is true by default. When true, default settings of the table and family are used (this will never override caching blocks if the block cache is disabled for that family or entirely). 
+ * @param cache_blocks - if false, default settings are overridden and blocks will not be cached + */ + void SetCacheBlocks(bool cache_blocks); + + /** + * @brief Get whether blocks should be cached for this Scan. + */ + bool CacheBlocks() const; + + /** + * @brief Setting whether the caller wants to see the partial results that may be returned from the server. By default this value is false and the complete results will be assembled client side before being delivered to the caller. + * @param allow_partial_results - if true partial results will be returned. + */ + void SetAllowPartialResults(bool allow_partial_results); + + /** + * @brief true when the constructor of this scan understands that the results they will see may only represent a partial portion of a row. The entire row would be retrieved by subsequent calls to ResultScanner.next() + */ + bool AllowPartialResults() const; + + /** + * @brief Set the value indicating whether loading CFs on demand should be allowed (cluster default is false). On-demand CF loading doesn't load column families until necessary. + * @param load_column_families_on_demand + */ + void SetLoadColumnFamiliesOnDemand(bool load_column_families_on_demand); + + /** + * @brief Get the raw loadColumnFamiliesOnDemand setting. + */ + bool LoadColumnFamiliesOnDemand() const; + + /** + * @brief Get up to the specified number of versions of each column if specified else get default i.e. one. + * @param max_versions - maximum versions for each column. + */ + Scan &SetMaxVersions(uint32_t max_versions = 1); + + /** + * @brief the max number of versions to fetch + */ + int MaxVersions() const; + + /** + * @brief Set the maximum number of values to return for each call to next(). Callers should be aware that invoking this method with any value is equivalent to calling setAllowPartialResults(boolean) with a value of true; partial results may be returned if this method is called. 
Use SetMaxResultSize(long) to limit the size of a Scan's Results instead. + * @param batch - the maximum number of values + */ + Scan &SetBatch(int batch); + + /** + * @brief maximum number of values to return for a single call to next(). + */ + int Batch() const; + + /** + * @brief Set the maximum result size. The default is -1; this means that no specific maximum result size will be set for this scan, and the global configured value will be used instead. (Defaults to unlimited). + * @param max_result_size - the maximum result size in bytes. + */ + void SetMaxResultSize(long max_result_size); + + /** + * @brief the maximum result size in bytes. + */ + long MaxResultSize() const; + + /** + * @brief Set the maximum number of values to return per row per Column Family. + * @param store_limit - the maximum number of values returned / row / CF + */ + Scan &SetMaxResultsPerColumnFamily(int store_limit); + + /** + * @brief maximum number of values to return per row per CF. + */ + int MaxResultsPerColumnFamily() const; + + /** + * @brief Get versions of columns only within the specified timestamp range, [min_stamp, max_stamp). Note, default maximum versions to return is 1. If your time range spans more than one version and you want all versions returned, up the number of versions beyond the default. + * @param min_stamp - minimum timestamp value, inclusive. + * @param max_stamp - maximum timestamp value, exclusive. + */ + Scan &SetTimeRange(long min_stamp, long max_stamp); + + /** + * @brief Get versions of columns with the specified timestamp. Note, default maximum versions to return is 1. If your time range spans more than one version and you want all versions returned, up the number of versions beyond the default. + * @param timestamp - version timestamp + */ + Scan &SetTimeStamp(long timestamp); + + /** + * @brief Return Timerange + */ + const TimeRange& Timerange() const; + + /** + * @brief Set offset for the row per Column Family.
+ * @param store_offset - the number of kvs that will be skipped. + */ + Scan& SetRowOffsetPerColumnFamily(int store_offset); + + /** + * @brief Method for retrieving the scan's offset per row per column family (#kvs to be skipped) + */ + int RowOffsetPerColumnFamily() const; + + bool IsGetScan() const; + + /** + * @brief Checks for equality of start and stop row. Returns true when the start row is non-empty and equal to the stop row, false otherwise. + */ + bool IsStartRowAndEqualsStopRow() const; + + private: + std::string start_row_ = ""; + std::string stop_row_ = ""; + uint32_t max_versions_ = 1; + int batch_ = -1; + int store_limit_ = -1; + int store_offset_ = 0; + int caching_ = -1; + long max_result_size_ = -1; + bool cache_blocks_ = true; + bool load_column_families_on_demand_ = false; + bool reversed_ = false; + bool small_ = false; + bool allow_partial_results_ = false; + bool get_scan_ = false; + hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; + TimeRange *tr_ = new TimeRange(); + FamilyMap family_map_; + + /** + * @brief Checks for row length validity, throws if length check fails, returns normally otherwise. + * @throws std::runtime_error if the row length is 0 or greater than the maximum row length (kMaxRowLength, taken from std::numeric_limits in scan.cc). + */ + void CheckRow(const std::string &row); +}; +} /* namespace hbase */ -- 1.8.3.1