diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index fbdf7b2..3eddcee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.CountDownLatch; @@ -133,7 +134,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Indicates whether there was flush during the course of the scan private volatile boolean flushed = false; // generally we get one file from a flush - private final List flushedStoreFiles = new ArrayList(1); + //private final List flushedStoreFiles = new ArrayList(1); + + private final List flushedstoreFileScanners = + new ArrayList(1); // generally we get one memstroe scanner from a flush private final List memStoreScannersAfterFlush = new ArrayList<>(1); // The current list of scanners @@ -463,6 +467,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.closing = true; clearAndClose(scannersForDelayedClose); clearAndClose(memStoreScannersAfterFlush); + // clear them at any case. In case scanner.next() was never called + // and there were some lease expiry we need to close all the scanners + // on the flushed files which are open + clearAndClose(flushedstoreFileScanners); // Under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); @@ -833,7 +841,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner flushLock.lock(); try { flushed = true; - flushedStoreFiles.addAll(sfs); + final boolean isCompaction = false; + boolean usePread = get || scanUsePread; + // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner + // calls next(). So its better we create scanners here rather than next() call. Ensure + // these scanners are properly closed() whether or not the scan is completed successfully + List scanners = store.getScanners(sfs, cacheBlocks, get, usePread, + isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false); + flushedstoreFileScanners.addAll(scanners); + //flushedStoreFiles.addAll(sfs); if (!CollectionUtils.isEmpty(memStoreScanners)) { clearAndClose(memStoreScannersAfterFlush); memStoreScannersAfterFlush.addAll(memStoreScanners); @@ -901,13 +917,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List scanners = null; flushLock.lock(); try { - List allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size()); - allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false)); + List allScanners = + new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size()); + allScanners.addAll(flushedstoreFileScanners); allScanners.addAll(memStoreScannersAfterFlush); scanners = selectScannersFrom(allScanners); // Clear the current set of flushed store files so that they don't get added again - flushedStoreFiles.clear(); + flushedstoreFileScanners.clear(); memStoreScannersAfterFlush.clear(); } finally { flushLock.unlock(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java index f3d6575..87b7ad9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java @@ -336,6 +336,112 @@ public class TestCompactedHFilesDischarger { assertTrue(compactedfiles.size() == 0); } + @Test + public void test() throws Exception { + // Create the cleaner object + CompactedHFilesDischarger cleaner = + new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); + // Add some data to the region and do some flushes + for (int i = 1; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + startScannerThreads(); + // flush them + region.flush(true); + for (int i = 11; i < 20; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + for (int i = 21; i < 30; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + region.put(p); + } + // flush them + region.flush(true); + + Store store = region.getStore(fam); + assertEquals(3, store.getStorefilesCount()); + + Collection storefiles = store.getStorefiles(); + Collection compactedfiles = + ((HStore)store).getStoreEngine().getStoreFileManager().getCompactedfiles(); + // None of the files should be in compacted state. + for (StoreFile file : storefiles) { + assertFalse(file.isCompactedAway()); + } + // Do compaction + region.compact(true); + + storefiles = store.getStorefiles(); + int usedReaderCount = 0; + int unusedReaderCount = 0; + for (StoreFile file : storefiles) { + if (file.getRefCount() == 0) { + unusedReaderCount++; + } + } + compactedfiles = ((HStore)store).getStoreEngine().getStoreFileManager().getCompactedfiles(); + for(StoreFile file : compactedfiles) { + //assertEquals("Refcount should be 3", 3, ((HStoreFile) file).getRefCount()); + usedReaderCount++; + } + // The newly compacted file will not be used by any scanner + assertEquals("unused reader count should be 1", 1, unusedReaderCount); + assertEquals("used reader count should be 3", 3, usedReaderCount); + // now run the cleaner + cleaner.chore(); + countDown(); + // No change in the number of store files as none of the compacted files could be cleaned up + assertEquals(1, store.getStorefilesCount()); + // assertEquals(3, + // ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size()); + while (scanCompletedCounter.get() != 3) { + Thread.sleep(100); + } + // reset + latch = new CountDownLatch(3); + scanCompletedCounter.set(0); + counter.set(0); + // Try creating a new scanner and it should use only the new file created after compaction + startScannerThreads(); + storefiles = store.getStorefiles(); + usedReaderCount = 0; + unusedReaderCount = 0; + for (StoreFile file : storefiles) { + if (file.getRefCount() == 3) { + usedReaderCount++; + } + } + compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); + for (StoreFile file : compactedfiles) { + assertEquals("Refcount should be 0", 0, file.getRefCount()); + unusedReaderCount++; + } + // Though there are files we are not using them for reads + //assertEquals("unused reader count should be 3", 3, unusedReaderCount); + assertEquals("used reader count should be 1", 1, usedReaderCount); + countDown(); + while (scanCompletedCounter.get() != 3) { + Thread.sleep(100); + } + // Run the cleaner again + cleaner.chore(); + // Now the cleaner should be able to clear it up because there are no active readers + assertEquals(1, store.getStorefilesCount()); + storefiles = store.getStorefiles(); + for (StoreFile file : storefiles) { + // Should not be in compacted state + assertFalse(file.isCompactedAway()); + } + compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); + assertTrue(compactedfiles.isEmpty()); + } protected void countDown() { // count down 3 times latch.countDown(); @@ -369,7 +475,7 @@ public class TestCompactedHFilesDischarger { try { initiateScan(region); } catch (IOException e) { - // do nothing + e.printStackTrace(); } }