diff --git src/main/java/org/apache/hadoop/hbase/client/Scan.java src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 553bde3..29470a6 100644
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -98,7 +98,8 @@ public class Scan extends OperationWithAttributes implements Writable {
   // define this attribute with the appropriate table name by calling
   // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
   static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
-
+  /** Scan hint: set to "true" when the rows to be scanned are narrow. */
+  static public final String SCAN_NARROW_ROWS = "_scan_narrow_rows_";
   /*
    * -1 means no caching
    */
diff --git src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java
index e69de29..2fc8bd1 100644
--- src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java
+++ src/main/java/org/apache/hadoop/hbase/filter/ExplicitScanReplacementFilter.java
@@ -0,0 +1,153 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * <pre>
+ * This filter is a server-side filter and is not meant to be used in an HBase client
+ * application. It is used in a StoreScanner to replace an explicit list of columns in a
+ * Scan instance when the rows to be scanned are of small/medium size.
+ * </pre>
+ * <pre>
+ * See HBASE-9769 for more information. This filter is not generic, which explains its
+ * naming convention: it is the replacement of an explicit column-qualifier list in a
+ * StoreScanner scan operation.
+ * </pre>
+ * <pre>
+ * It keeps the column qualifiers hashed into a fixed bucket array (similar to a HashMap)
+ * to improve performance with large column-qualifier lists.
+ * </pre>
+ */
+public class ExplicitScanReplacementFilter extends FilterBase {
+
+  /** Total number of buckets in the bucket array. 256 is a reasonable default. */
+  final int buckets = 256;
+  /** Array of qualifier lists, of size 'buckets'. */
+  Object[] hashedColumnArray = new Object[buckets];
+
+  /**
+   * Constructor.
+   * @param columns - list of column qualifiers
+   */
+  public ExplicitScanReplacementFilter(NavigableSet<byte[]> columns) {
+    buildColumnFamilies(columns);
+  }
+
+  /**
+   * Maps the given list of column qualifiers into the bucket array.
+   * @param cols - list of column qualifiers
+   */
+  private void buildColumnFamilies(NavigableSet<byte[]> cols) {
+    if (cols != null) {
+      for (byte[] col : cols) {
+        hashedColumnAdd(col);
+      }
+    }
+  }
+
+  /**
+   * Adds a column qualifier to the bucket array.
+   * @param col - column qualifier
+   */
+  @SuppressWarnings("unchecked")
+  private void hashedColumnAdd(byte[] col) {
+    long hash = hashCode(col, 0, col.length);
+    int index = (int) (Math.abs(hash) & 0xff);
+    List<byte[]> list = (List<byte[]>) hashedColumnArray[index];
+    if (list == null) {
+      list = new ArrayList<byte[]>();
+      hashedColumnArray[index] = list;
+    }
+    list.add(col);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    byte[] buf = v.getBuffer();
+    int colOffset = v.getQualifierOffset();
+    int colLen = v.getQualifierLength();
+
+    long hash = hashCode(buf, colOffset, colLen);
+    int index = (int) (Math.abs(hash) & 0xff);
+    List<byte[]> list = (List<byte[]>) hashedColumnArray[index];
+    if (list != null) {
+      for (byte[] col : list) {
+        if (Bytes.equals(col, 0, col.length, buf, colOffset, colLen)) {
+          return ReturnCode.INCLUDE;
+        }
+      }
+    }
+    return ReturnCode.SKIP;
+  }
+
+  /**
+   * Not supported in the HBase client API.
+   */
+  @Override
+  public void readFields(DataInput arg0) throws IOException {
+    throw new IOException("ExplicitScanReplacementFilter is not supported in a client application");
+  }
+
+  /**
+   * Not supported in the HBase client API.
+   */
+  @Override
+  public void write(DataOutput arg0) throws IOException {
+    throw new IOException("ExplicitScanReplacementFilter is not supported in a client application");
+  }
+
+  /**
+   * Calculates the hash of a given byte sub-array.
+   * @param buffer - byte buffer
+   * @param offset - sub-array offset
+   * @param length - sub-array length
+   * @return the hash value of the sub-array
+   */
+  private final long hashCode(final byte[] buffer, final int offset, final int length) {
+    long h = 0;
+    int off = offset;
+    for (int i = 0; i < length; i++) {
+      h = 31 * h + buffer[off++];
+    }
+    return h;
+  }
+}
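Note (not part of the patch): a minimal sketch of how server-side code might build and probe this filter. The qualifier names "q1"/"q2" are made up; imports are listed for completeness.

    import java.util.NavigableSet;
    import java.util.TreeSet;
    import org.apache.hadoop.hbase.filter.ExplicitScanReplacementFilter;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.util.Bytes;

    NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    cols.add(Bytes.toBytes("q1"));
    cols.add(Bytes.toBytes("q2"));
    Filter filter = new ExplicitScanReplacementFilter(cols);
    // For a KeyValue kv whose qualifier is "q1" or "q2", filter.filterKeyValue(kv)
    // returns ReturnCode.INCLUDE; any other qualifier yields ReturnCode.SKIP.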
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 7684821..27d74f4 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -56,6 +56,10 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ExplicitScanReplacementFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -2200,21 +2204,52 @@ public class Store extends SchemaConfigured implements HeapSize {
    * @throws IOException
    */
   public KeyValueScanner getScanner(Scan scan,
-      final NavigableSet<byte[]> targetCols) throws IOException {
-    lock.readLock().lock();
-    try {
-      KeyValueScanner scanner = null;
-      if (getHRegion().getCoprocessorHost() != null) {
-        scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
-      }
-      if (scanner == null) {
-        scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
-      }
-      return scanner;
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
+      final NavigableSet<byte[]> targetCols) throws IOException {
+    lock.readLock().lock();
+    try {
+      KeyValueScanner scanner = null;
+      boolean useFilter = doesUseExplicitColFilter(scan);
+      if (getHRegion().getCoprocessorHost() != null) {
+        scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan,
+          useFilter ? null : targetCols);
+      }
+      if (scanner == null) {
+        scanner = new StoreScanner(this, getScanInfo(), scan, useFilter ? null : targetCols);
+      }
+      return scanner;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Checks whether the explicit qualifier list in a scan should be replaced with a filter
+   * for this column family.
+   * @param scan
+   * @return true if the qualifier list was replaced with a filter, false otherwise
+   */
+  private boolean doesUseExplicitColFilter(Scan scan) {
+    byte[] narrowRows = scan.getAttribute(Scan.SCAN_NARROW_ROWS);
+    // Hint: if MAX_VERSIONS == 1, use the filter-based scan.
+    // This is hard-coded. TODO: should it be configurable?
+    int maxVersions = getFamily().getMaxVersions();
+    if (maxVersions == 1
+        || (narrowRows != null && "true".equalsIgnoreCase(Bytes.toString(narrowRows)))) {
+      NavigableSet<byte[]> cols = scan.getFamilyMap().get(getFamily().getName());
+      if (cols != null && !cols.isEmpty()) {
+        ExplicitScanReplacementFilter filter = new ExplicitScanReplacementFilter(cols);
+        Filter f = scan.getFilter();
+        if (f != null) {
+          FilterList fl = new FilterList(Operator.MUST_PASS_ALL, filter, f);
+          scan.setFilter(fl);
+        } else {
+          scan.setFilter(filter);
+        }
+        // Remove the entry for this column family from the scan's family map.
+        // Q: is this safe when the Scan instance is shared?
+        scan.getFamilyMap().remove(getFamily().getName());
+        return true;
+      }
+    }
+    return false;
+  }
 
   @Override
   public String toString() {
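Note (not part of the patch): a hedged client-side sketch of triggering the new hint. The table, family, and qualifier names and the conf variable are illustrative; with SCAN_NARROW_ROWS set to "true", Store.doesUseExplicitColFilter() takes the filter-based scan path even when the family's MAX_VERSIONS > 1.

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    HTable table = new HTable(conf, "mytable");   // conf: an existing Configuration
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"));
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q2"));
    // Hint the server that the rows are narrow, so the explicit column list
    // is replaced server-side by ExplicitScanReplacementFilter.
    scan.setAttribute(Scan.SCAN_NARROW_ROWS, Bytes.toBytes("true"));
    ResultScanner rs = table.getScanner(scan);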