Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java	(revision 1366128)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java	(working copy)
@@ -26,6 +26,7 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Arrays;
@@ -33,6 +34,7 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -42,9 +44,13 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -63,11 +69,13 @@
   boolean hadPreClose;
   boolean hadPostClose;
   boolean hadPreFlush;
+  boolean hadPreFlushScanner;
   boolean hadPostFlush;
   boolean hadPreSplit;
   boolean hadPostSplit;
   boolean hadPreCompactSelect;
   boolean hadPostCompactSelect;
+  boolean hadPreCompactScanner;
   boolean hadPreCompact;
   boolean hadPostCompact;
   boolean hadPreGet = false;
@@ -125,6 +133,13 @@
   }
 
   @Override
+  public InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, KeyValueScanner memstoreScanner) throws IOException {
+    hadPreFlushScanner = true;
+    return null;
+  }
+
+  @Override
   public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) {
     hadPostFlush = true;
   }
@@ -167,6 +182,14 @@
   }
 
   @Override
+  public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs)
+      throws IOException {
+    hadPreCompactScanner = true;
+    return null;
+  }
+
+  @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
       StoreFile resultFile) {
     hadPostCompact = true;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(revision 1366128)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(working copy)
@@ -90,12 +90,12 @@
 @Category(LargeTests.class)
 public class TestFromClientSide {
   final Log LOG = LogFactory.getLog(getClass());
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static byte [] ROW = Bytes.toBytes("testRow");
   private static byte [] FAMILY = Bytes.toBytes("testFamily");
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");
-  private static int SLAVES = 3;
+  protected static int SLAVES = 3;
 
   /**
    * @throws java.lang.Exception
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java	(working copy)
@@ -0,0 +1,25 @@
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.regionserver.ScanPolicyCoprocessor;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test all client operations with a coprocessor that
+ * just implements the default flush/compact/scan policy.
+ */
+@Category(LargeTests.class)
+public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        MultiRowMutationEndpoint.class.getName(), ScanPolicyCoprocessor.class.getName());
+    // We need more than one region server in this test
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+}
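TestFromClientSideWithCoprocessor above loads the observers cluster-wide through the configuration before the mini-cluster starts. For comparison, the coprocessor framework also supports per-table registration through the table descriptor; the following is only an illustrative sketch, not part of this patch (the class, table, and family names are invented, and it assumes ScanPolicyCoprocessor is on the region servers' classpath):

package org.apache.hadoop.hbase.client;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.ScanPolicyCoprocessor;

public class ScanPolicyTableSetup {
  // Creates a table that loads ScanPolicyCoprocessor for its regions only,
  // instead of for every region in the cluster.
  public static void createTableWithPolicy(Configuration conf) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor htd = new HTableDescriptor("testScanPolicyTable");
    htd.addFamily(new HColumnDescriptor("testFamily"));
    // Per-table coprocessor attribute; the class must still be deployed
    // on the region servers.
    htd.addCoprocessor(ScanPolicyCoprocessor.class.getName());
    admin.createTable(htd);
  }
}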
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(revision 1366128)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(working copy)
@@ -61,7 +61,6 @@
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
 import org.apache.hadoop.hbase.util.MD5Hash;
@@ -408,7 +407,7 @@
     Scan scan = new Scan();
 
     // Include deletes
-    scanner = new StoreScanner(store, scan, scanners,
+    scanner = new StoreScanner(store, store.scanInfo, scan, scanners,
         ScanType.MAJOR_COMPACT, Long.MIN_VALUE, Long.MIN_VALUE);
 
     ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java	(working copy)
@@ -0,0 +1,19 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Make sure all compaction tests still pass with the preFlush and preCompact
+ * hooks overridden to implement the default behavior.
+ */
+@Category(MediumTests.class)
+public class TestCompactionWithCoprocessor extends TestCompaction {
+  /** constructor */
+  public TestCompactionWithCoprocessor() throws Exception {
+    super();
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        ScanPolicyCoprocessor.class.getName());
+  }
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java	(revision 1366128)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java	(working copy)
@@ -38,7 +38,6 @@
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -559,7 +558,7 @@
         KeyValue.COMPARATOR);
     StoreScanner scanner = new StoreScanner(scan, scanInfo,
-        StoreScanner.ScanType.MAJOR_COMPACT, null, scanners,
+        ScanType.MAJOR_COMPACT, null, scanners,
         HConstants.OLDEST_TIMESTAMP);
     List<KeyValue> results = new ArrayList<KeyValue>();
 
     results = new ArrayList<KeyValue>();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/ScanPolicyCoprocessor.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/ScanPolicyCoprocessor.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/ScanPolicyCoprocessor.java	(working copy)
@@ -0,0 +1,53 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+public class ScanPolicyCoprocessor extends BaseRegionObserver {
+  /**
+   * Reimplement the default behavior.
+   */
+  @Override
+  public InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, KeyValueScanner memstoreScanner) throws IOException {
+    Store.ScanInfo oldSI = store.getScanInfo();
+    Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(),
+        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+    Scan scan = new Scan();
+    scan.setMaxVersions(oldSI.getMaxVersions());
+    return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+        ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
+        HConstants.OLDEST_TIMESTAMP);
+  }
+
+  /**
+   * Reimplement the default behavior.
+   */
+  @Override
+  public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+      long earliestPutTs) throws IOException {
+    // this demonstrates how to override the scanner's default behavior
+    Store.ScanInfo oldSI = store.getScanInfo();
+    Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(),
+        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+    Scan scan = new Scan();
+    scan.setMaxVersions(oldSI.getMaxVersions());
+    return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+        store.getHRegion().getSmallestReadPoint(), earliestPutTs);
+  }
+
+  @Override
+  public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, final Scan scan, final NavigableSet<byte[]> targetCols) throws IOException {
+    return new StoreScanner(store, store.getScanInfo(), scan, targetCols);
+  }
+}
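ScanPolicyCoprocessor intentionally mirrors the default flush/compact/scan behavior, which is what makes it usable as a drop-in test policy. The hooks become interesting once the rebuilt ScanInfo diverges from the column family's settings. The sketch below is not part of this patch and only illustrates the pattern (the class name and EXTRA_TTL_MS are invented); it uses the byte[]-based ScanInfo constructor added to Store.java later in this patch to stretch the family TTL during compactions:

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

/**
 * Illustration only: same shape as ScanPolicyCoprocessor above, but the
 * rebuilt ScanInfo deviates from the column family's TTL.
 */
public class LongerTtlScanPolicy extends BaseRegionObserver {
  private static final long EXTRA_TTL_MS = 24L * 60 * 60 * 1000; // hypothetical extra day

  @Override
  public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
      long earliestPutTs) throws IOException {
    Store.ScanInfo oldSI = store.getScanInfo();
    // The byte[]-based ScanInfo constructor lets the policy override
    // individual settings while keeping the rest from the family.
    Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily().getName(),
        oldSI.getMinVersions(), oldSI.getMaxVersions(), oldSI.getTtl() + EXTRA_TTL_MS,
        store.getFamily().getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(),
        oldSI.getComparator());
    Scan scan = new Scan();
    scan.setMaxVersions(oldSI.getMaxVersions());
    return new StoreScanner(store, scanInfo, scan, scanners, scanType,
        store.getHRegion().getSmallestReadPoint(), earliestPutTs);
  }
}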
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java	(revision 1366128)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java	(working copy)
@@ -36,7 +36,6 @@
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -3469,7 +3469,7 @@
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        StoreScanner scanner = store.getScanner(scan, entry.getValue());
+        KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
         scanners.add(scanner);
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java	(working copy)
@@ -304,6 +304,30 @@
   }
 
   /**
+   * See {@link RegionObserver#preCompact(ObserverContext, Store, List, ScanType, long)}
+   */
+  public InternalScanner preCompact(Store store, List<? extends KeyValueScanner> scanners,
+      ScanType scanType, long earliestPutTs) throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    InternalScanner scanner = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanners,
+              scanType, earliestPutTs);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return scanner;
+  }
+
+  /**
    * Called prior to selecting the {@link StoreFile}s for compaction from
    * the list of currently available candidates.
    * @param store The store where compaction is being requested
@@ -430,6 +454,28 @@
   }
 
   /**
+   * See {@link RegionObserver#preFlush(ObserverContext, Store, KeyValueScanner)}
+   */
+  public InternalScanner preFlush(Store store, KeyValueScanner memstoreScanner) throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    InternalScanner scanner = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          scanner = ((RegionObserver) env.getInstance()).preFlush(ctx, store, memstoreScanner);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return scanner;
+  }
+
+  /**
    * Invoked after a memstore flush
    * @throws IOException
    */
@@ -1089,6 +1135,29 @@
   }
 
   /**
+   * See {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet)}
+   */
+  public KeyValueScanner preStoreScannerOpen(Store store, Scan scan,
+      final NavigableSet<byte[]> targetCols) throws IOException {
+    KeyValueScanner s = null;
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan, targetCols);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return s;
+  }
+
+  /**
    * @param scan the Scan specification
    * @param s the scanner
    * @return the scanner instance to use
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java	(working copy)
@@ -34,8 +34,6 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
-
 /**
  * A query matcher that is specifically designed for the scan case.
 */
@@ -138,7 +136,7 @@
    * based on TTL
    */
   public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
-      NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
+      NavigableSet<byte[]> columns, ScanType scanType,
       long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) {
     this.tr = scan.getTimeRange();
     this.rowComparator = scanInfo.getComparator().getRawComparator();
@@ -185,7 +183,7 @@
    */
   ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
       NavigableSet<byte[]> columns, long oldestUnexpiredTS) {
-    this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN,
+    this(scan, scanInfo, columns, ScanType.USER_SCAN,
         Long.MAX_VALUE, /* max Readpoint to track versions */
         HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS);
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java	(working copy)
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
@@ -127,20 +126,29 @@
     try {
       InternalScanner scanner = null;
       try {
-        Scan scan = new Scan();
-        scan.setMaxVersions(store.getFamily().getMaxVersions());
-        /* Include deletes, unless we are doing a major compaction */
-        scanner = new StoreScanner(store, scan, scanners,
-            majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
-            smallestReadPoint, earliestPutTs);
         if (store.getHRegion().getCoprocessorHost() != null) {
-          InternalScanner cpScanner =
-            store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
-          // NULL scanner returned from coprocessor hooks means skip normal processing
-          if (cpScanner == null) {
-            return null;
+          scanner = store
+              .getHRegion()
+              .getCoprocessorHost()
+              .preCompact(store, scanners,
+                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+        }
+        if (scanner == null) {
+          Scan scan = new Scan();
+          scan.setMaxVersions(store.getFamily().getMaxVersions());
+          /* Include deletes, unless we are doing a major compaction */
+          scanner = new StoreScanner(store, store.scanInfo, scan, scanners,
+              majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+              smallestReadPoint, earliestPutTs);
+          if (store.getHRegion().getCoprocessorHost() != null) {
+            InternalScanner cpScanner =
+              store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
+            // NULL scanner returned from coprocessor hooks means skip normal processing
+            if (cpScanner == null) {
+              return null;
+            }
+            scanner = cpScanner;
           }
-          scanner = cpScanner;
         }
 
         int bytesWritten = 0;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(working copy)
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -43,7 +44,7 @@
  * into List<KeyValue> for a single row.
 */
 @InterfaceAudience.Private
-class StoreScanner extends NonLazyKeyValueScanner
+public class StoreScanner extends NonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
   private Store store;
@@ -106,16 +107,16 @@
    * @param columns which columns we are scanning
    * @throws IOException
    */
-  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
       throws IOException {
-    this(store, scan.getCacheBlocks(), scan, columns, store.scanInfo.getTtl(),
-        store.scanInfo.getMinVersions());
+    this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
+        scanInfo.getMinVersions());
     initializeMetricNames();
     if (columns != null && scan.isRaw()) {
       throw new DoNotRetryIOException(
           "Cannot specify any column for a raw scan");
     }
-    matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
+    matcher = new ScanQueryMatcher(scan, scanInfo, columns,
         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
         oldestUnexpiredTS);
@@ -158,11 +159,11 @@
    * @param smallestReadPoint the readPoint that we should use for tracking
    *          versions
    */
-  StoreScanner(Store store, Scan scan,
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
       List<? extends KeyValueScanner> scanners, ScanType scanType,
       long smallestReadPoint, long earliestPutTs) throws IOException {
-    this(store, false, scan, null, store.scanInfo.getTtl(),
-        store.scanInfo.getMinVersions());
+    this(store, false, scan, null, scanInfo.getTtl(),
+        scanInfo.getMinVersions());
     initializeMetricNames();
     matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
         smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
@@ -181,7 +182,7 @@
 
   /** Constructor for testing. */
   StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
-      StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
+      ScanType scanType, final NavigableSet<byte[]> columns,
       final List<KeyValueScanner> scanners) throws IOException {
     this(scan, scanInfo, scanType, columns, scanners,
         HConstants.LATEST_TIMESTAMP);
@@ -189,7 +190,7 @@
 
   // Constructor for testing.
   StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
-      StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
+      ScanType scanType, final NavigableSet<byte[]> columns,
       final List<KeyValueScanner> scanners, long earliestPutTs)
       throws IOException {
     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
@@ -598,14 +599,5 @@
 
   static void enableLazySeekGlobally(boolean enable) {
     lazySeekEnabledGlobally = enable;
   }
-
-  /**
-   * Enum to distinguish general scan types.
-   */
-  public static enum ScanType {
-    MAJOR_COMPACT,
-    MINOR_COMPACT,
-    USER_SCAN
-  }
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -63,7 +63,6 @@
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -212,9 +211,7 @@
         "ms in store " + this);
     // Why not just pass a HColumnDescriptor in here altogether? Even if have
     // to clone it?
-    scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
-        family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
-        timeToPurgeDeletes, this.comparator);
+    scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
 
     // By default, compact if storefile.count >= minFilesToCompact
@@ -733,10 +730,16 @@
       // Use a store scanner to find which rows to flush.
       // Note that we need to retain deletes, hence
       // treat this as a minor compaction.
-      InternalScanner scanner = new StoreScanner(this, scan, Collections
-          .singletonList(new CollectionBackedScanner(set, this.comparator)),
-          ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
-          HConstants.OLDEST_TIMESTAMP);
+      InternalScanner scanner = null;
+      KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
+      if (getHRegion().getCoprocessorHost() != null) {
+        scanner = getHRegion().getCoprocessorHost().preFlush(this, memstoreScanner);
+      }
+      if (scanner == null) {
+        scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(
+            new CollectionBackedScanner(set, this.comparator)), ScanType.MINOR_COMPACT,
+            this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+      }
       try {
         // TODO: We can fail in the below block before we complete adding this
         // flush to list of store files. Add cleanup of anything put on filesystem
@@ -1941,11 +1944,18 @@
    * are not in a compaction.
    * @throws IOException
    */
-  public StoreScanner getScanner(Scan scan,
+  public KeyValueScanner getScanner(Scan scan,
       final NavigableSet<byte[]> targetCols) throws IOException {
     lock.readLock().lock();
     try {
-      return new StoreScanner(this, scan, targetCols);
+      KeyValueScanner scanner = null;
+      if (getHRegion().getCoprocessorHost() != null) {
+        scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
+      }
+      if (scanner == null) {
+        scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
+      }
+      return scanner;
     } finally {
       lock.readLock().unlock();
     }
@@ -2065,7 +2075,7 @@
     return compactionSize > throttlePoint;
   }
 
-  HRegion getHRegion() {
+  public HRegion getHRegion() {
     return this.region;
   }
 
@@ -2210,6 +2220,10 @@
     return comparator;
   }
 
+  public ScanInfo getScanInfo() {
+    return scanInfo;
+  }
+
   /**
    * Immutable information for scans over a store.
    */
@@ -2227,6 +2241,17 @@
       + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
 
     /**
+     * @param family {@link HColumnDescriptor} describing the column family
+     * @param ttl Store's TTL (in ms)
+     * @param timeToPurgeDeletes duration in ms after which a delete marker can
+     *        be purged during a major compaction.
+     * @param comparator The store's comparator
+     */
+    public ScanInfo(HColumnDescriptor family, long ttl, long timeToPurgeDeletes, KVComparator comparator) {
+      this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
+          .getKeepDeletedCells(), timeToPurgeDeletes, comparator);
+    }
+    /**
      * @param family Name of this store's column family
     * @param minVersions Store's MIN_VERSIONS setting
     * @param maxVersions Store's VERSIONS setting
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java	(working copy)
@@ -0,0 +1,10 @@
+package org.apache.hadoop.hbase.regionserver;
+
+/**
+ * Enum to distinguish general scan types.
+ */
+public enum ScanType {
+  MAJOR_COMPACT,
+  MINOR_COMPACT,
+  USER_SCAN
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java	(working copy)
@@ -17,7 +17,7 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.util.List;
-import java.util.Map;
+import java.util.NavigableSet;
 
 import com.google.common.collect.ImmutableList;
 
@@ -37,9 +37,12 @@
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Pair;
@@ -75,6 +78,12 @@
       boolean abortRequested) { }
 
   @Override
+  public InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, KeyValueScanner memstoreScanner) throws IOException {
+    return null;
+  }
+
+  @Override
   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
   }
 
@@ -106,6 +115,13 @@
   }
 
   @Override
+  public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+      long earliestPutTs) throws IOException {
+    return null;
+  }
+
+  @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
       final StoreFile resultFile) throws IOException {
   }
@@ -240,6 +256,12 @@
       final Scan scan, final RegionScanner s) throws IOException {
     return s;
   }
+
+  @Override
+  public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, final Scan scan, final NavigableSet<byte[]> targetCols) throws IOException {
+    return null;
+  }
 
   @Override
   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
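BaseRegionObserver stubs each of the new hooks to return null, which tells the coprocessor host to fall through to the default scanner creation. That makes purely observational subclasses cheap. A hedged sketch, not part of this patch (the class name and counter are invented):

package org.apache.hadoop.hbase.coprocessor;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;

/**
 * Sketch: a read-only observer built on the new hook. Returning null keeps
 * the default StoreScanner, so this only records that a compaction ran.
 */
public class CompactionCountingObserver extends BaseRegionObserver {
  private final AtomicLong majorCompactions = new AtomicLong(); // hypothetical metric

  @Override
  public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
      long earliestPutTs) throws IOException {
    if (scanType == ScanType.MAJOR_COMPACT) {
      majorCompactions.incrementAndGet();
    }
    return null; // null means: use the default scanner
  }
}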
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java	(revision 1366128)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java	(working copy)
@@ -18,6 +18,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -35,9 +36,12 @@
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -65,9 +69,25 @@
   void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
 
   /**
+   * Called before a memstore is flushed to disk and prior to creating the scanner to read the
+   * memstore. To override or modify how a memstore is flushed,
+   * implementing classes can return a new scanner to provide the KeyValues to be
+   * stored into the new {@code StoreFile} or null to perform the default processing.
+   * @param c the environment provided by the region server
+   * @param store the store whose memstore is being flushed
+   * @param memstoreScanner the scanner for the memstore that is flushed
+   * @return the scanner to use during the flush. {@code null} if the default implementation
+   * is to be used.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final KeyValueScanner memstoreScanner) throws IOException;
+
+  /**
    * Called before the memstore is flushed to disk.
    * @param c the environment provided by the region server
    * @throws IOException if an error occurred on the coprocessor
+   * @deprecated use {@link #preFlush(ObserverContext, Store, KeyValueScanner)} instead
    */
   void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
 
@@ -123,11 +143,32 @@
    * @return the scanner to use during compaction.  Should not be {@code null}
    * unless the implementation is writing new store files on its own.
    * @throws IOException if an error occurred on the coprocessor
+   * @deprecated use {@link #preCompact(ObserverContext, Store, List, ScanType, long)} instead
    */
   InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Store store, final InternalScanner scanner) throws IOException;
 
   /**
+   * Called prior to writing the {@link StoreFile}s selected for compaction into
+   * a new {@code StoreFile} and prior to creating the scanner used to read the
+   * input files. To override or modify the compaction process,
+   * implementing classes can return a new scanner to provide the KeyValues to be
+   * stored into the new {@code StoreFile} or null to perform the default processing.
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param scanners the list of {@link StoreFileScanner}s to be read from
+   * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
+   * @param earliestPutTs timestamp of the earliest put that was found in any of the involved
+   * store files
+   * @return the scanner to use during compaction. {@code null} if the default implementation
+   * is to be used.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+      long earliestPutTs) throws IOException;
+
+  /**
    * Called after compaction has completed and the new store file has been
    * moved in to place.
   * @param c the environment provided by the region server
@@ -550,6 +591,21 @@
       throws IOException;
 
   /**
+   * Called before a store opens a new scanner.
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors.
+   * @param c the environment provided by the region server
+   * @param store the store being scanned
+   * @param scan the Scan specification
+   * @param targetCols columns to be used in the scanner
+   * @return a KeyValueScanner instance to use or {@code null} to use the default implementation
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, final Scan scan, final NavigableSet<byte[]> targetCols) throws IOException;
+
+  /**
    * Called after the client opens a new scanner.
    * <p>
    * Call CoprocessorEnvironment#complete to skip any subsequent chained