Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSCVFWithMiniCluster.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSCVFWithMiniCluster.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSCVFWithMiniCluster.java	(revision 0)
@@ -0,0 +1,246 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * This test verifies that the scenarios illustrated by HBASE-10850 work w.r.t. the essential
+ * column family optimization.
+ */
+@Category(MediumTests.class)
+public class TestSCVFWithMiniCluster {
+  private static final String HBASE_TABLE_NAME = "TestSCVFWithMiniCluster";
+
+  private static final byte[] FAMILY_A = Bytes.toBytes("a");
+  private static final byte[] FAMILY_B = Bytes.toBytes("b");
+
+  private static final byte[] QUALIFIER_FOO = Bytes.toBytes("foo");
+  private static final byte[] QUALIFIER_BAR = Bytes.toBytes("bar");
+
+  private HBaseTestingUtility util;
+
+  private HTable htable;
+
+  private Filter scanFilter;
+
+  private int expected = 1;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster(1);
+
+    HBaseAdmin admin = util.getHBaseAdmin();
+    destroy(admin, HBASE_TABLE_NAME);
+    create(admin, HBASE_TABLE_NAME, FAMILY_A, FAMILY_B);
+    admin.close();
+    htable = new HTable(util.getConfiguration(), HBASE_TABLE_NAME);
+
+    /* Add some values */
+    List<Put> puts = new ArrayList<Put>();
+
+    /* Add a row with 'a:foo' = false */
+    Put put = new Put(Bytes.toBytes("1"));
+    put.setDurability(Durability.SKIP_WAL);
+    put.add(FAMILY_A, QUALIFIER_FOO, Bytes.toBytes("false"));
+    put.add(FAMILY_A, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_FOO, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    puts.add(put);
+
+    /* Add a row with 'a:foo' = true */
+    put = new Put(Bytes.toBytes("2"));
+    put.setDurability(Durability.SKIP_WAL);
+    put.add(FAMILY_A, QUALIFIER_FOO, Bytes.toBytes("true"));
+    put.add(FAMILY_A, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_FOO, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    puts.add(put);
+
+    /* Add a row with the 'a:foo' qualifier not set */
+    put = new Put(Bytes.toBytes("3"));
+    put.setDurability(Durability.SKIP_WAL);
+    put.add(FAMILY_A, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_FOO, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    puts.add(put);
+
+    htable.put(puts);
+
+    /*
+     * We want to filter out from the scan all rows that do not have the column 'a:foo' with value
+     * 'false'. Only the row with key '1' should be returned by the scan.
+     */
+    scanFilter = new SingleColumnValueFilter(FAMILY_A, QUALIFIER_FOO, CompareOp.EQUAL,
+      new BinaryComparator(Bytes.toBytes("false")));
+    ((SingleColumnValueFilter) scanFilter).setFilterIfMissing(true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    htable.close();
+    util.shutdownMiniCluster();
+  }
+
+  private void verify(Scan scan) throws IOException {
+    ResultScanner scanner = htable.getScanner(scan);
+    Iterator<Result> it = scanner.iterator();
+
+    /* Count the rows returned by the scan */
+    int count = 0;
+    try {
+      while (it.hasNext()) {
+        it.next();
+        count++;
+      }
+    } finally {
+      scanner.close();
+    }
+    assertEquals(expected, count);
+  }
+
+  /**
+   * Test the filter by adding all columns of family A in the scan. (OK)
+   */
+  @Test
+  public void scanWithAllQualifiersOfFamilyA() throws IOException {
+    /* Given */
+    Scan scan = new Scan();
+    scan.addFamily(FAMILY_A);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding all columns of families A and B in the scan. (KO: row '3' without
+   * the 'a:foo' qualifier is returned)
+   */
+  @Test
+  public void scanWithAllQualifiersOfBothFamilies() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding 2 columns of family A and 2 columns of family B in the scan.
+   * (KO: row '3' without the 'a:foo' qualifier is returned)
+   */
+  @Test
+  public void scanWithSpecificQualifiers1() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY_A, QUALIFIER_FOO);
+    scan.addColumn(FAMILY_A, QUALIFIER_BAR);
+    scan.addColumn(FAMILY_B, QUALIFIER_BAR);
+    scan.addColumn(FAMILY_B, QUALIFIER_FOO);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding 1 column of family A (the one used in the filter) and 1 column of
+   * family B in the scan. (OK)
+   */
+  @Test
+  public void scanWithSpecificQualifiers2() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY_A, QUALIFIER_FOO);
+    scan.addColumn(FAMILY_B, QUALIFIER_BAR);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding 2 columns of family A in the scan. (OK)
+   */
+  @Test
+  public void scanWithSpecificQualifiers3() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY_A, QUALIFIER_FOO);
+    scan.addColumn(FAMILY_A, QUALIFIER_BAR);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  private static void create(HBaseAdmin admin, String tableName, byte[]... families)
+      throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    for (byte[] family : families) {
+      HColumnDescriptor colDesc = new HColumnDescriptor(family);
+      colDesc.setMaxVersions(1);
+      colDesc.setCompressionType(Algorithm.GZ);
+      desc.addFamily(colDesc);
+    }
+    try {
+      admin.createTable(desc);
+    } catch (TableExistsException tee) {
+      /* Ignore */
+    }
+  }
+
+  private static void destroy(HBaseAdmin admin, String tableName) throws IOException {
+    try {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    } catch (TableNotFoundException tnfe) {
+      /* Ignore */
+    }
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1584255)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -3636,7 +3636,7 @@
   // KeyValue indicating that limit is reached when scanning
   private final KeyValue KV_LIMIT = new KeyValue();
   protected final byte[] stopRow;
-  private final Filter filter;
+  private final FilterWrapper filter;
   private int batch;
   protected int isScan;
   private boolean filterClosed = false;
@@ -3916,14 +3916,15 @@
       stopRow = nextKv == null ||
           isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
       // save that the row was empty before filters applied to it.
       final boolean isEmptyRow = results.isEmpty();
-
+
       // We have the part of the row necessary for filtering (all of it, usually).
       // First filter with the filterRow(List).
+      FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
       if (filter != null && filter.hasFilterRow()) {
-        filter.filterRowCells(results);
+        ret = filter.filterRowCellsWithRet(results);
       }
-      if (isEmptyRow || filterRow()) {
+      if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
         results.clear();
         boolean moreRows = nextRow(currentRow, offset, length);
         if (!moreRows) return false;
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java	(revision 1584255)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java	(working copy)
@@ -31,7 +31,6 @@
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
-import org.apache.zookeeper.KeeperException.UnimplementedException;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -154,15 +153,29 @@
   @Override
   public void filterRowCells(List<Cell> kvs) throws IOException {
+    filterRowCellsWithRet(kvs);
+  }
+
+  public enum FilterRowRetCode {
+    NOT_CALLED,
+    INCLUDE,  // corresponds to filter.filterRow() returning false
+    EXCLUDE   // corresponds to filter.filterRow() returning true
+  }
+
+  public FilterRowRetCode filterRowCellsWithRet(List<Cell> kvs) throws IOException {
     //To fix HBASE-6429,
     //Filter with filterRow() returning true is incompatible with scan with limit
     //1. hasFilterRow() returns true, if either filterRow() or filterRow(kvs) is implemented.
     //2. filterRow() is merged with filterRow(kvs),
     //so that to make all those row related filtering stuff in the same function.
     this.filter.filterRowCells(kvs);
-    if (!kvs.isEmpty() && this.filter.filterRow()) {
-      kvs.clear();
+    if (!kvs.isEmpty()) {
+      if (this.filter.filterRow()) {
+        kvs.clear();
+        return FilterRowRetCode.EXCLUDE;
+      }
+      return FilterRowRetCode.INCLUDE;
     }
+    return FilterRowRetCode.NOT_CALLED;
   }
 
   /**
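
Reviewer note (illustration only, not part of the patch): the sketch below shows the client-side setup that the (KO) scenarios above exercise — a SingleColumnValueFilter with setFilterIfMissing(true) on a scan covering both families, so family 'a' is the lone essential family and family 'b' is joined in lazily. The table name "demo" and the class name are hypothetical; the filter setup mirrors the test. With the FilterWrapper.FilterRowRetCode change, HRegion falls back to filterRow() when filterRowCellsWithRet() reports NOT_CALLED, so a row without an 'a:foo' cell (row '3' in the test) is no longer returned.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ScvfEssentialFamilyExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical table with column families 'a' and 'b', populated as in the test above.
    HTable table = new HTable(HBaseConfiguration.create(), "demo");
    try {
      // Keep only rows whose 'a:foo' equals "false"; rows missing 'a:foo' entirely
      // must be dropped as well, hence setFilterIfMissing(true).
      SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("a"),
        Bytes.toBytes("foo"), CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("false")));
      scvf.setFilterIfMissing(true);

      // A scan over all columns of both families: the essential column family
      // optimization scans family 'a' first and joins family 'b' lazily. Before
      // this fix, rows with no 'a:foo' cell leaked through this kind of scan.
      Scan scan = new Scan();
      scan.setFilter(scvf);

      ResultScanner scanner = table.getScanner(scan);
      try {
        for (Result result : scanner) {
          // With the fix, exactly one row key is printed: '1'.
          System.out.println(Bytes.toString(result.getRow()));
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}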