Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (date 1477661994000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (revision ) @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ShipperListener; @@ -294,7 +295,8 @@ try { /* Include deletes, unless we are doing a major compaction */ ScanType scanType = scannerFactory.getScanType(request); - scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners); + scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, + null, smallestReadPoint); if (scanner == null) { scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint); } @@ -346,19 +348,21 @@ * @param scanners File scanners for compaction files. * @return Scanner override by coprocessor; null if not overriding. */ + @Deprecated protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, ScanType scanType, long earliestPutTs, List scanners) throws IOException { - return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null); + return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null, + MultiVersionConcurrencyControl.NONE); } protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, final ScanType scanType, final long earliestPutTs, final List scanners, - User user) throws IOException { + User user, final long readPoint) throws IOException { if (store.getCoprocessorHost() == null) { return null; } return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, - earliestPutTs, request, user); + earliestPutTs, request, user, readPoint); } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (date 1477661994000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision ) @@ -286,10 +286,39 @@ * @return the scanner to use during compaction. {@code null} if the default implementation is to * be used. * @throws IOException if an error occurred on the coprocessor + * @deprecated Use {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, + * InternalScanner, CompactionRequest, long)} instead. */ + @Deprecated InternalScanner preCompactScannerOpen(final ObserverContext c, final Store store, List scanners, final ScanType scanType, final long earliestPutTs, final InternalScanner s, CompactionRequest request) + throws IOException; + + /** + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile} and prior to creating the scanner used to read the input files. To override + * or modify the compaction process, implementing classes can return a new scanner to provide the + * KeyValues to be stored into the new {@code StoreFile} or null to perform the default + * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * effect in this hook. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanners the list {@link org.apache.hadoop.hbase.regionserver.StoreFileScanner}s + * to be read from + * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction + * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store + * files + * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain + * @param request the requested compaction + * @param readPt the read point + * @return the scanner to use during compaction. {@code null} if the default implementation is to + * be used. + * @throws IOException if an error occurred on the coprocessor + */ + InternalScanner preCompactScannerOpen(final ObserverContext c, + final Store store, List scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s, CompactionRequest request, long readPt) throws IOException; /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (date 1477661994000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision ) @@ -531,18 +531,19 @@ /** * See - * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} + * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, + * InternalScanner, CompactionRequest, long)} */ public InternalScanner preCompactScannerOpen(final Store store, final List scanners, final ScanType scanType, final long earliestPutTs, - final CompactionRequest request, final User user) throws IOException { + final CompactionRequest request, final User user, final long readPoint) throws IOException { return execOperationWithResult(null, coprocessors.isEmpty() ? null : new RegionOperationWithResult(user) { @Override public void call(RegionObserver oserver, ObserverContext ctx) throws IOException { setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, - earliestPutTs, getResult(), request)); + earliestPutTs, getResult(), request, readPoint)); } }); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (date 1477661994000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision ) @@ -212,6 +212,13 @@ } @Override + public InternalScanner preCompactScannerOpen(ObserverContext c, + Store store, List scanners, ScanType scanType, long earliestPutTs, + InternalScanner s, CompactionRequest request, long readPt) throws IOException { + return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request); + } + + @Override public void postCompact(ObserverContext e, final Store store, final StoreFile resultFile) throws IOException { }