Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision f8a29b0f01cf433e858bdd73348716d9cc9e6f35) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (date 1513062598000) @@ -1207,12 +1207,19 @@ */ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { - // Increment the ref count - refCount.incrementAndGet(); + incrementRefCount(); return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); } + /** + * Increment the ref count associated with the reader when ever a scanner associated + * with the reader is opened or if the file has to be held long enough. + */ + void incrementRefCount() { + refCount.incrementAndGet(); + } + /** * Decrement the ref count associated with the reader when ever a scanner associated * with the reader is closed Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision f8a29b0f01cf433e858bdd73348716d9cc9e6f35) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (date 1513062598000) @@ -463,6 +463,16 @@ this.closing = true; clearAndClose(scannersForDelayedClose); clearAndClose(memStoreScannersAfterFlush); + // If the client had disappeared when we updated the flushed files in the scanner, we should + // reset the counter back to 0, so compaction discharger can take care of this. + if (!flushedStoreFiles.isEmpty()) { + for (StoreFile storeFile : flushedStoreFiles) { + // Since we already created scanners, we can drop the reader count we raised in observers. + if (storeFile.getReader() != null) { + storeFile.getReader().decrementRefCount(); + } + } + } // Under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); @@ -833,6 +843,10 @@ flushLock.lock(); try { flushed = true; + for (StoreFile storeFile : sfs) { + // Increment reader count so we don't accidentally let it off + storeFile.createReader().incrementRefCount(); + } flushedStoreFiles.addAll(sfs); if (!CollectionUtils.isEmpty(memStoreScanners)) { clearAndClose(memStoreScannersAfterFlush); @@ -906,6 +920,12 @@ isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false)); allScanners.addAll(memStoreScannersAfterFlush); scanners = selectScannersFrom(allScanners); + for (StoreFile storeFile : flushedStoreFiles) { + // Since we already created scanners, we can drop the reader count we raised in observers. + if (storeFile.getReader() != null) { + storeFile.getReader().decrementRefCount(); + } + } // Clear the current set of flushed store files so that they don't get added again flushedStoreFiles.clear(); memStoreScannersAfterFlush.clear(); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestStFileMissingScanner.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestStFileMissingScanner.java (date 1513062269000) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestStFileMissingScanner.java (date 1513062269000) @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; + +@Category(MediumTests.class) +public class TestStFileMissingScanner { + private static final Log LOG = LogFactory.getLog(TestStFileMissingScanner.class); + + protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte [] FAMILY = Bytes.toBytes("f"); + private static byte [] QUALIFIER = Bytes.toBytes("c"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_INTERVAL, 10 * 1000); + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testStoreFileMissing() throws Exception { + TableName TABLE = TableName.valueOf("testStoreFileMissing"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + + // Write 3 records and create 3 store files. + write(ht, "row1"); + admin.flush(TABLE); + write(ht, "row2"); + admin.flush(TABLE); + write(ht, "row3"); + admin.flush(TABLE); + + Scan scan = new Scan(); + scan.setCaching(1); + ResultScanner results = ht.getScanner(scan); + + // Read first item + assertEquals("row1", Bytes.toString(results.next().getRow())); + + // Create a new file in between scan nexts + write(ht, "row4"); + admin.flush(TABLE); + + // Compact the table + TEST_UTIL.getHBaseCluster().compact(TABLE, true); + + // Lets wait a few seconds more than compaction discharger frequency + Thread.sleep(TEST_UTIL.getConfiguration() + .getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_INTERVAL, 2 * 60 * 1000) + 5000); + + // This issues scan next + assertEquals("row2", Bytes.toString(results.next().getRow())); + + results.close(); + } + + private void write(HTable ht, String row1) throws IOException { + byte[] row = Bytes.toBytes(row1); + Put put = new Put(row); + put.addColumn(FAMILY, QUALIFIER, row); + ht.put(put); + ht.flushCommits(); + } +}