Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1519556) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HRegionInfo; @@ -38,6 +40,10 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -46,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -445,4 +452,18 @@ List> familyPaths, boolean hasLoaded) throws IOException { return hasLoaded; } + + @Override + public Reader preStoreFileReaderOpen(ObserverContext ctx, + FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig 
cacheConf, + DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException { + return reader; + } + + @Override + public Reader postStoreFileReaderOpen(ObserverContext ctx, + FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, + DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException { + return reader; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1519556) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -21,6 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HRegionInfo; @@ -36,6 +38,10 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -981,4 +987,47 @@ */ boolean postBulkLoadHFile(final ObserverContext ctx, List> familyPaths, boolean hasLoaded) throws IOException; + + /** + * Called before creation of Reader for a store file. 
+ * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * effect in this hook. + * + * @param ctx the environment provided by the region server + * @param fs filesystem to read from + * @param p path to the file + * @param in {@link FSDataInputStreamWrapper} + * @param size Full size of the file + * @param cacheConf + * @param preferredEncodingInCache + * @param r original reference file. This will not be null only when reading a split file. + * @param reader the base reader, if not {@code null}, from previous RegionObserver in the chain + * @return a Reader instance to use instead of the base reader if overriding + * default behavior, null otherwise + * @throws IOException + */ + StoreFile.Reader preStoreFileReaderOpen(final ObserverContext ctx, + final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size, + final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache, + final Reference r, StoreFile.Reader reader) throws IOException; + + /** + * Called after the creation of Reader for a store file. + * + * @param ctx the environment provided by the region server + * @param fs filesystem to read from + * @param p path to the file + * @param in {@link FSDataInputStreamWrapper} + * @param size Full size of the file + * @param cacheConf + * @param preferredEncodingInCache + * @param r original reference file. This will not be null only when reading a split file. 
+ * @param reader the base reader instance + * @return The reader to use + * @throws IOException + */ + StoreFile.Reader postStoreFileReaderOpen(final ObserverContext ctx, + final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size, + final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache, + final Reference r, StoreFile.Reader reader) throws IOException; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1519556) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -474,7 +474,9 @@ } private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException { - StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf, + StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p); + info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); + StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf, this.family.getBloomFilterType(), encoder); storeFile.createReader(); return storeFile; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1519556) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; @@ -56,7 
+57,11 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -1598,4 +1603,71 @@ return hasLoaded; } + /** + * @param fs filesystem to read from + * @param p path to the file + * @param in {@link FSDataInputStreamWrapper} + * @param size Full size of the file + * @param cacheConf + * @param preferredEncodingInCache + * @param r original reference file. This will not be null only when reading a split file. 
+ * @return a Reader instance to use instead of the base reader if overriding + * default behavior, null otherwise + * @throws IOException + */ + public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p, + final FSDataInputStreamWrapper in, long size, final CacheConfig cacheConf, + final DataBlockEncoding preferredEncodingInCache, final Reference r) throws IOException { + StoreFile.Reader reader = null; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + reader = ((RegionObserver) env.getInstance()).preStoreFileReaderOpen(ctx, fs, p, in, + size, cacheConf, preferredEncodingInCache, r, reader); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return reader; + } + + /** + * @param fs filesystem to read from + * @param p path to the file + * @param in {@link FSDataInputStreamWrapper} + * @param size Full size of the file + * @param cacheConf + * @param preferredEncodingInCache + * @param r original reference file. This will not be null only when reading a split file. 
+ * @param reader the base reader instance + * @return The reader to use + * @throws IOException + */ + public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p, + final FSDataInputStreamWrapper in, long size, final CacheConfig cacheConf, + final DataBlockEncoding preferredEncodingInCache, final Reference r, StoreFile.Reader reader) + throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + reader = ((RegionObserver) env.getInstance()).postStoreFileReaderOpen(ctx, fs, p, in, + size, cacheConf, preferredEncodingInCache, r, reader); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return reader; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (revision 1519556) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (working copy) @@ -79,6 +79,8 @@ // FileSystem information for the file. private final FileStatus fileStatus; + private RegionCoprocessorHost coprocessorHost; + /** * Create a Store File Info * @param conf the {@link Configuration} to use @@ -126,6 +128,14 @@ } } + /** + * Sets the region coprocessor env. + * @param coprocessorHost + */ + public void setRegionCoprocessorHost(RegionCoprocessorHost coprocessorHost) { + this.coprocessorHost = coprocessorHost; + } + /* * @return the Reference object associated to this StoreFileInfo. * null if the StoreFile is not a reference. 
@@ -182,12 +192,27 @@ long length = status.getLen(); if (this.reference != null) { hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status); - return new HalfStoreFileReader( - fs, this.getPath(), in, length, cacheConf, reference, dataBlockEncoding); } else { hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length); - return new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding); } + StoreFile.Reader reader = null; + if (this.coprocessorHost != null) { + reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(), in, length, + cacheConf, dataBlockEncoding, reference); + } + if (reader == null) { + if (this.reference != null) { + reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference, + dataBlockEncoding); + } else { + reader = new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding); + } + } + if (this.coprocessorHost != null) { + reader = this.coprocessorHost.postStoreFileReaderOpen(fs, this.getPath(), in, length, + cacheConf, dataBlockEncoding, reference, reader); + } + return reader; } /** Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (revision 1519556) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (working copy) @@ -33,6 +33,8 @@ import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -45,6 +47,10 @@ import org.apache.hadoop.hbase.client.Result; import 
org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -54,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -103,6 +110,8 @@ final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0); final AtomicInteger ctPreBatchMutate = new AtomicInteger(0); final AtomicInteger ctPostBatchMutate = new AtomicInteger(0); + final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0); + final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0); @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -502,6 +511,22 @@ return hasLoaded; } + @Override + public Reader preStoreFileReaderOpen(ObserverContext ctx, + FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, + DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException { + ctPreStoreFileReaderOpen.incrementAndGet(); + return null; + } + + @Override + public Reader postStoreFileReaderOpen(ObserverContext ctx, + FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, + DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException { + ctPostStoreFileReaderOpen.incrementAndGet(); + return reader; + } + public boolean 
hadPreGet() { return ctPreGet.get() > 0; } @@ -666,4 +691,8 @@ public int getCtPostIncrement() { return ctPostIncrement.get(); } + + public boolean wasStoreFileReaderOpenCalled() { + return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0; + } }