Index: hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 1366772) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -455,6 +455,34 @@ } /** + * Call compactStores on all regions on all participating regionservers. + * @throws IOException + */ + public void compact(boolean major) throws IOException { + for (JVMClusterUtil.RegionServerThread t: + this.hbaseCluster.getRegionServers()) { + for (HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { + r.compactStores(major); + } + } + } + + /** + * Call compactStores on all regions of the specified table. + * @throws IOException + */ + public void compact(byte [] tableName, boolean major) throws IOException { + for (JVMClusterUtil.RegionServerThread t: + this.hbaseCluster.getRegionServers()) { + for (HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { + if (Bytes.equals(r.getTableDesc().getName(), tableName)) { + r.compactStores(major); + } + } + } + } + + /** * @return List of region server threads. */ public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java (working copy) @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.util; +// this is deliberately not in the o.a.h.h.regionserver package +// in order to make sure all required classes/methods are available + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +@Category(MediumTests.class) +public class TestCoprocessorScanPolicy { + final Log LOG = LogFactory.getLog(getClass()); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] F = Bytes.toBytes("fam"); + private static final byte[] Q = Bytes.toBytes("qual"); + private static final byte[] R = Bytes.toBytes("row"); + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + ScanObserver.class.getName()); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testBaseCases() throws Exception { + byte[] tableName = Bytes.toBytes("baseCases"); + HTable t = TEST_UTIL.createTable(tableName, F, 1); + // set the version override to 2 + Put p = new Put(R); + p.setAttribute("versions", new byte[]{}); + p.add(F, tableName, Bytes.toBytes(2)); + t.put(p); + + // insert 2 versions + p = new Put(R); + p.add(F, Q, Q); + t.put(p); + p = new Put(R); + p.add(F, Q, Q); + t.put(p); + Get g = new Get(R); + g.setMaxVersions(10); + Result r = t.get(g); + assertEquals(2, r.size()); + + TEST_UTIL.flush(tableName); + TEST_UTIL.compact(tableName, true); + + g = new Get(R); + g.setMaxVersions(10); + r = t.get(g); + assertEquals(2, r.size()); + + // insert a 3rd version + p = new Put(R); + p.add(F, Q, Q); + t.put(p); + g = new Get(R); + g.setMaxVersions(10); + r = t.get(g); + // still only two versions visible + assertEquals(2, r.size()); + + t.close(); + } + + @Test + public void testTTL() throws Exception { + byte[] tableName =
Bytes.toBytes("testTTL"); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor(F) + .setMaxVersions(10) + .setTimeToLive(1); + desc.addFamily(hcd); + TEST_UTIL.getHBaseAdmin().createTable(desc); + HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName); + long now = EnvironmentEdgeManager.currentTimeMillis(); + ManualEnvironmentEdge me = new ManualEnvironmentEdge(); + me.setValue(now); + EnvironmentEdgeManagerTestHelper.injectEdge(me); + // 2s in the past + long ts = now - 2000; + // Set the TTL override to 3s + Put p = new Put(R); + p.setAttribute("ttl", new byte[]{}); + p.add(F, tableName, Bytes.toBytes(3000L)); + t.put(p); + + p = new Put(R); + p.add(F, Q, ts, Q); + t.put(p); + p = new Put(R); + p.add(F, Q, ts+1, Q); + t.put(p); + + // these two should be expired but for the override + Get g = new Get(R); + g.setMaxVersions(10); + Result r = t.get(g); + // still there? + assertEquals(2, r.size()); + + // flush and major compact; the 3s TTL override still covers the 2s-old versions + TEST_UTIL.flush(tableName); + TEST_UTIL.compact(tableName, true); + + g = new Get(R); + g.setMaxVersions(10); + r = t.get(g); + // still there? + assertEquals(2, r.size()); + + // roll time forward 2s. + me.setValue(now + 2000); + // now verify that data eventually does expire + g = new Get(R); + g.setMaxVersions(10); + r = t.get(g); + // should be gone now + assertEquals(0, r.size()); + t.close(); + } + + public static class ScanObserver extends BaseRegionObserver { + private Map<String, Long> ttls = new HashMap<String, Long>(); + private Map<String, Integer> versions = new HashMap<String, Integer>(); + + // lame way to communicate with the coprocessor, + // since it is loaded by a different class loader + @Override + public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put, + final WALEdit edit, final boolean writeToWAL) throws IOException { + if (put.getAttribute("ttl") != null) { + KeyValue kv = put.getFamilyMap().values().iterator().next().get(0); + ttls.put(Bytes.toString(kv.getQualifier()), Bytes.toLong(kv.getValue())); + c.bypass(); + } else if (put.getAttribute("versions") != null) { + KeyValue kv = put.getFamilyMap().values().iterator().next().get(0); + versions.put(Bytes.toString(kv.getQualifier()), Bytes.toInt(kv.getValue())); + c.bypass(); + } + } + + @Override + public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Long newTtl = ttls.get(store.getTableName()); + if (newTtl != null) { + System.out.println("PreFlush:" + newTtl); + } + Integer newVersions = versions.get(store.getTableName()); + Store.ScanInfo oldSI = store.getScanInfo(); + HColumnDescriptor family = store.getFamily(); + Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(), + newVersions == null ? family.getMaxVersions() : newVersions, + newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), + oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + Scan scan = new Scan(); + scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); + return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); + } + + @Override + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, + long earliestPutTs, InternalScanner s) throws IOException { + Long newTtl = ttls.get(store.getTableName()); + Integer newVersions = versions.get(store.getTableName()); + Store.ScanInfo oldSI = store.getScanInfo(); + HColumnDescriptor family = store.getFamily(); + Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(), + newVersions == null ? family.getMaxVersions() : newVersions, + newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), + oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + Scan scan = new Scan(); + scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); + return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion() + .getSmallestReadPoint(), earliestPutTs); + } + + @Override + public KeyValueScanner preStoreScannerOpen( + final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan, + final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException { + Long newTtl = ttls.get(store.getTableName()); + Integer newVersions = versions.get(store.getTableName()); + Store.ScanInfo oldSI = store.getScanInfo(); + HColumnDescriptor family = store.getFamily(); + Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(), + newVersions == null ? family.getMaxVersions() : newVersions, + newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), + oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + return new StoreScanner(store, scanInfo, scan, targetCols); + } + } +}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1366772) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; -import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.Bytes;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (working copy) @@ -0,0 +1,54 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; + +public class NoOpScanPolicyObserver extends BaseRegionObserver { + /** + *
Reimplement the default behavior + */ + @Override + public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store.ScanInfo oldSI = store.getScanInfo(); + Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(), + oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + Scan scan = new Scan(); + scan.setMaxVersions(oldSI.getMaxVersions()); + return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); + } + + /** + * Reimplement the default behavior + */ + @Override + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, + InternalScanner s) throws IOException { + // this demonstrates how to override the scanner's default behavior + Store.ScanInfo oldSI = store.getScanInfo(); + Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(), + oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + Scan scan = new Scan(); + scan.setMaxVersions(oldSI.getMaxVersions()); + return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion() + .getSmallestReadPoint(), earliestPutTs); + } + + @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s) + throws IOException { + return new StoreScanner(store, store.getScanInfo(), scan, targetCols); + } +}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (revision 1366772) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (working copy) @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; -import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; @@ -559,7 +558,7 @@ KeyValue.COMPARATOR); StoreScanner scanner = new StoreScanner(scan, scanInfo, - StoreScanner.ScanType.MAJOR_COMPACT, null, scanners, + ScanType.MAJOR_COMPACT, null, scanners, HConstants.OLDEST_TIMESTAMP); List<KeyValue> results = new ArrayList<KeyValue>(); results = new ArrayList<KeyValue>();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java (working copy) @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.junit.experimental.categories.Category; + +/** + * Make sure all compaction tests still pass with preFlush and preCompact + * overridden to implement the default behavior + */ +@Category(MediumTests.class) +public class TestCompactionWithCoprocessor extends TestCompaction { + /** constructor */ + public TestCompactionWithCoprocessor() throws Exception { + super(); + conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + NoOpScanPolicyObserver.class.getName()); + } +}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (revision 1366772) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (working copy) @@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; -import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LoadTestTool; import org.apache.hadoop.hbase.util.MD5Hash; @@ -408,7 +407,7 @@ Scan scan = new Scan(); // Include deletes - scanner = new StoreScanner(store, scan, scanners, + scanner = new StoreScanner(store, store.scanInfo, scan, scanners, ScanType.MAJOR_COMPACT, Long.MIN_VALUE, Long.MIN_VALUE); ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (revision 1366772) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (working copy) @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Arrays; +import java.util.NavigableSet; import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; @@ -42,7 +43,9 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -63,11 +66,13 @@ boolean hadPreClose; boolean hadPostClose; boolean hadPreFlush; + boolean hadPreFlushScannerOpen; boolean hadPostFlush; boolean hadPreSplit; boolean hadPostSplit; boolean hadPreCompactSelect; boolean hadPostCompactSelect; + boolean hadPreCompactScanner; boolean hadPreCompact;
boolean hadPostCompact; boolean hadPreGet = false; @@ -87,6 +92,7 @@ boolean hadPreScannerClose = false; boolean hadPostScannerClose = false; boolean hadPreScannerOpen = false; + boolean hadPreStoreScannerOpen = false; boolean hadPostScannerOpen = false; boolean hadPreBulkLoadHFile = false; boolean hadPostBulkLoadHFile = false; @@ -120,12 +126,20 @@ } @Override - public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) { + public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner) { hadPreFlush = true; + return scanner; } @Override - public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) { + public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + hadPreFlushScannerOpen = true; + return null; + } + + @Override + public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, StoreFile resultFile) { hadPostFlush = true; } @@ -167,6 +181,14 @@ } @Override + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, + InternalScanner s) throws IOException { + hadPreCompactScanner = true; + return null; + } + + @Override public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile) { hadPostCompact = true; @@ -185,6 +207,14 @@ } @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final Scan scan, final NavigableSet<byte[]> targetCols, + final KeyValueScanner s) throws IOException { + hadPreStoreScannerOpen = true; + return null; + } + + @Override public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1366772) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -90,12 +90,12 @@ @Category(LargeTests.class) public class TestFromClientSide { final Log LOG = LogFactory.getLog(getClass()); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static byte [] ROW = Bytes.toBytes("testRow"); private static byte [] FAMILY = Bytes.toBytes("testFamily"); private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); private static byte [] VALUE = Bytes.toBytes("testValue"); - private static int SLAVES = 3; + protected static int SLAVES = 3; /** * @throws java.lang.Exception
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java (working copy) @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Test all client operations with a coprocessor that + * just implements the default flush/compact/scan policy + */ +@Category(LargeTests.class) +public class TestFromClientSideWithCoprocessor extends TestFromClientSide { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName()); + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } +}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1366772) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -788,8 +788,24 @@ this.hbaseCluster.flushcache(tableName); } + /** + * Compact all regions in the mini hbase cluster + * @throws IOException + */ + public void compact(boolean major) throws IOException { + this.hbaseCluster.compact(major); + } + + /** + * Compact all regions of a table in the mini hbase cluster + * @throws IOException + */ + public void compact(byte [] tableName, boolean major) throws IOException { + this.hbaseCluster.compact(tableName, major); + } + + + /** * Create a table.
* @param tableName + * @param family
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1366772) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1216,7 +1216,7 @@ * @param majorCompaction True to force a major compaction regardless of thresholds * @throws IOException e */ - void compactStores(final boolean majorCompaction) + public void compactStores(final boolean majorCompaction) throws IOException { if (majorCompaction) { this.triggerMajorCompaction(); @@ -3469,7 +3469,7 @@ for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - StoreScanner scanner = store.getScanner(scan, entry.getValue()); + KeyValueScanner scanner = store.getScanner(scan, entry.getValue()); scanners.add(scanner); } this.storeHeap = new KeyValueHeap(scanners, comparator);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1366772) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -304,6 +304,31 @@ } /** + * See + * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner)} + */ + public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners, + ScanType scanType, long earliestPutTs) throws IOException { + ObserverContext<RegionCoprocessorEnvironment> ctx = null; + InternalScanner s = null; + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners, + scanType, earliestPutTs, s); + } catch (Throwable e) { + handleCoprocessorThrowable(env,e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return s; + } + + /** * Called prior to selecting the {@link StoreFile}s for compaction from * the list of currently available candidates. * @param store The store where compaction is being requested @@ -412,6 +437,31 @@ * Invoked before a memstore flush * @throws IOException */ + public InternalScanner preFlush(Store store, InternalScanner scanner) throws IOException { + ObserverContext<RegionCoprocessorEnvironment> ctx = null; + boolean bypass = false; + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + scanner = ((RegionObserver)env.getInstance()).preFlush( + ctx, store, scanner); + } catch (Throwable e) { + handleCoprocessorThrowable(env,e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? null : scanner; + } + + /** + * Invoked before a memstore flush + * @throws IOException + */ public void preFlush() throws IOException { ObserverContext<RegionCoprocessorEnvironment> ctx = null; for (RegionEnvironment env: coprocessors) { @@ -430,6 +480,29 @@ } /** + * See + * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)} + */ + public InternalScanner preFlushScannerOpen(Store store, KeyValueScanner memstoreScanner) throws IOException { + ObserverContext<RegionCoprocessorEnvironment> ctx = null; + InternalScanner s = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, memstoreScanner, s); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return s; + } + + /** * Invoked after a memstore flush * @throws IOException */ @@ -451,6 +524,27 @@ } /** + * Invoked after a memstore flush + * @throws IOException + */ + public void postFlush(final Store store, final StoreFile storeFile) throws IOException { + ObserverContext<RegionCoprocessorEnvironment> ctx = null; + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + /** * Invoked just before a split * @throws IOException */ @@ -1089,6 +1183,31 @@ } /** + * See + * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner)} + */ + public KeyValueScanner preStoreScannerOpen(Store store, Scan scan, + final NavigableSet<byte[]> targetCols) throws IOException { + KeyValueScanner s = null; + ObserverContext<RegionCoprocessorEnvironment> ctx = null; + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan, + targetCols, s); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return s; + } + + /** * @param scan the Scan specification * @param s the scanner * @return the scanner instance to use
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1366772) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.Bytes; @@ -43,7 +44,7 @@ * into List<KeyValue> for a single row.
*/ @InterfaceAudience.Private -class StoreScanner extends NonLazyKeyValueScanner +public class StoreScanner extends NonLazyKeyValueScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver { static final Log LOG = LogFactory.getLog(StoreScanner.class); private Store store; @@ -106,16 +107,16 @@ * @param columns which columns we are scanning * @throws IOException */ - StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) + public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns) throws IOException { - this(store, scan.getCacheBlocks(), scan, columns, store.scanInfo.getTtl(), - store.scanInfo.getMinVersions()); + this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), + scanInfo.getMinVersions()); initializeMetricNames(); if (columns != null && scan.isRaw()) { throw new DoNotRetryIOException( "Cannot specify any column for a raw scan"); } - matcher = new ScanQueryMatcher(scan, store.scanInfo, columns, + matcher = new ScanQueryMatcher(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS); @@ -158,13 +159,13 @@ * @param smallestReadPoint the readPoint that we should use for tracking * versions */ - StoreScanner(Store store, Scan scan, + public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { - this(store, false, scan, null, store.scanInfo.getTtl(), - store.scanInfo.getMinVersions()); + this(store, false, scan, null, scanInfo.getTtl(), + scanInfo.getMinVersions()); initializeMetricNames(); - matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType, + matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, earliestPutTs, oldestUnexpiredTS); // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -181,7 +182,7 @@ /** Constructor for testing. */ StoreScanner(final Scan scan, Store.ScanInfo scanInfo, - StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns, + ScanType scanType, final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners) throws IOException { this(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP); @@ -189,7 +190,7 @@ // Constructor for testing. StoreScanner(final Scan scan, Store.ScanInfo scanInfo, - StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns, + ScanType scanType, final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs) throws IOException { this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), @@ -598,14 +599,5 @@ static void enableLazySeekGlobally(boolean enable) { lazySeekEnabledGlobally = enable; } - - /** - * Enum to distinguish general scan types. - */ - public static enum ScanType { - MAJOR_COMPACT, - MINOR_COMPACT, - USER_SCAN - } }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1366772) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -212,9 +211,7 @@ "ms in store " + this); // Why not just pass a HColumnDescriptor in here altogether? Even if have // to clone it? - scanInfo = new ScanInfo(family.getName(), family.getMinVersions(), - family.getMaxVersions(), ttl, family.getKeepDeletedCells(), - timeToPurgeDeletes, this.comparator); + scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); this.memstore = new MemStore(conf, this.comparator); // By default, compact if storefile.count >= minFilesToCompact @@ -728,15 +725,30 @@ if (set.size() == 0) { return null; } - Scan scan = new Scan(); - scan.setMaxVersions(scanInfo.getMaxVersions()); // Use a store scanner to find which rows to flush. // Note that we need to retain deletes, hence // treat this as a minor compaction. - InternalScanner scanner = new StoreScanner(this, scan, Collections - .singletonList(new CollectionBackedScanner(set, this.comparator)), - ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + InternalScanner scanner = null; + KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator); + if (getHRegion().getCoprocessorHost() != null) { + scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner); + } + if (scanner == null) { + Scan scan = new Scan(); + scan.setMaxVersions(scanInfo.getMaxVersions()); + scanner = new StoreScanner(this, scanInfo, scan, + Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT, + this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + } + if (getHRegion().getCoprocessorHost() != null) { + InternalScanner cpScanner = + getHRegion().getCoprocessorHost().preFlush(this, scanner); + // NULL scanner returned from coprocessor hooks means skip normal processing + if (cpScanner == null) { + return null; + } + scanner = cpScanner; + } try { // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem @@ -1941,11 +1953,18 @@ * are not in a compaction.
* @throws IOException */ - public StoreScanner getScanner(Scan scan, + public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols) throws IOException { lock.readLock().lock(); try { - return new StoreScanner(this, scan, targetCols); + KeyValueScanner scanner = null; + if (getHRegion().getCoprocessorHost() != null) { + scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols); + } + if (scanner == null) { + scanner = new StoreScanner(this, getScanInfo(), scan, targetCols); + } + return scanner; } finally { lock.readLock().unlock(); } @@ -2065,7 +2084,7 @@ return compactionSize > throttlePoint; } - HRegion getHRegion() { + public HRegion getHRegion() { return this.region; } @@ -2168,6 +2187,12 @@ } storeFile = Store.this.commitFile(storeFilePath, cacheFlushId, snapshotTimeRangeTracker, flushedSize, status); + if (Store.this.getHRegion().getCoprocessorHost() != null) { + Store.this.getHRegion() + .getCoprocessorHost() + .postFlush(Store.this, storeFile); + } + + // Add new file to store files. Clear snapshot too while we have // the Store write lock. return Store.this.updateStorefiles(storeFile, snapshot); @@ -2210,6 +2235,10 @@ return comparator; } + public ScanInfo getScanInfo() { + return scanInfo; + } + /** * Immutable information for scans over a store. */ @@ -2227,6 +2256,17 @@ + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN); /** + * @param family {@link HColumnDescriptor} describing the column family + * @param ttl Store's TTL (in ms) + * @param timeToPurgeDeletes duration in ms after which a delete marker can + * be purged during a major compaction. + * @param comparator The store's comparator + */ + public ScanInfo(HColumnDescriptor family, long ttl, long timeToPurgeDeletes, KVComparator comparator) { + this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family + .getKeepDeletedCells(), timeToPurgeDeletes, comparator); + } + /** * @param family Name of this store's column family * @param minVersions Store's MIN_VERSIONS setting * @param maxVersions Store's VERSIONS setting
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1366772) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -34,8 +34,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; - /** * A query matcher that is specifically designed for the scan case.
*/ @@ -138,7 +136,7 @@ * based on TTL */ public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo, - NavigableSet<byte[]> columns, StoreScanner.ScanType scanType, + NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) { this.tr = scan.getTimeRange(); this.rowComparator = scanInfo.getComparator().getRawComparator(); @@ -185,7 +183,7 @@ */ ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo, NavigableSet<byte[]> columns, long oldestUnexpiredTS) { - this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN, + this(scan, scanInfo, columns, ScanType.USER_SCAN, Long.MAX_VALUE, /* max Readpoint to track versions */ HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS); }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (revision 1366772) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (working copy) @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.Compression; -import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; @@ -127,13 +126,22 @@ try { InternalScanner scanner = null; try { - Scan scan = new Scan(); - scan.setMaxVersions(store.getFamily().getMaxVersions()); - /* Include deletes, unless we are doing a major compaction */ - scanner = new StoreScanner(store, scan, scanners, - majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, - smallestReadPoint, earliestPutTs); if (store.getHRegion().getCoprocessorHost() != null) { + scanner = store + .getHRegion() + .getCoprocessorHost() + .preCompactScannerOpen(store, scanners, + majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs); + } + if (scanner == null) { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + /* Include deletes, unless we are doing a major compaction */ + scanner = new StoreScanner(store, store.scanInfo, scan, scanners, + majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, + smallestReadPoint, earliestPutTs); + } + if (store.getHRegion().getCoprocessorHost() != null) { InternalScanner cpScanner = store.getHRegion().getCoprocessorHost().preCompact(store, scanner); // NULL scanner returned from coprocessor hooks means skip normal processing
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java (working copy) @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Enum to distinguish general scan types. + */ +@InterfaceAudience.Private +public enum ScanType { + MAJOR_COMPACT, + MINOR_COMPACT, + USER_SCAN +}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1366772) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -17,7 +17,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.util.List; -import java.util.Map; +import java.util.NavigableSet; import com.google.common.collect.ImmutableList; @@ -37,7 +37,9 @@ import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -75,6 +77,13 @@ boolean abortRequested) { } @Override + public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) + throws IOException { + return null; + } + + @Override public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException { } @@ -83,6 +92,17 @@ } @Override + public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store, + InternalScanner scanner) throws IOException { + return scanner; + } + + @Override + public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store, + StoreFile resultFile) throws IOException { + } + + @Override public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException { } @@ -106,6 +126,13 @@ } @Override + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s) throws IOException { + return null; + } + + @Override public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store, final StoreFile resultFile) throws IOException { } @@ -240,6 +267,13 @@ final Scan scan, final RegionScanner s) throws IOException { return s; } + + @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final Scan scan, final NavigableSet<byte[]> targetCols, + final KeyValueScanner s) throws IOException { + return null; + } @Override public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1366772) +++
hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -18,6 +18,7 @@ import java.io.IOException; import java.util.List; +import java.util.NavigableSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -35,9 +36,12 @@ import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -65,20 +69,61 @@ void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c); /** + * Called before a memstore is flushed to disk and prior to creating the scanner to read the + * memstore. To override or modify the flush process, + * implementing classes can return a new scanner to provide the KeyValues to be + * stored into the new {@code StoreFile} or null to perform the default processing. + * @param c the environment provided by the region server + * @param store the store being flushed + * @param memstoreScanner the scanner for the memstore that is flushed + * @param s if not null the base scanner from previous RegionObserver in the chain + * @return the scanner to use during the flush. {@code null} if the default implementation + * is to be used. + * @throws IOException if an error occurred on the coprocessor + */ + InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) + throws IOException; + + /** * Called before the memstore is flushed to disk. * @param c the environment provided by the region server * @throws IOException if an error occurred on the coprocessor + * @deprecated use {@link #preFlush(ObserverContext, Store, InternalScanner)} instead */ void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException; /** + * Called before a Store's memstore is flushed to disk. + * @param c the environment provided by the region server + * @param store the store whose memstore is being flushed + * @param scanner the scanner over existing data used in the store file + * @return the scanner to use during the flush. Should not be {@code null} + * unless the implementation is writing new store files on its own. + * @throws IOException if an error occurred on the coprocessor + */ + InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final InternalScanner scanner) throws IOException; + + /** * Called after the memstore is flushed to disk. * @param c the environment provided by the region server * @throws IOException if an error occurred on the coprocessor + * @deprecated use {@link #postFlush(ObserverContext, Store, StoreFile)} instead. */ void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException; /** + * Called after a Store's memstore is flushed to disk.
+ * @param c the environment provided by the region server + * @param store the store whose memstore was flushed + * @param resultFile the new store file written out during the flush + * @throws IOException if an error occurred on the coprocessor + */ + void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final StoreFile resultFile) throws IOException; + + /** * Called prior to selecting the {@link StoreFile}s to compact from the list * of available candidates. To alter the files used for compaction, you may * mutate the passed in list of candidates. @@ -128,6 +173,27 @@ final Store store, final InternalScanner scanner) throws IOException; /** + * Called prior to writing the {@link StoreFile}s selected for compaction into + * a new {@code StoreFile} and prior to creating the scanner used to read the + * input files. To override or modify the compaction process, + * implementing classes can return a new scanner to provide the KeyValues to be + * stored into the new {@code StoreFile} or null to perform the default processing. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanners the list of {@link StoreFileScanner}s to be read from + * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction + * @param earliestPutTs timestamp of the earliest put that was found in any of the involved + * store files + * @param s if not null the base scanner from previous RegionObserver in the chain + * @return the scanner to use during compaction. {@code null} if the default implementation + * is to be used. + * @throws IOException if an error occurred on the coprocessor + */ + InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s) throws IOException; + + /** * Called after compaction has completed and the new store file has been * moved in to place. * @param c the environment provided by the region server @@ -550,6 +616,23 @@ throws IOException; /** + * Called before a store opens a new scanner. + * <p> + * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param c the environment provided by the region server + * @param store the store being scanned + * @param scan the Scan specification + * @param targetCols columns to be used in the scanner + * @param s if not null the base scanner from previous RegionObserver in the chain + * @return a KeyValueScanner instance to use or null to use the default implementation + * @throws IOException if an error occurred on the coprocessor + */ + KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final Scan scan, final NavigableSet<byte[]> targetCols, + final KeyValueScanner s) throws IOException; + + /** * Called after the client opens a new scanner. * <p> * Call CoprocessorEnvironment#complete to skip any subsequent chained * coprocessors
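
For anyone trying the new hooks out: below is a minimal sketch of a custom scan-policy observer built only against the APIs this patch adds (the public StoreScanner constructors, the Store.ScanInfo convenience constructor, Store.getScanInfo(), and the three pre*ScannerOpen hooks). The class name and the fixed TTL constant are made up for illustration; it follows the same pattern as NoOpScanPolicyObserver, just swapping in a ScanInfo with an overridden TTL.

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

/** Enforces a fixed one-hour TTL on every store; the value is illustrative. */
public class FixedTtlScanPolicyObserver extends BaseRegionObserver {
  private static final long FIXED_TTL_MS = 60 * 60 * 1000L;

  // build a ScanInfo identical to the store's, except for the TTL
  private Store.ScanInfo withFixedTtl(Store store) {
    Store.ScanInfo oldSI = store.getScanInfo();
    return new Store.ScanInfo(store.getFamily(), FIXED_TTL_MS,
        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
  }

  @Override
  public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getScanInfo().getMaxVersions());
    // flushes must retain deletes, hence they are treated as minor compactions
    return new StoreScanner(store, withFixedTtl(store), scan,
        Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
        store.getHRegion().getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
  }

  @Override
  public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
      long earliestPutTs, InternalScanner s) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getScanInfo().getMaxVersions());
    return new StoreScanner(store, withFixedTtl(store), scan, scanners, scanType,
        store.getHRegion().getSmallestReadPoint(), earliestPutTs);
  }

  @Override
  public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s)
      throws IOException {
    // user scans go through the new 4-arg public StoreScanner constructor
    return new StoreScanner(store, withFixedTtl(store), scan, targetCols);
  }
}

Wire it up the same way the tests in this patch do, e.g. conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, FixedTtlScanPolicyObserver.class.getName()) before starting the cluster; because all three hooks are overridden, the override applies consistently to flushes, compactions, and user scans.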