### Eclipse Workspace Patch 1.0 #P hbase-server Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1390274) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -1519,6 +1521,297 @@ } /** + * + * @param kvs + * @param filter + * @return passby the real filterRow + * @throws IOException + */ + public boolean preFilterRow(final List kvs, final Filter filter) + throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ( (RegionObserver) env.getInstance() ) + .preFilterRow(ctx, kvs, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + /** + * + * @param kvs + * @param filter + * @throws IOException + */ + public void postFilterRow(final List kvs, final Filter filter) + throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ( (RegionObserver) env.getInstance() ) + .postFilterRow(ctx, kvs, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + /** + * + * @param raw + * @param returnedKV + * @param filter + * @return bypass the real transform + * @throws IOException + */ + public boolean preFilterTransform(KeyValue raw, + final KeyValue[] returnedKV, final Filter filter) throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + returnedKV[0] = ( (RegionObserver) env.getInstance() ) + .preFilterTransform(ctx, raw, returnedKV[0], filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + /** + * + * @param raw + * @param result + * @param filter + * @return the transformed KeyValue + * @throws IOException + */ + public KeyValue postFilterTransform(KeyValue raw, KeyValue result, + final Filter filter) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + result = ( (RegionObserver) env.getInstance() ) + .postFilterTransform(ctx, raw, result, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return result; + } + + /** + * + * @param raw + * @param returnedKV + * @param filter + * @return bypass the real getNextKeyHint + * @throws IOException + */ + public boolean preFilterGetNextKeyHint(KeyValue raw, + final KeyValue[] returnedKV, final Filter filter) throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + returnedKV[0] = ((RegionObserver) env.getInstance()).preFilterGetNextKeyHint(ctx, + raw, returnedKV[0], filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + /** + * + * @param raw + * @param result + * @param filter + * @return the hint + * @throws IOException + */ + public KeyValue postFilterGetNextKeyHint(KeyValue raw, KeyValue result, + final Filter filter) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + result = ( (RegionObserver) env.getInstance() ) + .postFilterGetNextKeyHint(ctx, raw, result, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return result; + } + + /** + * + * @param buffer + * @param offset + * @param length + * @param result + * @param filter + * @return null for not by pass the real filterRowKey, otherwise for bypassing it. + * @throws IOException + */ + public Boolean preFilterRowKey(final byte[] buffer, final int offset, + final int length, boolean result, final Filter filter) + throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + result = ( (RegionObserver) env.getInstance() ).preFilterRowKey(ctx, buffer, + offset, length, result, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? result : null; + } + + /** + * + * @param buffer + * @param offset + * @param length + * @param result + * @param filter + * @return + * @throws IOException + */ + public boolean postFilterRowKey(final byte[] buffer, final int offset, + final int length, boolean result, final Filter filter) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + result = ( (RegionObserver) env.getInstance() ).postFilterRowKey(ctx, + buffer, offset, length, result, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return result; + } + + /** + * + * @param kv + * @param result + * @param filter + * @return + * @throws IOException + */ + public ReturnCode preFilterKeyValue(KeyValue kv, ReturnCode result, + final Filter filter) throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + result = ( (RegionObserver) env.getInstance() ).preFilterKeyValue(ctx, + kv, result, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? result : null; + } + + /** + * + * @param kv + * @param result + * @param filter + * @return + * @throws IOException + */ + public ReturnCode postFilterKeyValue(final KeyValue kv, ReturnCode result, + final Filter filter) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + result = ( (RegionObserver) env.getInstance() ).postFilterKeyValue(ctx, + kv, result, filter); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return result; + } + + /** * @param familyPaths pairs of { CF, file path } submitted for bulk load * @return true if the default operation should be bypassed * @throws IOException Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1390274) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1715,6 +1715,12 @@ scan.addFamily(family); } } + // set the instance of FilterWrapper in the server side + if (scan.hasFilter() && !(scan.getFilter() instanceof FilterWrapper)) { + FilterWrapper filter = new FilterWrapper(scan.getFilter()); + filter.setCoprocessorHost(coprocessorHost); + scan.setFilter(filter); + } } protected RegionScanner getScanner(Scan scan, @@ -3348,14 +3354,8 @@ } RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); - this.maxResultSize = scan.getMaxResultSize(); - if (scan.hasFilter()) { - this.filter = new FilterWrapper(scan.getFilter()); - } else { - this.filter = null; - } - + this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { this.stopRow = null; Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1390274) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -335,6 +337,73 @@ } @Override + public void preFilterRow( + final ObserverContext env, List kvs, + final Filter filter) throws IOException { + } + + @Override + public void postFilterRow(final ObserverContext env, + List kvs, final Filter filter) throws IOException { + } + + @Override + public KeyValue preFilterTransform( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + return result; + } + + @Override + public KeyValue postFilterTransform( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + return result; + } + + @Override + public KeyValue preFilterGetNextKeyHint( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + return result; + } + + @Override + public KeyValue postFilterGetNextKeyHint( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + return result; + } + + @Override + public boolean preFilterRowKey( + final ObserverContext env, final byte[] buffer, + final int offset, final int length, boolean result, final Filter filter) throws IOException { + return result; + } + + @Override + public boolean postFilterRowKey( + final ObserverContext env, final byte[] buffer, + final int offset, final int length, boolean result, final Filter filter) throws IOException { + return result; + } + + @Override + public ReturnCode preFilterKeyValue( + final ObserverContext env, KeyValue kv, + ReturnCode result, final Filter filter) throws IOException { + return result; + } + + @Override + public ReturnCode postFilterKeyValue( + final ObserverContext env, KeyValue kv, + ReturnCode result, final Filter filter) throws IOException { + return result; + } + + @Override public void preBulkLoadHFile(final ObserverContext ctx, List> familyPaths) throws IOException { } Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (revision 1390274) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -28,7 +29,11 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,8 +46,10 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -53,6 +60,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; @@ -153,6 +162,217 @@ table.close(); } + class StringRange { + private String start = null; + private String end = null; + private boolean startInclusive = true; + private boolean endInclusive = false; + + public StringRange(String start, boolean startInclusive, String end, + boolean endInclusive) { + this.start = start; + this.startInclusive = startInclusive; + this.end = end; + this.endInclusive = endInclusive; + } + + public String getStart() { + return this.start; + } + + public String getEnd() { + return this.end; + } + + public boolean isStartInclusive() { + return this.startInclusive; + } + + public boolean isEndInclusive() { + return this.endInclusive; + } + + @Override + public int hashCode() { + int hashCode = 0; + if (this.start != null) { + hashCode ^= this.start.hashCode(); + } + + if (this.end != null) { + hashCode ^= this.end.hashCode(); + } + return hashCode; + } + + @Override + public String toString() { + String result = (this.startInclusive ? "[" : "(") + + (this.start == null ? null : this.start) + ", " + + (this.end == null ? null : this.end) + + (this.endInclusive ? "]" : ")"); + return result; + } + + public boolean inRange(String value) { + boolean afterStart = true; + if (this.start != null) { + int startCmp = value.compareTo(this.start); + afterStart = this.startInclusive ? startCmp >= 0 : startCmp > 0; + } + + boolean beforeEnd = true; + if (this.end != null) { + int endCmp = value.compareTo(this.end); + beforeEnd = this.endInclusive ? endCmp <= 0 : endCmp < 0; + } + + return afterStart && beforeEnd; + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); + } + + List generateRandomWords(int numberOfWords, int maxLengthOfWords) { + Set wordSet = new HashSet(); + for (int i = 0; i < numberOfWords; i++) { + int lengthOfWords = (int) (Math.random() * maxLengthOfWords) + 1; + char[] wordChar = new char[lengthOfWords]; + for (int j = 0; j < wordChar.length; j++) { + wordChar[j] = (char) (Math.random() * 26 + 97); + } + String word = new String(wordChar); + wordSet.add(word); + } + List wordList = new ArrayList(wordSet); + return wordList; + } + + private void encryptTable (byte[] table, boolean enable) throws TableNotFoundException, IOException { + // it's the flag which can be used in the CoProcessor interface + HTableDescriptor htd = util.getHBaseAdmin().getTableDescriptor(table); + htd.setValue(SimpleRegionObserver.HTD_KEY_NEED_ENCRYT_VALUE, String.valueOf(enable)); + util.getHBaseAdmin().disableTable(table); + util.getHBaseAdmin().modifyTable(table, htd); + util.getHBaseAdmin().enableTable(table); + } + + public void testScanFilterWithCoProcessor (boolean encrypt, boolean withFilter) throws Exception { + String family = "Family"; + byte[] table = Bytes.toBytes("TestColumnRangeFilterClient"); + HTable ht = util.createTable(table, + Bytes.toBytes(family), Integer.MAX_VALUE); + + encryptTable (table, encrypt); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"encrypted"}, + table, + new Boolean[] {encrypt} + ); + + List rows = generateRandomWords(10, 8); + long maxTimestamp = 2; + List columns = generateRandomWords(20000, 8); + + List kvList = new ArrayList(); + + Map> rangeMap = new HashMap>(); + + rangeMap.put(new StringRange(null, true, "b", false), + new ArrayList()); + rangeMap.put(new StringRange("p", true, "q", false), + new ArrayList()); + rangeMap.put(new StringRange("r", false, "s", true), + new ArrayList()); + rangeMap.put(new StringRange("z", false, null, false), + new ArrayList()); + String valueString = "ValueString"; + + for (String row : rows) { + Put p = new Put(Bytes.toBytes(row)); + p.setWriteToWAL(false); + for (String column : columns) { + for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) { + KeyValue kv = KeyValueTestUtil.create(row, family, column, timestamp, + valueString); + p.add(kv); + kvList.add(kv); + for (StringRange s : rangeMap.keySet()) { + if (withFilter) { + if (s.inRange(column)) { + rangeMap.get(s).add(kv); + } + } else { + rangeMap.get(s).add(kv); + } + } + } + } + ht.put(p); + } + + util.flush(); + + Scan scan = new Scan(); + scan.setMaxVersions(); + for (StringRange s : rangeMap.keySet()) { + ColumnRangeFilter filter = null; + if (withFilter) { + filter = new ColumnRangeFilter(s.getStart() == null ? null + : Bytes.toBytes(s.getStart()), s.isStartInclusive(), + s.getEnd() == null ? null : Bytes.toBytes(s.getEnd()), + s.isEndInclusive()); + } + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + List results = new ArrayList(); + LOG.info("scan column range: " + s.toString()); + long timeBeforeScan = System.currentTimeMillis(); + + Result result; + while ((result = scanner.next()) != null) { + for (KeyValue kv : result.list()) { + results.add(kv); + assertArrayEquals(Bytes.toBytes(valueString), kv.getValue()); + } + } + long scanTime = System.currentTimeMillis() - timeBeforeScan; + scanner.close(); + LOG.info("scan time = " + scanTime + "ms"); + LOG.info("found " + results.size() + " results"); + LOG.info("Expecting " + rangeMap.get(s).size() + " results"); + + assertEquals(rangeMap.get(s).size(), results.size()); + } + + if (encrypt) { + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPrePutEncrypt", "hadPreFilterRowEncrypt"}, + TEST_TABLE, + new Boolean[] {true, true} + ); + } else { + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPrePutEncrypt", "hadPreFilterRowEncrypt"}, + TEST_TABLE, + new Boolean[] {false, false} + ); + } + util.deleteTable(table); + ht.close(); + } + + @Test + public void testColumnRangeFilterClient() throws Exception { + testScanFilterWithCoProcessor (true, false); + testScanFilterWithCoProcessor (true, true); + testScanFilterWithCoProcessor (false, false); + testScanFilterWithCoProcessor (false, true); + } + @Test public void testRowMutation() throws IOException { byte[] tableName = TEST_TABLE; @@ -308,6 +528,122 @@ table.close(); } + + // the pre/postFilterRow will be triggered. + public static class CFilter1 extends FilterBase { + @Override + public boolean hasFilterRow() { + return true; + } + public static CFilter1 parseFrom(final byte [] pbBytes) { + return new CFilter1(); + } + } + + // the pre/postGetNextKeyHint will be triggered. + public static class CFilter2 extends FilterBase { + @Override + public ReturnCode filterKeyValue(KeyValue ignored) { + return ReturnCode.SEEK_NEXT_USING_HINT; + } + + public static CFilter2 parseFrom(final byte [] pbBytes) { + return new CFilter2(); + } + } + + @Test + // HBase-6805 + public void testHBase6805() throws IOException { + byte[] tableName = Bytes.toBytes("testHBase6805"); + util.createTable(tableName, new byte[][] {A, B, C}); + + encryptTable (tableName, true); + + String[] checkItems = new String[] {"hadPostFilterKeyValue","hadPreFilterKeyValue", + "hadPostFilterRowKey","hadPreFilterRowKey", + "hadPostFilterGetNextKeyHint", "hadFilterGetNextKeyHit", + "hadPostFilterTransform","hadPreFilterTransform", + "hadPostFilterRow","hadPreFilterRow"}; + + + HTable table = new HTable(util.getConfiguration(), tableName); + Put put = new Put(ROW); + put.add(A, A, A); + table.put(put); + + Get get = new Get(ROW); + get.addColumn(A, A); + Result result = table.get(get); + assertArrayEquals (A, result.getColumnLatest(A, A).getValue()); + + // verify that scannerNext and scannerClose upcalls won't be invoked + // when we perform get(). + verifyMethodResult(SimpleRegionObserver.class, + checkItems, + tableName, + new Boolean[] {false, false, false, false, false, false, false, false, false, false} + ); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"encrypted"}, + tableName, + new Boolean[] {true} + ); + + Scan s = new Scan(); + ResultScanner scanner = table.getScanner(s); + try { + for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { + } + } finally { + scanner.close(); + } + + // now scanner hooks should be invoked, since it doesn't have any filter at all. + verifyMethodResult(SimpleRegionObserver.class, + checkItems, + tableName, + new Boolean[] {false, false, false, false, false, false, false, false, false, false} + ); + { + s.setFilter(new CFilter1()); + + scanner = table.getScanner(s); + try { + for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { + } + } finally { + scanner.close(); + } + + // now scanner hooks should be invoked. + verifyMethodResult(SimpleRegionObserver.class, checkItems, tableName, + new Boolean[] { true, true, true, true, false, false, true, true, true, + true }); + } + + { + s.setFilter(new CFilter2()); + + scanner = table.getScanner(s); + try { + for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { + } + } finally { + scanner.close(); + } + + // now scanner hooks should be invoked. + verifyMethodResult(SimpleRegionObserver.class, checkItems, tableName, + new Boolean[] { true, true, true, true, true, true, true, true, true, + true }); + } + + util.deleteTable(tableName); + table.close(); + } + /* Overrides compaction to only output rows with keys that are even numbers */ public static class EvenOnlyCompactor extends BaseRegionObserver { long lastCompaction; @@ -483,7 +819,7 @@ } // check each region whether the coprocessor upcalls are called or not. - private void verifyMethodResult(Class c, String methodName[], byte[] tableName, + public void verifyMethodResult(Class c, String methodName[], byte[] tableName, Object value[]) throws IOException { try { for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) { Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1390274) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -32,7 +32,9 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -783,6 +785,144 @@ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; /** + * Called before {@link org.apache.hadoop.hbase.filter.Filter#filterRow(List)}
+ * + * @param ctx + * @param kvs + * @param filter + * @throws IOException + */ + void preFilterRow(final ObserverContext ctx, + List kvs, Filter filter) throws IOException; + + /** + * Called after {@link org.apache.hadoop.hbase.filter.Filter#filterRow(List)}
+ * @param ctx + * @param kvs + * @param filter + * @throws IOException + */ + void postFilterRow(final ObserverContext ctx, + List kvs, Filter filter) throws IOException; + + /** + * Called before {@link org.apache.hadoop.hbase.filter.Filter#transform(KeyValue)}
+ * @param ctx + * @param raw + * @param result the previous preFilterTransform, modify as necessary + * @param filter + * @return transformed KeyValue + * @throws IOException + */ + KeyValue preFilterTransform( + final ObserverContext ctx, + KeyValue raw, + KeyValue result, final Filter filter) + throws IOException; + + /** + * Called after {@link org.apache.hadoop.hbase.filter.Filter#transform(KeyValue)}
+ * @param ctx + * @param raw the source KeyValue + * @param result the previous postFilterTransform, modify as necessary + * @param filter + * @return transformed KeyValue + * @throws IOException + */ + KeyValue postFilterTransform( + final ObserverContext ctx, + KeyValue raw, KeyValue result, final Filter filter) + throws IOException; + + /** + * Called before {@link org.apache.hadoop.hbase.filter.Filter#getNextKeyHint(KeyValue)}
+ * @param ctx + * @param raw the source KeyValue + * @param result the previous preFilterGetNextKeyHint, modify as necessary + * @param filter + * @return the Next Key Hint + * @throws IOException + */ + KeyValue preFilterGetNextKeyHint( + final ObserverContext ctx, + KeyValue raw, KeyValue result, final Filter filter) + throws IOException; + + /** + * Called after {@link org.apache.hadoop.hbase.filter.Filter#getNextKeyHint(KeyValue)}
+ * @param ctx + * @param raw the source KeyValue + * @param result the previous postFilterGetNextKeyHint, modify as necessary + * @param filter + * @return the Next Key Hint + * @throws IOException + */ + KeyValue postFilterGetNextKeyHint( + final ObserverContext ctx, + KeyValue raw, KeyValue result, final Filter filter) + throws IOException; + + /** + * Called before {@link org.apache.hadoop.hbase.filter.Filter#filterRowKey(byte[], int, int)} + * @param ctx + * @param buffer + * @param offset + * @param length + * @param result the previous preFilterRowKey, modify as necessary + * @param filter + * @return indicate if need to filter the row key + * @throws IOException + */ + boolean preFilterRowKey( + final ObserverContext ctx, + final byte[] buffer, final int offset, final int length, + boolean result, final Filter filter) throws IOException; + + /** + * Called after {@link org.apache.hadoop.hbase.filter.Filter#filterRowKey(byte[], int, int)} + * @param ctx + * @param buffer + * @param offset + * @param length + * @param result the previous postFilterRowKey, modify as necessary + * @param filter + * @return indicate if need to filter the row key + * @throws IOException + */ + boolean postFilterRowKey( + final ObserverContext ctx, + final byte[] buffer, final int offset, final int length, + boolean result, final Filter filter) throws IOException; + + /** + * Called before {@link org.apache.hadoop.hbase.filter.Filter#filterKeyValue(KeyValue)}
+ * @param ctx + * @param kv + * @param result the match code from the previous preFilterKeyValue, modify as necessary + * @param filter + * @return indicate the match code + * @throws IOException + */ + ReturnCode preFilterKeyValue( + final ObserverContext ctx, + KeyValue kv, ReturnCode result, final Filter filter) + throws IOException; + + /** + * Called after {@link org.apache.hadoop.hbase.filter.Filter#filterKeyValue(KeyValue)}
+ * @param ctx + * @param kv + * @param result the match code from the previous postFilterKeyValue, modify as necessary + * @param filter + * @return + * @throws IOException + */ + ReturnCode postFilterKeyValue( + final ObserverContext ctx, + KeyValue kv, ReturnCode result, final Filter filter) + throws IOException; + + /** * Called before bulkLoadHFile. Users can create a StoreFile instance to * access the contents of a HFile. * Index: src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (revision 1390274) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (working copy) @@ -25,12 +25,17 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Arrays; import java.util.NavigableSet; import com.google.common.collect.ImmutableList; +import com.sun.jersey.core.util.Base64; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -41,6 +46,8 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -58,6 +65,8 @@ * It works with TestRegionObserverInterface to provide the test case. */ public class SimpleRegionObserver extends BaseRegionObserver { + public static final String HTD_KEY_NEED_ENCRYT_VALUE = "need_ecrypt_value"; + static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class); boolean beforeDelete = true; @@ -77,8 +86,10 @@ boolean hadPreCompact; boolean hadPostCompact; boolean hadPreGet = false; + boolean hadPreGetEncrypt = false; boolean hadPostGet = false; boolean hadPrePut = false; + boolean hadPrePutEncrypt = false; boolean hadPostPut = false; boolean hadPreDeleted = false; boolean hadPostDeleted = false; @@ -95,9 +106,66 @@ boolean hadPreScannerOpen = false; boolean hadPreStoreScannerOpen = false; boolean hadPostScannerOpen = false; + boolean hadPostFilterKeyValue = false; + boolean hadPreFilterKeyValue = false; + boolean hadPostFilterRowKey = false; + boolean hadPreFilterRowKey = false; + boolean hadPostFilterGetNextKeyHint = false; + boolean hadFilterGetNextKeyHit = false; + boolean hadPostFilterTransform = false; + boolean hadPreFilterTransform = false; + boolean hadPostFilterRow = false; + boolean hadPreFilterRow = false; + boolean hadPreFilterRowEncrypt = false; boolean hadPreBulkLoadHFile = false; - boolean hadPostBulkLoadHFile = false; - + boolean hadPostBulkLoadHFile = false; + + boolean encrypted = false; + + public boolean encrypted() { + return encrypted; + } + + protected static byte[] encrypt (byte[] source) { + if (null == source) { + return null; + } + return Base64.encode(source); + } + + protected static byte[] decrypt (byte[] source) { + if (null == source) { + return null; + } + return Base64.decode(source); + } + + protected static KeyValue decrypt (final KeyValue source) { + // TODO should be more efficiently + return new KeyValue (source.getRow(), source.getFamily(), + source.getQualifier(), source.getTimestamp(), KeyValue.Type.codeToType(source.getType()), + decrypt(source.getValue())); + } + + protected static KeyValue encrypt (final KeyValue source) { + // TODO should be more efficiently + return new KeyValue (source.getRow(), source.getFamily(), + source.getQualifier(), source.getTimestamp(), KeyValue.Type.codeToType(source.getType()), + encrypt(source.getValue())); + } + + protected static void encrypt (final List source) { + for (int i = 0; i < source.size(); ++i) { + source.set(i, encrypt (source.get(i))); + } + } + + protected static void decrypt (final List source) { + for (int i = 0; i < source.size(); ++i) { + source.set(i, decrypt (source.get(i))); + } + } + @Override public void start(CoprocessorEnvironment e) throws IOException { // this only makes sure that leases and locks are available to coprocessors @@ -108,6 +176,14 @@ leases.cancelLease("x"); Integer lid = re.getRegion().getLock(null, Bytes.toBytes("some row"), true); re.getRegion().releaseRowLock(lid); + + // to get the information from the Table Description if the table need / is encrypted. + String needEncrypted = re.getRegion().getTableDesc().getValue(HTD_KEY_NEED_ENCRYT_VALUE); + if (null != needEncrypted && Boolean.parseBoolean(needEncrypted)) { + this.encrypted = true; + } else { + this.encrypted = false; + } } @Override @@ -248,6 +324,13 @@ final InternalScanner s, final List results, final int limit, final boolean hasMore) throws IOException { hadPostScannerNext = true; + + if (encrypted) { + for (Result r : results) { + decrypt(r.list()); + } + } + return hasMore; } @@ -271,12 +354,19 @@ assertNotNull(e.getRegion()); assertNotNull(get); assertNotNull(results); + + if (encrypted) { + } hadPreGet = true; } + @Override public void postGet(final ObserverContext c, final Get get, final List results) { + if (encrypted) { + decrypt (results); + } RegionCoprocessorEnvironment e = c.getEnvironment(); assertNotNull(e); assertNotNull(e.getRegion()); @@ -333,6 +423,13 @@ TestRegionObserverInterface.C)); } hadPrePut = true; + + if (this.encrypted) { + for (Map.Entry> entry : familyMap.entrySet()) { + encrypt (entry.getValue()); + } + hadPrePutEncrypt = true; + } } @Override @@ -433,6 +530,121 @@ } @Override + public void preFilterRow( + final ObserverContext env, List kvs, + final Filter filter) throws IOException { + if (encrypted) { + decrypt (kvs); + filter.filterRow(kvs); + env.bypass(); + hadPreFilterRowEncrypt = true; + } + hadPreFilterRow = true; + } + + @Override + public void postFilterRow(final ObserverContext env, + List kvs, final Filter filter) throws IOException { + if (encrypted) { + encrypt (kvs); + } + hadPostFilterRow = true; + } + + @Override + public KeyValue preFilterTransform( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + if (encrypted) { + result = decrypt(raw); + result = filter.transform(result); + env.bypass(); + } + hadPreFilterTransform = true; + return result; + } + + @Override + public KeyValue postFilterTransform( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + if (encrypted) { + result = encrypt (result); + } + hadPostFilterTransform = true; + return result; + } + + @Override + public KeyValue preFilterGetNextKeyHint( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + if (encrypted) { + result = decrypt(raw); + result = filter.getNextKeyHint(result); + env.bypass(); + } + hadFilterGetNextKeyHit = true; + return result; + } + + @Override + public KeyValue postFilterGetNextKeyHint( + final ObserverContext env, KeyValue raw, + KeyValue result, final Filter filter) throws IOException { + if (encrypted) { + // TODO may need to encrypt the value as well + } + hadPostFilterGetNextKeyHint = true; + return result; + } + + @Override + public boolean preFilterRowKey( + final ObserverContext env, final byte[] buffer, + final int offset, final int length, boolean result, final Filter filter) throws IOException { + if (encrypted) { + result = filter.filterRowKey(buffer, offset, length); + env.bypass(); + } + hadPreFilterRowKey = true; + return result; + } + + @Override + public boolean postFilterRowKey( + final ObserverContext env, final byte[] buffer, + final int offset, final int length, boolean result, final Filter filter) throws IOException { + if (encrypted) { + } + hadPostFilterRowKey = true; + return result; + } + + @Override + public ReturnCode preFilterKeyValue( + final ObserverContext env, KeyValue kv, + ReturnCode result, final Filter filter) throws IOException { + if (encrypted) { + KeyValue newKV = decrypt(kv); + result = filter.filterKeyValue(newKV); + env.bypass(); + } + hadPreFilterKeyValue = true; + return result; + } + + @Override + public ReturnCode postFilterKeyValue( + final ObserverContext env, KeyValue kv, + ReturnCode result, final Filter filter) throws IOException { + if (encrypted) { + } + hadPostFilterKeyValue = true; + return result; + } + + @Override public void preBulkLoadHFile(ObserverContext ctx, List> familyPaths) throws IOException { RegionCoprocessorEnvironment e = ctx.getEnvironment(); @@ -468,11 +680,15 @@ hadPostBulkLoadHFile = true; return hasLoaded; } - + public boolean hadPreGet() { return hadPreGet; } + public boolean hadPreGetEncrypt() { + return hadPreGetEncrypt; + } + public boolean hadPostGet() { return hadPostGet; } @@ -480,6 +696,10 @@ public boolean hadPrePut() { return hadPrePut; } + + public boolean hadPrePutEncrypt() { + return hadPrePutEncrypt; + } public boolean hadPostPut() { return hadPostPut; @@ -516,6 +736,49 @@ return hadPreDeleted && hadPostDeleted; } + public boolean hadPostFilterKeyValue() { + return hadPostFilterKeyValue; + } + + public boolean hadPreFilterKeyValue() { + return hadPreFilterKeyValue; + } + public boolean hadPostFilterRowKey() { + return hadPostFilterRowKey; + } + + public boolean hadPreFilterRowKey() { + return hadPreFilterRowKey; + } + + public boolean hadPostFilterGetNextKeyHint() { + return hadPostFilterGetNextKeyHint; + } + + public boolean hadFilterGetNextKeyHit() { + return hadFilterGetNextKeyHit; + } + + public boolean hadPostFilterTransform() { + return hadPostFilterTransform; + } + + public boolean hadPreFilterTransform() { + return hadPreFilterTransform; + } + + public boolean hadPostFilterRow() { + return hadPostFilterRow; + } + + public boolean hadPreFilterRow() { + return hadPreFilterRow; + } + + public boolean hadPreFilterRowEncrypt() { + return hadPreFilterRowEncrypt; + } + public boolean hadPostBulkLoadHFile() { return hadPostBulkLoadHFile; } Index: src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java (revision 1390274) +++ src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java (working copy) @@ -25,10 +25,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import com.google.protobuf.InvalidProtocolBufferException; @@ -42,16 +46,39 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class FilterWrapper extends Filter { + private static final Log LOG = LogFactory.getLog(FilterWrapper.class); Filter filter = null; + RegionCoprocessorHost coprocessorHost = null; + + public Filter getFilterInstance() { + return this.filter; + } public FilterWrapper( Filter filter ) { if (null == filter) { // ensure the filter instance is not null throw new NullPointerException("Cannot create FilterWrapper with null Filter"); } - this.filter = filter; + + if (filter instanceof FilterWrapper) { + // copy constructor + this.filter = ( (FilterWrapper) filter ).getFilterInstance(); + this.coprocessorHost = ( (FilterWrapper) filter ).getCoprocessorHost(); + } else { + this.filter = filter; + } } + public void setCoprocessorHost(RegionCoprocessorHost coprocessorHost) { + if (coprocessorHost != null) { + this.coprocessorHost = coprocessorHost; + } + } + + public RegionCoprocessorHost getCoprocessorHost() { + return this.coprocessorHost; + } + /** * @return The filter serialized using pb */ @@ -99,40 +126,143 @@ } @Override - public KeyValue getNextKeyHint(KeyValue currentKV) { - return this.filter.getNextKeyHint(currentKV); + public boolean hasFilterRow() { + // TODO add co-processor supporting if necessary + return this.filter.hasFilterRow(); + } + + @Override + public KeyValue getNextKeyHint(KeyValue raw) { + boolean bypass = false; + // default should be no hint + KeyValue result = null; + if (this.coprocessorHost != null) { + try { + KeyValue[] resultKV = {result}; + bypass = this.coprocessorHost.preFilterGetNextKeyHint(raw, resultKV, + this.filter); + result = resultKV[0]; + } catch (IOException e) { + // TODO to throws IOException + throw new RuntimeException(e); + } + } + + if (!bypass) { + result = this.filter.getNextKeyHint(raw); + } + + try { + return this.coprocessorHost != null ? this.coprocessorHost + .postFilterGetNextKeyHint(raw, result, this.filter) : result; + } catch (IOException e) { + // TODO to throws IOException + throw new RuntimeException(e); + } } @Override public boolean filterRowKey(byte[] buffer, int offset, int length) { - return this.filter.filterRowKey(buffer, offset, length); + Boolean result = null; + if (this.coprocessorHost != null) { + try { + // default, should not filter any row key + result = this.coprocessorHost.preFilterRowKey(buffer, offset, length, false, + this.filter); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + if (null == result) { + result = this.filter.filterRowKey(buffer, offset, length); + } + + try { + return this.coprocessorHost != null ? this.coprocessorHost + .postFilterRowKey(buffer, offset, length, result, this.filter) + : result; + } catch (IOException e) { + // TODO to throws IOException + throw new RuntimeException(e); + } } @Override public ReturnCode filterKeyValue(KeyValue v) { - return this.filter.filterKeyValue(v); - } + ReturnCode rc = null; + if (this.coprocessorHost != null) { + try { + rc = this.coprocessorHost.preFilterKeyValue(v, rc, this.filter); + } catch (IOException e) { + throw new RuntimeException(e); + } + } - @Override - public KeyValue transform(KeyValue v) { - return this.filter.transform(v); + if (rc == null) { + rc = this.filter.filterKeyValue(v); + } + try { + return this.coprocessorHost != null ? this.coprocessorHost + .postFilterKeyValue(v, rc, this.filter) : rc; + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override - public boolean hasFilterRow() { - return this.filter.hasFilterRow(); + public KeyValue transform(KeyValue raw) { + boolean bypass = false; + KeyValue result = raw; + if (this.coprocessorHost != null) { + try { + KeyValue[] resultKV = {result}; + bypass = this.coprocessorHost.preFilterTransform(raw, resultKV, this.filter); + result = resultKV[0]; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (!bypass) { + result = this.filter.transform(raw); + } + try { + return this.coprocessorHost != null ? this.coprocessorHost + .postFilterTransform(raw, result, this.filter) : result; + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override public void filterRow(List kvs) { - //To fix HBASE-6429, - //Filter with filterRow() returning true is incompatible with scan with limit - //1. hasFilterRow() returns true, if either filterRow() or filterRow(kvs) is implemented. - //2. filterRow() is merged with filterRow(kvs), - //so that to make all those row related filtering stuff in the same function. - this.filter.filterRow(kvs); - if (!kvs.isEmpty() && this.filter.filterRow()) { - kvs.clear(); + boolean bypass = false; + if (this.coprocessorHost != null) { + try { + bypass = this.coprocessorHost.preFilterRow(kvs, this.filter); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + if (!bypass) { + //To fix HBASE-6429, + //Filter with filterRow() returning true is incompatible with scan with limit + //1. hasFilterRow() returns true, if either filterRow() or filterRow(kvs) is implemented. + //2. filterRow() is merged with filterRow(kvs), + //so that to make all those row related filtering stuff in the same function. + this.filter.filterRow(kvs); + if (!kvs.isEmpty() && this.filter.filterRow()) { + kvs.clear(); + } + } + + if (this.coprocessorHost != null) { + try { + this.coprocessorHost.postFilterRow(kvs, this.filter); + } catch (IOException e) { + throw new RuntimeException(e); + } } }