diff --git hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 1180249..8385bd9 100644 --- hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -34,10 +34,9 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; @@ -202,8 +201,10 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver { } Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + return new StoreScanner(store, scanInfo, scan, + Collections.singletonList(memstoreScanner), + ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP, true); } @Override @@ -219,7 +220,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver { Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); return new StoreScanner(store, scanInfo, scan, scanners, scanType, - store.getSmallestReadPoint(), earliestPutTs); + store.getSmallestReadPoint(), earliestPutTs, false); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index efd250b..c8c0f57 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -96,7 +94,7 @@ abstract class StoreFlusher { scan.setMaxVersions(store.getScanInfo().getMaxVersions()); scanner = new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, - smallestReadPoint, HConstants.OLDEST_TIMESTAMP); + smallestReadPoint, HConstants.OLDEST_TIMESTAMP, true); } assert scanner != null; if (store.getCoprocessorHost() != null) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index f53e854..b3eb8bc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -198,8 +198,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, List scanners, ScanType scanType, - long smallestReadPoint, long earliestPutTs) throws IOException { - this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); + long smallestReadPoint, long earliestPutTs, boolean scannerForFlush) throws IOException { + this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null, + scannerForFlush); } /** @@ -215,14 +216,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, List scanners, long smallestReadPoint, long earliestPutTs, - byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { + byte[] dropDeletesFromRow, byte[] dropDeletesToRow, boolean scannerForFlush) + throws IOException { this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, - earliestPutTs, dropDeletesFromRow, dropDeletesToRow); + earliestPutTs, dropDeletesFromRow, dropDeletesToRow, scannerForFlush); } private StoreScanner(Store store, ScanInfo scanInfo, Scan scan, List scanners, ScanType scanType, long smallestReadPoint, - long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { + long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow, + boolean scannerForFlush) throws IOException { this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(), ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); if (dropDeletesFromRow == null) { @@ -232,8 +235,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow); } - - this.store.addChangedReaderObserver(this); + if (!scannerForFlush) { + this.store.addChangedReaderObserver(this); + } // Filter the list of scanners using Bloom filters, time range, TTL, etc. scanners = selectScannersFrom(scanners); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 9e792c4..1b86b11 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -255,7 +255,7 @@ public abstract class Compactor { Scan scan = new Scan(); scan.setMaxVersions(store.getFamily().getMaxVersions()); return new StoreScanner(store, store.getScanInfo(), scan, scanners, - scanType, smallestReadPoint, earliestPutTs); + scanType, smallestReadPoint, earliestPutTs, false); } /** @@ -273,6 +273,6 @@ public abstract class Compactor { Scan scan = new Scan(); scan.setMaxVersions(store.getFamily().getMaxVersions()); return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint, - earliestPutTs, dropDeletesFromRow, dropDeletesToRow); + earliestPutTs, dropDeletesFromRow, dropDeletesToRow, false); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 376c38b..65a5a46 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -120,7 +120,7 @@ public class TestRegionObserverScannerOpenHook { scan.setFilter(new NoDataFilter()); return new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, - store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP, true); } } @@ -137,7 +137,7 @@ public class TestRegionObserverScannerOpenHook { scan.setFilter(new NoDataFilter()); return new StoreScanner(store, store.getScanInfo(), scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + HConstants.OLDEST_TIMESTAMP, false); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java index 2eef283..a6e42f8 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java @@ -380,7 +380,7 @@ public class HFileReadWriteTest { // Include deletes scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners, - ScanType.COMPACT_DROP_DELETES, Long.MIN_VALUE, Long.MIN_VALUE); + ScanType.COMPACT_DROP_DELETES, Long.MIN_VALUE, Long.MIN_VALUE, false); ArrayList kvs = new ArrayList(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index 36493cd..a037c51 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -50,7 +50,8 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver { Scan scan = new Scan(); scan.setMaxVersions(oldSI.getMaxVersions()); return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP, + true); } /** @@ -67,7 +68,7 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver { Scan scan = new Scan(); scan.setMaxVersions(oldSI.getMaxVersions()); return new StoreScanner(store, scanInfo, scan, scanners, scanType, - store.getSmallestReadPoint(), earliestPutTs); + store.getSmallestReadPoint(), earliestPutTs, false); } @Override diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index e4e3244..9e73e9c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -33,7 +33,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -41,13 +40,14 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -55,16 +55,15 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; - import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -255,7 +254,7 @@ public class TestCoprocessorScanPolicy { scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + HConstants.OLDEST_TIMESTAMP, true); } @Override @@ -274,7 +273,7 @@ public class TestCoprocessorScanPolicy { Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, scanners, scanType, - store.getSmallestReadPoint(), earliestPutTs); + store.getSmallestReadPoint(), earliestPutTs, false); } @Override