diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index b876fa7..1973997 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -103,7 +103,8 @@ public class Scan extends OperationWithAttributes {
   // define this attribute with the appropriate table name by calling
   // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
   static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
-
+  /** Scan hint: set to "true" when the rows to be scanned are of small/medium size. */
+  static public final String HINT_NARROW_ROWS = "_hint_narrow_rows_";
   /*
    * -1 means no caching
    */
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java
index e69de29..bb36c8b 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This filter is a server-side filter and is not meant to be used in an HBase
+ * client application. It is used in a StoreScanner to replace the explicit
+ * list of columns in a Scan instance when the rows to be scanned in a table
+ * are of small/medium size.
+ * <p>
+ * See HBASE-9769 for more information. This filter is not generic, which
+ * explains its naming convention: it is the replacement of an explicit column
+ * qualifier list in a StoreScanner scan operation.
+ * <p>
+ * It keeps the list of column qualifiers hashed into a fixed bucket array
+ * (similar to a HashMap) to improve performance with large column qualifier
+ * lists.
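+ * <p>
+ * Illustrative usage sketch (assuming the HINT_NARROW_ROWS attribute added to
+ * Scan in this patch): a client enables the replacement by setting the hint
+ * to "true" on a scan that carries an explicit column list:
+ * <pre>
+ *   Scan scan = new Scan();
+ *   scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq1"));
+ *   scan.setAttribute(Scan.HINT_NARROW_ROWS, Bytes.toBytes("true"));
+ * </pre>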
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExplicitScanReplacementFilter extends FilterBase {
+
+  /** Total number of buckets in the bucket array. 256 keeps the per-bucket
+   *  lists short; must stay in sync with the 0xff mask below. */
+  final int buckets = 256;
+  /** Array of per-bucket lists; its length is 'buckets'. */
+  Object[] hashedColumnArray = new Object[buckets];
+
+  /**
+   * Constructs the filter.
+   * @param columns - set of column qualifiers
+   */
+  public ExplicitScanReplacementFilter(NavigableSet<byte[]> columns) {
+    buildColumnFamilies(columns);
+  }
+
+  /**
+   * Maps the given column qualifiers into the bucket array.
+   * @param cols - set of column qualifiers
+   */
+  private void buildColumnFamilies(NavigableSet<byte[]> cols) {
+    if (cols != null) {
+      for (byte[] col : cols) {
+        hashedColumnAdd(col);
+      }
+    }
+  }
+
+  /**
+   * Adds a column qualifier to the bucket array.
+   * @param col - column qualifier
+   */
+  @SuppressWarnings("unchecked")
+  private void hashedColumnAdd(byte[] col) {
+    long hash = hashCode(col, 0, col.length);
+    // Masking with 0xff always yields a non-negative index into the 256 buckets.
+    int index = (int) (hash & 0xff);
+    List<byte[]> list = (List<byte[]>) hashedColumnArray[index];
+    if (list == null) {
+      list = new ArrayList<byte[]>();
+      hashedColumnArray[index] = list;
+    }
+    list.add(col);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public ReturnCode filterKeyValue(Cell v) {
+    byte[] buf = v.getQualifierArray();
+    int colOffset = v.getQualifierOffset();
+    int colLen = v.getQualifierLength();
+
+    long hash = hashCode(buf, colOffset, colLen);
+    int index = (int) (hash & 0xff);
+    List<byte[]> list = (List<byte[]>) hashedColumnArray[index];
+    if (list != null) {
+      // Compare only against the qualifiers that hashed into the same bucket.
+      for (byte[] col : list) {
+        if (Bytes.equals(col, 0, col.length, buf, colOffset, colLen)) {
+          return ReturnCode.INCLUDE;
+        }
+      }
+    }
+    return ReturnCode.SKIP;
+  }
+
+  /**
+   * Calculates the hash of a given byte sub-array.
+   * @param buffer - byte buffer
+   * @param offset - sub-array offset
+   * @param length - sub-array length
+   * @return hash code of the sub-array
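+   *
+   * Note: this is the same 31-based polynomial hash used by String.hashCode();
+   * for example hashCode(new byte[] {1, 2}, 0, 2) = 31 * (31 * 0 + 1) + 2 = 33.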
+   */
+  private final long hashCode(final byte[] buffer, final int offset,
+      final int length) {
+    long h = 0;
+    for (int i = 0; i < length; i++) {
+      h = 31 * h + buffer[offset + i];
+    }
+    return h;
+  }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitScanReplacementFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitScanReplacementFilter.java
deleted file mode 100644
index e69de29..0000000
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 6e7f936..82b4079 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -55,6 +55,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ExplicitScanReplacementFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -1648,15 +1652,18 @@ public class HStore implements Store {
   @Override
   public KeyValueScanner getScanner(Scan scan,
       final NavigableSet<byte[]> targetCols) throws IOException {
     lock.readLock().lock();
     try {
       KeyValueScanner scanner = null;
-      if (this.getCoprocessorHost() != null) {
-        scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
+      boolean useFilter = shouldUseExplicitColumnFilter(scan);
+      if (getHRegion().getCoprocessorHost() != null) {
+        scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this,
+          scan, useFilter ? null : targetCols);
       }
       if (scanner == null) {
-        scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
+        scanner = new StoreScanner(this, getScanInfo(), scan, useFilter ? null
+          : targetCols);
       }
       return scanner;
     } finally {
@@ -1664,6 +1671,37 @@
     }
   }
 
+  /**
+   * Checks whether the explicit qualifier list in a scan object for this
+   * column family needs to be replaced with a filter.
+   *
+   * @param scan the scan to inspect
+   * @return true if the list was replaced, false otherwise
+   */
+  private boolean shouldUseExplicitColumnFilter(Scan scan) {
+    byte[] narrowRows = scan.getAttribute(Scan.HINT_NARROW_ROWS);
+    if (narrowRows != null && Bytes.toString(narrowRows).equalsIgnoreCase("true")) {
+      NavigableSet<byte[]> cols = scan.getFamilyMap().get(getFamily().getName());
+      if (cols != null && !cols.isEmpty()) {
+        ExplicitScanReplacementFilter colFilter =
+            new ExplicitScanReplacementFilter(cols);
+        Filter f = scan.getFilter();
+        if (f != null) {
+          FilterList fl = new FilterList(Operator.MUST_PASS_ALL, colFilter, f);
+          scan.setFilter(fl);
+        } else {
+          scan.setFilter(colFilter);
+        }
+        // Update the scan's family map and remove the entry for this column
+        // family. Q: is it safe?
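+        // With the filter installed the qualifiers are matched per Cell, so
+        // the entry is dropped and the StoreScanner falls back to wildcard
+        // column tracking for this family instead of the explicit list.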
+        scan.getFamilyMap().remove(getFamily().getName());
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public String toString() {
     return this.getColumnFamilyName();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 5ff6575..07eb339 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -1278,7 +1278,7 @@ public class HLogSplitter {
    */
   private abstract static class SinkWriter {
     /* Count of edits written to this path */
-    long editsWritten = 0;
+    public long editsWritten = 0;
     /* Number of nanos spent writing to this log */
     long nanosSpent = 0;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestExplicitScanReplacementFilter.java hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestExplicitScanReplacementFilter.java
index e69de29..72e37f9 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestExplicitScanReplacementFilter.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestExplicitScanReplacementFilter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestExplicitScanReplacementFilter {
+
+  private static byte[] ROW = "row".getBytes();
+  private static byte[] CF = "cf".getBytes();
+  private static byte[] VALUE = "value".getBytes();
+  private static byte[][] CQs = new byte[][] {
+    "cq1".getBytes(), "cq2".getBytes(), "cq3".getBytes(), "cq4".getBytes(),
+    "cq5".getBytes(), "cq6".getBytes(), "cq7".getBytes(), "cq8".getBytes(),
+    "cq9".getBytes(), "cq10".getBytes()
+  };
+
+  @Test
+  public void testExplicitScanReplacementFilter() throws IOException {
+    for (int i = 0; i < CQs.length; i++) {
+      testWith(i);
+    }
+  }
+
+  private void testWith(int n) {
+    NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (int i = 0; i < n; i++) {
+      cols.add(CQs[i]);
+    }
+    ExplicitScanReplacementFilter filter = new ExplicitScanReplacementFilter(cols);
+    for (int i = 0; i < CQs.length; i++) {
+      KeyValue kv = new KeyValue(ROW, CF, CQs[i], VALUE);
+      ReturnCode code = filter.filterKeyValue(kv);
+      if (i < n) {
+        // Qualifiers in the explicit list must be included.
+        assertEquals(ReturnCode.INCLUDE, code);
+      } else {
+        // Qualifiers outside the list must be skipped.
+        assertEquals(ReturnCode.SKIP, code);
+      }
+    }
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitScanReplacementFilter.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitScanReplacementFilter.java
deleted file mode 100644
index e69de29..0000000