diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c3bf2a8..10fc6ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -913,6 +913,8 @@ ImmutableCollection result = storeEngine.getStoreFileManager().clearFiles(); Collection compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles(); + // Close the changedReadersObservers + this.closeChangedReadersObservers(); // clear the compacted files if (CollectionUtils.isNotEmpty(compactedfiles)) { removeCompactedfiles(compactedfiles); @@ -1182,6 +1184,17 @@ } /** + * close all the observers in order to reduce the refCount of the target file. + */ + private void closeChangedReadersObservers() { + for (ChangedReadersObserver o : changedReaderObservers) { + if (StoreScanner.class.isAssignableFrom(o.getClass())) { + ((StoreScanner)o).close(); + } + } + } + + /** * Get all scanners with no filtering based on TTL (that happens further down the line). * @param cacheBlocks cache the blocks or not * @param usePread true to use pread, false if not @@ -1317,6 +1330,11 @@ this.changedReaderObservers.remove(o); } + @VisibleForTesting + Set getChangedReaderObservers() { + return this.changedReaderObservers; + } + ////////////////////////////////////////////////////////////////////////////// // Compaction ////////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index ecd980a..cbd8b3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -161,7 +161,7 @@ @Override public ImmutableCollection clearFiles() { ImmutableCollection result = state.allFilesCached; - this.state = new State(); + state.allFilesCached = ImmutableList.of(); this.fileStarts.clear(); this.fileEnds.clear(); return result; @@ -170,7 +170,7 @@ @Override public ImmutableCollection clearCompactedFiles() { ImmutableCollection result = state.allCompactedFilesCached; - this.state = new State(); + state.allCompactedFilesCached = ImmutableList.of(); return result; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 8dadd9b..eecd123 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -501,6 +501,25 @@ assertEquals(1, result.size()); } + @Test + public void testChangedReadersObserverClosed() throws IOException { + init(this.name.getMethodName()); + // Write a store file. + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); + flush(1); + HStoreFile file = this.store.getStorefiles().iterator().next(); + this.store.getScanner(new Scan(), null, 0); + assertEquals(file.getRefCount(), 1); + this.store.getScanner(new Scan(), null, 0); + assertEquals(file.getRefCount(), 2); + this.store.getScanner(new Scan(), null, 0); + assertEquals(file.getRefCount(), 3); + this.store.close(); + assertFalse(file.isReferencedInReads()); + assertEquals(0, this.store.getChangedReaderObservers().size()); + } + /** * Getting data from memstore only * @throws IOException