diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e0b4eb9..b7f351f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -157,6 +157,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner final List currentScanners = new ArrayList<>(); // flush update lock private final ReentrantLock flushLock = new ReentrantLock(); + // lock for closing. + private final ReentrantLock closeLock = new ReentrantLock(); protected final long readPt; private boolean topChanged = false; @@ -473,31 +475,36 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } private void close(boolean withDelayedScannersClose) { - if (this.closing) { - return; - } - if (withDelayedScannersClose) { - this.closing = true; - } - // For mob compaction, we do not have a store. - if (this.store != null) { - this.store.deleteChangedReaderObserver(this); - } - if (withDelayedScannersClose) { - clearAndClose(scannersForDelayedClose); - clearAndClose(memStoreScannersAfterFlush); - clearAndClose(flushedstoreFileScanners); - if (this.heap != null) { - this.heap.close(); - this.currentScanners.clear(); - this.heap = null; // CLOSED! + closeLock.lock(); + try { + if (this.closing) { + return; } - } else { - if (this.heap != null) { - this.scannersForDelayedClose.add(this.heap); - this.currentScanners.clear(); - this.heap = null; + if (withDelayedScannersClose) { + this.closing = true; + } + // For mob compaction, we do not have a store. 
+ if (this.store != null) { + this.store.deleteChangedReaderObserver(this); } + if (withDelayedScannersClose) { + clearAndClose(scannersForDelayedClose); + clearAndClose(memStoreScannersAfterFlush); + clearAndClose(flushedstoreFileScanners); + if (this.heap != null) { + this.heap.close(); + this.currentScanners.clear(); + this.heap = null; // CLOSED! + } + } else { + if (this.heap != null) { + this.scannersForDelayedClose.add(this.heap); + this.currentScanners.clear(); + this.heap = null; + } + } + } finally { + closeLock.unlock(); } } @@ -876,8 +883,20 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { return; } + boolean updateReaders = false; flushLock.lock(); try { + if (!closeLock.tryLock()) { + // no lock acquired. + LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders"); + return; + } + // lock acquired + updateReaders = true; + if (this.closing) { + LOG.debug("StoreScanner already closing. 
There is no need to updateReaders"); + return; + } flushed = true; final boolean isCompaction = false; boolean usePread = get || scanUsePread; @@ -896,6 +915,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } finally { flushLock.unlock(); + if (updateReaders) { + closeLock.unlock(); + } } // Let the next() call handle re-creating and seeking } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 687b780..ca805e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.CellUtil.createCell; import static org.apache.hadoop.hbase.KeyValueTestUtil.create; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY; import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -32,26 +35,45 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.OptionalInt; +import java.util.Random; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellUtil; import 
org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Rule; @@ -70,10 +92,17 @@ public class TestStoreScanner { HBaseClassTestRule.forClass(TestStoreScanner.class); private static final Logger LOG = LoggerFactory.getLogger(TestStoreScanner.class); + private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; @Rule public TestName name = new TestName(); private static final String CF_STR = "cf"; + private static HRegion region; private static final byte[] CF = Bytes.toBytes(CF_STR); static Configuration CONF = HBaseConfiguration.create(); + private static CacheConfig cacheConf; + private static 
FileSystem fs; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static String ROOT_DIR = + TEST_UTIL.getDataTestDir("TestHFile").toString(); private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); @@ -97,7 +126,20 @@ public class TestStoreScanner { private static final int CELL_GRID_BLOCK3_BOUNDARY = 11; private static final int CELL_GRID_BLOCK4_BOUNDARY = 15; private static final int CELL_GRID_BLOCK5_BOUNDARY = 19; - + private final static byte[] fam = Bytes.toBytes("cf_1"); + + @BeforeClass + public static void setUp() throws Exception { + CONF = TEST_UTIL.getConfiguration(); + cacheConf = new CacheConfig(CONF); + fs = TEST_UTIL.getTestFileSystem(); + TableName tableName = TableName.valueOf("test"); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(fam)); + HRegionInfo info = new HRegionInfo(tableName, null, null, false); + Path path = TEST_UTIL.getDataTestDir("test"); + region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd); + } /** * Five rows by four columns distinguished by column qualifier (column qualifier is one of the * four rows... ONE, TWO, etc.). Exceptions are a weird row after TWO; it is TWO_POINT_TWO. @@ -847,6 +889,172 @@ public class TestStoreScanner { } } + private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount, + int minAllocSize) { + Configuration that = HBaseConfiguration.create(CONF); + that.setInt(BUFFER_SIZE_KEY, bufSize); + that.setInt(MAX_BUFFER_COUNT_KEY, bufCount); + // All ByteBuffers will be allocated from the buffers. 
+ that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize); + return ByteBuffAllocator.create(that, reservoirEnabled); + } + + private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) { + // Fill the allocator with bufCount ByteBuffer + List buffs = new ArrayList<>(); + for (int i = 0; i < bufCount; i++) { + buffs.add(alloc.allocateOneBuffer()); + Assert.assertEquals(alloc.getQueueSize(), 0); + } + buffs.forEach(ByteBuff::release); + Assert.assertEquals(alloc.getQueueSize(), bufCount); + } + + private Path writeStoreFile() throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile"); + HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build(); + StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir) + .withComparator(CellComparatorImpl.COMPARATOR).withFileContext(meta).build(); + + final int rowLen = 32; + Random RNG = new Random(); + for (int i = 0; i < 1000; ++i) { + byte[] k = RandomKeyValueUtil.randomOrderedKey(RNG, i); + byte[] v = RandomKeyValueUtil.randomValue(RNG); + int cfLen = RNG.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, + k.length - rowLen - cfLen, RNG.nextLong(), generateKeyType(RNG), v, 0, v.length); + sfw.append(kv); + } + + sfw.close(); + return sfw.getPath(); + } + + public static KeyValue.Type generateKeyType(Random rand) { + if (rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { + throw new RuntimeException("Generated an invalid key type: " + keyType + ". 
" + + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + + private HStoreFile readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc) + throws Exception { + // Open the file reader with block cache disabled. + HStoreFile file = new HStoreFile(this.fs, storeFilePath, conf, cacheConf, BloomType.NONE, true); + return file; + } + + @Test + public void testScannerCloseAndUpdateReaders1() throws Exception { + testScannerCloseAndUpdateReaderInternal(true, false); + } + + @Test + public void testScannerCloseAndUpdateReaders2() throws Exception { + testScannerCloseAndUpdateReaderInternal(false, true); + } + + private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose) + throws IOException, InterruptedException { + int bufCount = 32; + // AllByteBuffers will be allocated from the buffers. + ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0); + fillByteBuffAllocator(alloc, bufCount); + // start write to store file. 
+ Path path = writeStoreFile(); + HStoreFile file = null; + List files = new ArrayList(); + try { + file = readStoreFile(path, CONF, alloc); + files.add(file); + } catch (Exception e) { + // fail test + assertTrue(false); + } + alloc.clean(); + scanFixture(kvs); + // scanners.add(storeFileScanner); + try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, + new Scan(), getCols("a", "d"), 100l)) { + Thread closeThread = new Thread() { + public void run() { + scan.close(awaitClose, true); + } + }; + closeThread.start(); + Thread updateThread = new Thread() { + public void run() { + try { + scan.updateReaders(awaitUpdate, files, Collections.emptyList()); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + updateThread.start(); + // complete both the threads + closeThread.join(); + // complete both the threads + updateThread.join(); + if (file.getReader() != null) { + // the fileReader is not null when the updateReaders has completed first. + // in the other case the fileReader will be null. 
+ int refCount = file.getReader().getRefCount(); + LOG.info("the store scanner count is " + refCount); + assertTrue("The store scanner count should be 0", refCount == 0); + } + } + } + + private static class ExtendedStoreScanner extends StoreScanner { + private CountDownLatch latch = new CountDownLatch(1); + + public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, + NavigableSet columns, long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + } + + public void updateReaders(boolean await, List sfs, + List memStoreScanners) throws IOException { + if (await) { + try { + latch.await(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + super.updateReaders(sfs, memStoreScanners); + if (!await) { + latch.countDown(); + } + } + + // creating a dummy close + public void close(boolean await, boolean dummy) { + if (await) { + try { + latch.await(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + super.close(); + if (!await) { + latch.countDown(); + } + } + + } @Test @Ignore("this fails, since we don't handle deletions, etc, in peek") public void testPeek() throws Exception {