diff --git hbase-native-client/core/BUCK hbase-native-client/core/BUCK
index d8d15a9..ce0c733 100644
--- hbase-native-client/core/BUCK
+++ hbase-native-client/core/BUCK
@@ -22,6 +22,8 @@ cxx_library(
         "client.h",
         "cell.h",
         "hbase_macros.h",
+        "filter.h",
+        "query.h",
         "keyvalue-codec.h",
         "region-location.h",
         "location-cache.h",
@@ -78,6 +80,16 @@ cxx_test(
     deps=[":core",],
     run_test_separately=True,)
 cxx_test(
+    name="filter-test",
+    srcs=["filter-test.cc",],
+    deps=[
+        ":core",
+        "//if:if",
+        "//serde:serde",
+        "//test-util:test-util",
+    ],
+    run_test_separately=True,)
+cxx_test(
     name="get-test",
     srcs=["get-test.cc",],
     deps=[":core",],
diff --git hbase-native-client/core/filter-test.cc hbase-native-client/core/filter-test.cc
new file mode 100644
index 0000000..a4551e4
--- /dev/null
+++ hbase-native-client/core/filter-test.cc
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/get.h"
+#include "core/result.h"
+#include "core/table.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+
+using hbase::Configuration;
+using hbase::Get;
+using hbase::FilterFactory;
+using hbase::Table;
+using hbase::TestUtil;
+
+TEST(Client, GetWithColumnPrefixFilter) {
+  // Using TestUtil to populate test data
+  std::unique_ptr<TestUtil> test_util = std::make_unique<TestUtil>();
+
+  // write row1 with 3 columns (column_1, column_2, and foo_column)
+  test_util->RunShellCmd(
+      "create 't', 'd'; put 't', 'row1', 'd:column_1', 'value1'; put 't', 'row1', 'd:column_2', "
+      "'value2'; put 't', 'row1', 'd:foo_column', 'value3'");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto row = "row1";
+
+  // Gets to be performed on above HBase Table
+  Get get_all(row);  // expected to return all 3 columns
+  Get get_one(row);  // expected to return 1 column
+  Get get_two(row);  // expected to return 2 column
+
+  get_one.SetFilter(FilterFactory::ColumnPrefixFilter("foo_"));
+  get_two.SetFilter(FilterFactory::ColumnPrefixFilter("column_"));
+
+  // Create a client
+  hbase::Client client(Configuration{});
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform the Get
+  auto result_all = table->Get(get_all);
+  auto result_one = table->Get(get_one);
+  auto result_two = table->Get(get_two);
+
+  table->Close();
+  client.Close();
+
+  // Stopping the connection as we are getting segfault due to some folly issue
+  // The connection stays open and we don't want that.
+  test_util.release();
+
+  // Test the values
+  ASSERT_TRUE(!result_one->IsEmpty()) << "Result shouldn't be empty.";
+  ASSERT_TRUE(!result_two->IsEmpty()) << "Result shouldn't be empty.";
+  ASSERT_TRUE(!result_all->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ(row, result_one->Row());
+  EXPECT_EQ(row, result_two->Row());
+  EXPECT_EQ(row, result_all->Row());
+  EXPECT_EQ(1, result_one->Size());
+  EXPECT_EQ(2, result_two->Size());
+  EXPECT_EQ(3, result_all->Size());
+  EXPECT_EQ("value3", *(result_one->Value("d", "foo_column")));
+  EXPECT_EQ("value1", *(result_two->Value("d", "column_1")));
+  EXPECT_EQ("value2", *(result_two->Value("d", "column_2")));
+}
diff --git hbase-native-client/core/filter.h hbase-native-client/core/filter.h
new file mode 100644
index 0000000..3649a17
--- /dev/null
+++ hbase-native-client/core/filter.h
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "if/Filter.pb.h"
+
+using google::protobuf::Message;
+
+namespace hbase {
+
+/**
+ * In C++ Client, Filter is a thin wrapper for calling filters defined as a Java class.
+ * The actual filtering logic is not implemented here, but this class provides a mechanism to call
+ * pre-existing Filter classes (like KeyOnlyFilter, SingleColumnValueFilter, etc) with your Get or
+ * Scan RPCs. This class can also be used to call custom Filters defined as a Java class, or
+ * pre-existing Filters not defined below.
+ *
+ * Consult the Java class docs for learning about the various filters and how they work (and filter
+ * arguments).
+ *
+ * Pre-existing Filters can be used like this:
+ *
+ * Get get(row);
+ * get.SetFilter(FilterFactory::ColumnPrefixFilter("foo_"));
+ *
+ * Custom filters can be invoked like this:
+ * Get get(row);
+ * std::string filter_java_class_name = "foo.bar.baz";
+ * auto filter_data = std::make_unique<MyFilterData>();
+ * filter_data->set_foo(foo);
+ * get.SetFilter(std::make_unique<Filter>(filter_java_class_name, filter_data));
+ *
+ */
+class Filter {
+ public:
+  Filter(std::string java_class_name, std::unique_ptr<Message> data)
+      : java_class_name_(java_class_name), data_(std::move(data)) {}
+  virtual ~Filter() {}
+
+  const std::string java_class_name() const { return java_class_name_; }
+
+  /**
+   * Serialize the filter data to the given buffer. Does protobuf encoding by default.
+   * Can be overriden if Filter does not use protobuf.
+   */
+  virtual void Serialize(std::string* buf) const {
+    if (data_ != nullptr) {
+      data_->SerializeToString(buf);
+    }
+  }
+
+  /** Internal method */
+  static std::unique_ptr<pb::Filter> ToProto(const Filter& filter) {
+    auto pb_filter = std::make_unique<pb::Filter>();
+    pb_filter->set_name(filter.java_class_name());
+    filter.Serialize(pb_filter->mutable_serialized_filter());
+    return std::move(pb_filter);
+  }
+
+ private:
+  std::unique_ptr<Message> data_;
+  std::string java_class_name_;
+};
+
+/**
+ * Used in row range filters
+ */
+struct RowRange {
+  std::string start_row;
+  bool start_row_inclusive;
+  std::string stop_row;
+  bool stop_row_inclusive;
+};
+
+/**
+ * Factory for creating pre-defined filters.
+ */
+class FilterFactory {
+ public:
+  static std::unique_ptr<Filter> ColumnCountGetFilter(uint32_t limit) noexcept {
+    auto data = std::make_unique<pb::ColumnCountGetFilter>();
+    data->set_limit(limit);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.ColumnCountGetFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> ColumnPaginationFilter(uint32_t limit, uint32_t offset) noexcept {
+    auto data = std::make_unique<pb::ColumnPaginationFilter>();
+    data->set_limit(limit);
+    data->set_offset(offset);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.ColumnPaginationFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> ColumnPaginationFilter(uint32_t limit,
+                                                        const std::string& column_offset) noexcept {
+    auto data = std::make_unique<pb::ColumnPaginationFilter>();
+    data->set_limit(limit);
+    data->set_column_offset(column_offset);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.ColumnPaginationFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> ColumnPrefixFilter(const std::string& prefix) noexcept {
+    auto data = std::make_unique<pb::ColumnPrefixFilter>();
+    data->set_prefix(prefix);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.ColumnPrefixFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> ColumnRangeFilter(const std::string& min_column,
+                                                   bool min_column_inclusive,
+                                                   const std::string& max_column,
+                                                   bool max_column_inclusive) noexcept {
+    auto data = std::make_unique<pb::ColumnRangeFilter>();
+    data->set_min_column(min_column);
+    data->set_min_column_inclusive(min_column_inclusive);
+    data->set_max_column(max_column);
+    data->set_max_column_inclusive(max_column_inclusive);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.ColumnRangeFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> FilterAllFilter() noexcept {
+    auto data = std::make_unique<pb::FilterAllFilter>();
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.FilterAllFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> FirstKeyOnlyFilter() noexcept {
+    auto data = std::make_unique<pb::FirstKeyOnlyFilter>();
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> FirstKeyValueMatchingQualifiersFilter(
+      const std::set<std::string>& qualifiers) noexcept {
+    auto data = std::make_unique<pb::FirstKeyValueMatchingQualifiersFilter>();
+    for (auto q : qualifiers) {
+      data->add_qualifiers(q);
+    }
+    return std::make_unique<Filter>(
+        "org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter", std::move(data));
+  }
+
+  static std::unique_ptr<Filter> FuzzyRowFilter(
+      const std::vector<std::pair<std::string, std::string>>& fuzzy_keys_data) noexcept {
+    auto data = std::make_unique<pb::FuzzyRowFilter>();
+    for (auto q : fuzzy_keys_data) {
+      auto p = data->add_fuzzy_keys_data();
+      p->set_first(q.first);
+      p->set_second(q.second);
+    }
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.FuzzyRowFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> InclusiveStopFilter(const std::string& stop_row_key) noexcept {
+    auto data = std::make_unique<pb::InclusiveStopFilter>();
+    data->set_stop_row_key(stop_row_key);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.InclusiveStopFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> KeyOnlyFilter(bool len_as_val) noexcept {
+    auto data = std::make_unique<pb::KeyOnlyFilter>();
+    data->set_len_as_val(len_as_val);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.KeyOnlyFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> MultipleColumnPrefixFilter(
+      const std::vector<std::string>& sorted_prefixes) noexcept {
+    auto data = std::make_unique<pb::MultipleColumnPrefixFilter>();
+    for (auto p : sorted_prefixes) {
+      data->add_sorted_prefixes(p);
+    }
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> MultiRowRangeFilter(
+      const std::vector<RowRange>& row_ranges) noexcept {
+    auto data = std::make_unique<pb::MultiRowRangeFilter>();
+    for (auto r : row_ranges) {
+      auto range = data->add_row_range_list();
+      range->set_start_row(r.start_row);
+      range->set_start_row_inclusive(r.start_row_inclusive);
+      range->set_stop_row(r.stop_row);
+      range->set_stop_row_inclusive(r.stop_row_inclusive);
+    }
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.MultiRowRangeFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> PageFilter(uint64_t page_size) noexcept {
+    auto data = std::make_unique<pb::PageFilter>();
+    data->set_page_size(page_size);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.PageFilter", std::move(data));
+  }
+
+  static std::unique_ptr<Filter> PrefixFilter(const std::string& prefix) noexcept {
+    auto data = std::make_unique<pb::PrefixFilter>();
+    data->set_prefix(prefix);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.PrefixFilter", std::move(data));
+  }
+
+  static std::unique_ptr<Filter> RandomRowFilter(float chance) noexcept {
+    auto data = std::make_unique<pb::RandomRowFilter>();
+    data->set_chance(chance);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.RandomRowFilter",
+                                    std::move(data));
+  }
+
+  /**
+  static std::unique_ptr<Filter> SingleColumnValueExcludeFilter(float chance) noexcept {
+    auto data = std::make_unique<pb::SingleColumnValueExcludeFilter>();
+    // TODO
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> SingleColumnValueFilter(float chance) noexcept {
+    auto data = std::make_unique<pb::SingleColumnValueFilter>();
+    data->set_chance(chance);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.SingleColumnValueFilter",
+                                    std::move(data));
+  }
+
+
+  message SingleColumnValueExcludeFilter {
+    required SingleColumnValueFilter single_column_value_filter = 1;
+  }
+
+  message SingleColumnValueFilter {
+    optional bytes column_family = 1;
+    optional bytes column_qualifier = 2;
+    required CompareType compare_op = 3;
+    required Comparator comparator = 4;
+    optional bool filter_if_missing = 5;
+    optional bool latest_version_only = 6;
+  }
+  */
+
+  static std::unique_ptr<Filter> SkipFilter(const Filter& filter) noexcept {
+    auto data = std::make_unique<pb::SkipFilter>();
+    data->set_allocated_filter(Filter::ToProto(filter).release());
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.SkipFilter", std::move(data));
+  }
+
+  static std::unique_ptr<Filter> TimestampsFilter(std::vector<uint64_t> timestamps,
+                                                  bool can_hint) noexcept {
+    auto data =
+        std::make_unique<pb::TimestampsFilter>();
+    for (auto t : timestamps) {
+      data->add_timestamps(t);
+    }
+    data->set_can_hint(can_hint);
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.TimestampsFilter",
+                                    std::move(data));
+  }
+
+  static std::unique_ptr<Filter> WhileMatchFilter(const Filter& filter) noexcept {
+    auto data = std::make_unique<pb::WhileMatchFilter>();
+    data->set_allocated_filter(Filter::ToProto(filter).release());
+    return std::make_unique<Filter>("org.apache.hadoop.hbase.filter.WhileMatchFilter",
+                                    std::move(data));
+  }
+};
+
+/**
+// TODO
+message CompareFilter {
+  required CompareType compare_op = 1;
+  optional Comparator comparator = 2;
+}
+
+message DependentColumnFilter {
+  required CompareFilter compare_filter = 1;
+  optional bytes column_family = 2;
+  optional bytes column_qualifier = 3;
+  optional bool drop_dependent_column = 4;
+}
+
+message FamilyFilter {
+  required CompareFilter compare_filter = 1;
+}
+
+message FilterList {
+  required Operator operator = 1;
+  repeated Filter filters = 2;
+
+  enum Operator {
+    MUST_PASS_ALL = 1;
+    MUST_PASS_ONE = 2;
+  }
+}
+
+message QualifierFilter {
+  required CompareFilter compare_filter = 1;
+}
+
+message RowFilter {
+  required CompareFilter compare_filter = 1;
+}
+
+message ValueFilter {
+  required CompareFilter compare_filter = 1;
+}
+
+*/
+
+}  // namespace hbase
diff --git hbase-native-client/core/get.h hbase-native-client/core/get.h
index f79c633..92d1fee 100644
--- hbase-native-client/core/get.h
+++ hbase-native-client/core/get.h
@@ -24,6 +24,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include "core/query.h"
 #include "core/time_range.h"
 #include "if/Client.pb.h"
@@ -35,7 +36,7 @@ namespace hbase {
  */
 using FamilyMap = std::map<std::string, std::vector<std::string>>;
 
-class Get {
+class Get : public Query {
  public:
   /**
    * Constructors
diff --git hbase-native-client/core/query.h hbase-native-client/core/query.h
new file mode 100644
index 0000000..b706303
--- /dev/null
+++ hbase-native-client/core/query.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or
+ * more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include "core/filter.h"
+
+namespace hbase {
+
+/**
+ * Base class for read RPC calls (Get / Scan).
+ */
+class Query {
+ public:
+  virtual ~Query() {}
+
+  void SetFilter(std::unique_ptr<Filter> filter) { filter_ = std::move(filter); }
+
+  const std::unique_ptr<Filter>& filter() const { return filter_; }
+
+ protected:
+  std::unique_ptr<Filter> filter_ = nullptr;
+};
+
+}  // namespace hbase
diff --git hbase-native-client/core/request_converter.cc hbase-native-client/core/request_converter.cc
index eba07df..149202e 100644
--- hbase-native-client/core/request_converter.cc
+++ hbase-native-client/core/request_converter.cc
@@ -68,6 +68,10 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
     }
   }
 
+  if (get.filter() != nullptr) {
+    pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release());
+  }
+
   return pb_req;
 }
 
@@ -108,6 +112,10 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
     }
   }
 
+  if (scan.filter() != nullptr) {
+    pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release());
+  }
+
   // TODO We will change this later.
   pb_msg->set_client_handles_partials(false);
   pb_msg->set_client_handles_heartbeats(false);
diff --git hbase-native-client/core/scan.h hbase-native-client/core/scan.h
index e2e7f1a..7e8c7bd 100644
--- hbase-native-client/core/scan.h
+++ hbase-native-client/core/scan.h
@@ -36,7 +36,7 @@ namespace hbase {
  */
 using FamilyMap = std::map<std::string, std::vector<std::string>>;
 
-class Scan {
+class Scan : public Query {
  public:
   /**
    * @brief Constructors. Create a Scan operation across all rows.