diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 0bc75e7d07..6c5c323ad7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Store.ScannerTicket;
 
 /**
@@ -30,9 +30,17 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public interface ChangedReadersObserver {
+
+  /**
+   * @return the read point of the current scan
+   */
+  long getReadPoint();
+
   /**
    * Notify observers.
+   *
+   * @param ticket the information about the scanners created by a flush
    * @throws IOException e
    */
-  void updateReaders(List<StoreFile> sfs) throws IOException;
+  void updateReaders(ScannerTicket ticket) throws IOException;
 }
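Under the new contract an observer both exposes the read point of its scan (so HStore can open memstore scanners on its behalf during a flush) and receives the resulting ScannerTicket instead of a bare store-file list. A minimal sketch of an implementor follows; the class is illustrative only (StoreScanner, further down in this patch, is the real implementor):

    // Illustrative observer under the new contract; a sketch, not part of the patch.
    class SketchObserver implements ChangedReadersObserver {
      private final long readPt;            // read point the scan was opened with
      private Store.ScannerTicket pending;  // scanner info accumulated from flushes

      SketchObserver(long readPt) {
        this.readPt = readPt;
      }

      @Override
      public long getReadPoint() {
        // HStore calls this while flushing so it can open memstore scanners
        // at this observer's read point under the store read lock.
        return readPt;
      }

      @Override
      public void updateReaders(Store.ScannerTicket ticket) throws IOException {
        if (pending == null) {
          pending = ticket;
        } else {
          pending.merge(ticket);  // fold consecutive flushes into one ticket
        }
      }
    }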
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index b244997cd7..0b71b92cfa 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -317,18 +317,24 @@ public class CompactingMemStore extends AbstractMemStore {
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
+    MutableSegment activeTmp = active;
     List<? extends Segment> pipelineList = pipeline.getSegments();
     List<? extends Segment> snapshotList = snapshot.getAllSegments();
     long order = 1 + pipelineList.size() + snapshotList.size();
     // The list of elements in pipeline + the active element + the snapshot segment
     // The order is the Segment ordinal
-    List<KeyValueScanner> list = new ArrayList<>((int) order);
-    order = addToScanners(active, readPt, order, list);
+    List<KeyValueScanner> list = createList((int) order);
+    order = addToScanners(activeTmp, readPt, order, list);
     order = addToScanners(pipelineList, readPt, order, list);
     addToScanners(snapshotList, readPt, order, list);
     return list;
   }
 
+  @VisibleForTesting
+  protected List<KeyValueScanner> createList(int capacity) {
+    return new ArrayList<>(capacity);
+  }
+
   /**
    * Check whether anything need to be done based on the current active set size.
    * The method is invoked upon every addition to the active set.
@@ -428,7 +434,7 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
-  private void pushActiveToPipeline(MutableSegment active) {
+  protected void pushActiveToPipeline(MutableSegment active) {
     if (!active.isEmpty()) {
       pipeline.pushHead(active);
       resetActive();
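The local activeTmp matters because the active segment can be swapped out by a concurrent flush: getScanners() now reads the field once and uses that single reference throughout, so it cannot mix two generations of the segment. A generic sketch of the pattern (class and field names are hypothetical, and whether the real field is volatile is an assumption here):

    // "Read a racy field once" pattern; names are hypothetical, not from the patch.
    class SegmentHolder {
      private volatile Object active; // replaced wholesale by a concurrent flush

      java.util.List<Object> collect() {
        Object activeTmp = active;    // single read: every use below sees one generation
        java.util.List<Object> out = new java.util.ArrayList<>(1);
        out.add(activeTmp);           // re-reading 'active' here could observe the swap
        return out;
      }
    }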
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index cbdaa1b822..67a1f5ac15 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -105,7 +105,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
  */
 @InterfaceAudience.Private
 public class HStore implements Store {
-  private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
+  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
       "hbase.server.compactchecker.interval.multiplier";
   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
@@ -244,24 +244,27 @@ public class HStore implements Store {
     // Why not just pass a HColumnDescriptor in here altogether? Even if have
     // to clone it?
     scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
-    String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
     MemoryCompactionPolicy inMemoryCompaction = family.getInMemoryCompaction();
     if (inMemoryCompaction == null) {
       inMemoryCompaction = MemoryCompactionPolicy.valueOf(
          conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
              CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT));
     }
+    String className;
     switch (inMemoryCompaction) {
       case BASIC :
       case EAGER :
-        className = CompactingMemStore.class.getName();
-        this.memstore = new CompactingMemStore(conf, this.comparator, this,
-            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction);
+        Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
+            CompactingMemStore.class, CompactingMemStore.class);
+        className = clz.getName();
+        this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
+            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
         break;
       case NONE :
       default:
-        this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
-            Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
+        className = DefaultMemStore.class.getName();
+        this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+            Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
     }
     LOG.info("Memstore class name is " + className);
     this.offPeakHours = OffPeakHours.getInstance(conf);
@@ -1149,7 +1152,14 @@ public class HStore implements Store {
    */
   private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
     for (ChangedReadersObserver o : this.changedReaderObservers) {
-      o.updateReaders(sfs);
+      List<KeyValueScanner> memStoreScanners;
+      this.lock.readLock().lock();
+      try {
+        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
+      } finally {
+        this.lock.readLock().unlock();
+      }
+      o.updateReaders(new ScannerTicketImpl(sfs, memStoreScanners));
     }
   }
@@ -1212,6 +1222,22 @@ public class HStore implements Store {
   }
 
   @Override
+  public List<KeyValueScanner> getScanners(ScannerTicket ticket, boolean cacheBlocks,
+      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, long readPt)
+      throws IOException {
+    if (!(ticket instanceof ScannerTicketImpl)) {
+      throw new IOException("Unsupported ticket:"
+          + (ticket == null ? "null" : ticket.getClass().getName()));
+    }
+    ScannerTicketImpl ticketImpl = (ScannerTicketImpl) ticket;
+    List<KeyValueScanner> memStoreScanners = ticketImpl.getMemStoreScanners();
+    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
+        ticketImpl.getStoreFiles(), cacheBlocks, usePread, isCompaction, false, matcher, readPt);
+    List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + memStoreScanners.size());
+    scanners.addAll(sfScanners);
+    scanners.addAll(memStoreScanners);
+    return scanners;
+  }
+
+  @Override
   public void addChangedReaderObserver(ChangedReadersObserver o) {
     this.changedReaderObservers.add(o);
   }
@@ -2527,4 +2553,46 @@ public class HStore implements Store {
       lock.writeLock().unlock();
     }
   }
+
+  private static class ScannerTicketImpl implements ScannerTicket {
+
+    private final List<StoreFile> files;
+    private final List<KeyValueScanner> memStoreScanners;
+
+    ScannerTicketImpl(final List<StoreFile> files, final List<KeyValueScanner> memStoreScanners) {
+      this.files = new ArrayList<>(files);
+      this.memStoreScanners = new ArrayList<>(memStoreScanners);
+    }
+
+    @Override
+    public void merge(ScannerTicket newOne) throws IOException {
+      if (newOne instanceof ScannerTicketImpl) {
+        this.files.addAll(((ScannerTicketImpl) newOne).getStoreFiles());
+        closeMemStoreScanners();
+        this.memStoreScanners.addAll(((ScannerTicketImpl) newOne).getMemStoreScanners());
+        return;
+      }
+      throw new IOException("Unsupported ticket:" + newOne.getClass().getName());
+    }
+
+    private List<StoreFile> getStoreFiles() {
+      return files;
+    }
+
+    private List<KeyValueScanner> getMemStoreScanners() {
+      return memStoreScanners;
+    }
+
+    private void closeMemStoreScanners() {
+      for (KeyValueScanner s : memStoreScanners) {
+        s.close();
+      }
+      memStoreScanners.clear();
+    }
+
+    @Override
+    public void close() {
+      closeMemStoreScanners();
+    }
+  }
 }
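With MEMSTORE_CLASS_NAME now public and the BASIC/EAGER branch resolving the class through conf.getClass(...), a CompactingMemStore subclass can be injected from configuration; the new TestStore test below does exactly this. Minimal usage sketch (MyCompactingMemStore is the test helper added later in this patch):

    Configuration conf = HBaseConfiguration.create();
    // The class must extend CompactingMemStore and keep the
    // (Configuration, CellComparator, HStore, RegionServicesForStores,
    // MemoryCompactionPolicy) constructor that ReflectionUtils.newInstance invokes.
    conf.set(HStore.MEMSTORE_CLASS_NAME, TestStore.MyCompactingMemStore.class.getName());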
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 76595f3622..9543229199 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -158,6 +159,21 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
       byte[] stopRow, boolean includeStopRow, long readPt, boolean includeMemstoreScanner)
       throws IOException;
 
+  /**
+   * Create scanners on the given files and if needed on the memstore with no
+   * filtering based on TTL (that happens further down the line).
+   *
+   * @param ticket the information about the files and memstore scanners from which the new
+   *          scanners have to be created
+   * @param cacheBlocks cache the blocks or not
+   * @param usePread true to use pread, false if not
+   * @param isCompaction true if the scanner is created for compaction
+   * @param matcher the scan query matcher
+   * @param readPt the read point of the current scan
+   * @return scanners on the given files and on the memstore if specified
+   */
+  List<KeyValueScanner> getScanners(ScannerTicket ticket, boolean cacheBlocks,
+      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, long readPt)
+      throws IOException;
+
   ScanInfo getScanInfo();
 
   /**
@@ -544,4 +560,20 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    * @return true if the memstore may need some extra memory space
    */
   boolean isSloppyMemstore();
+
+  /**
+   * Records the information of the new scanners produced by a flush.
+   */
+  interface ScannerTicket extends Closeable {
+
+    /**
+     * Merge two tickets.
+     *
+     * @param newOne the other ticket
+     */
+    void merge(ScannerTicket newOne) throws IOException;
+
+    @Override
+    void close();
+  }
 }
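The intent of the two ScannerTicket methods, as implemented by HStore.ScannerTicketImpl and driven by StoreScanner: merge() folds a later flush into a pending ticket, closing the older ticket's now-stale memstore scanners while accumulating store files, and close() releases an unconsumed ticket when the scanner shuts down. A condensed sketch of that lifecycle, mirroring the StoreScanner code in this patch (the ConsumerSketch class itself is illustrative only):

    // Condensed mirror of StoreScanner's ticket handling; not part of the patch.
    class ConsumerSketch {
      private Store.ScannerTicket pending; // guarded by flushLock in the real code

      void onFlush(Store.ScannerTicket fresh) throws IOException {
        if (pending == null) {
          pending = fresh;      // first flush since the scanners were last rebuilt
        } else {
          pending.merge(fresh); // later flush: keep files, close stale memstore scanners
        }
      }

      void onShutdown() {
        if (pending != null) {
          pending.close();      // never consumed: release its memstore scanners
          pending = null;
        }
      }
    }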
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d39a6ee7e2..a242319d6f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
+import org.apache.hadoop.hbase.regionserver.Store.ScannerTicket;
 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
 import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
@@ -129,8 +130,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private final boolean scanUsePread;
   // Indicates whether there was flush during the course of the scan
   protected volatile boolean flushed = false;
-  // generally we get one file from a flush
-  protected List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
+  // generally we get the scanner information of one flush at a time
+  protected ScannerTicket ticket = null;
   // The current list of scanners
   protected List<KeyValueScanner> currentScanners = new ArrayList<>();
   // flush update lock
@@ -476,6 +477,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // Under test, we dont have a this.store
     if (this.store != null) this.store.deleteChangedReaderObserver(this);
     if (withHeapClose) {
+      if (ticket != null) {
+        ticket.close();
+        ticket = null;
+      }
       for (KeyValueHeap h : this.heapsForDelayedClose) {
         h.close();
       }
@@ -793,13 +798,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return true;
   }
 
+  @Override
+  public long getReadPoint() {
+    return this.readPt;
+  }
+
   // Implementation of ChangedReadersObserver
   @Override
-  public void updateReaders(List<StoreFile> sfs) throws IOException {
-    flushed = true;
+  public void updateReaders(ScannerTicket ticket) throws IOException {
+    if (ticket == null) {
+      return;
+    }
     flushLock.lock();
+    flushed = true;
     try {
-      flushedStoreFiles.addAll(sfs);
+      if (this.ticket == null) {
+        this.ticket = ticket;
+      } else {
+        this.ticket.merge(ticket);
+      }
     } finally {
       flushLock.unlock();
     }
@@ -835,10 +852,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     List<KeyValueScanner> scanners = null;
     try {
       flushLock.lock();
-      scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
-        scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
-      // Clear the current set of flushed store files so that they don't get added again
-      flushedStoreFiles.clear();
+      scanners = selectScannersFrom(store.getScanners(ticket, cacheBlocks,
+        scanUsePread, false, matcher, this.readPt));
+      ticket = null;
     } finally {
       flushLock.unlock();
    }
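One behavioral consequence relied on by the test updates at the end of this patch: updateReaders(null) is now an explicit no-op thanks to the early return, so tests that previously passed an empty file list can pass null without touching any flush bookkeeping:

    // TestStoreScanner and TestWideScanner now exercise the null no-op path.
    storeScanner.updateReaders(null); // returns immediately; no ticket, no flushed flag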
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 64e8397c56..3a8ea46230 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -38,9 +37,16 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -53,14 +59,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -70,6 +79,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -161,9 +171,14 @@ public class TestStore {
     init(methodName, conf, htd, hcd);
   }
 
-  @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
      HColumnDescriptor hcd) throws IOException {
+    return init(methodName, conf, htd, hcd, null);
+  }
+
+  @SuppressWarnings("deprecation")
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -185,8 +200,11 @@ public class TestStore {
     final WALFactory wals = new WALFactory(walConf, null, methodName);
     HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
             info.getTable().getNamespace()), fs, conf, info, htd, null);
-
-    store = new HStore(region, hcd, conf);
+    if (hook == null) {
+      store = new HStore(region, hcd, conf);
+    } else {
+      store = new MyStore(region, hcd, conf, hook);
+    }
     return store;
   }
@@ -939,4 +957,199 @@ public class TestStore {
     //ensure that replaceStoreFiles is not called if files are not refreshed
     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
   }
-}
+
+  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
+      throws IOException {
+    Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+    CellUtil.setSequenceId(c, sequenceId);
+    return c;
+  }
+
+  @Test
+  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+    init(name.getMethodName(), conf, hcd);
+    byte[] value = Bytes.toBytes("value");
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    // data which should stay visible to the scanner even though the snapshot is
+    // taken while the scanner is being created
+    store.add(createCell(qf1, ts, seqId, value), memStoreSize);
+    store.add(createCell(qf2, ts, seqId, value), memStoreSize);
+    store.add(createCell(qf3, ts, seqId, value), memStoreSize);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+    MyCompactingMemStore.START_TEST.set(true);
+    Runnable flush = () -> {
+      // the snapshot is prepared concurrently with scanner creation;
+      // MyCompactingMemStore forces the two threads to interleave via latches
+      storeFlushCtx.prepare();
+    };
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    service.submit(flush);
+    InternalScanner scanner = (InternalScanner) store.getScanner(
+        new Scan(new Get(row)), quals, seqId + 1);
+    service.shutdown();
+    service.awaitTermination(20, TimeUnit.SECONDS);
+    try {
+      try {
+        List<Cell> results = new ArrayList<>();
+        scanner.next(results);
+        assertEquals(3, results.size());
+        for (Cell c : results) {
+          byte[] actualValue = CellUtil.cloneValue(c);
+          assertTrue("expected:" + Bytes.toStringBinary(value)
+              + ", actual:" + Bytes.toStringBinary(actualValue),
+              Bytes.equals(actualValue, value));
+        }
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      MyCompactingMemStore.START_TEST.set(false);
+      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+    }
+  }
+
+  @Test
+  public void testScanWithDoubleFlush() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    // Initialize region
+    MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore store1) -> {
+      final long tmpId = id++;
+      ExecutorService s = Executors.newSingleThreadExecutor();
+      s.submit(() -> {
+        try {
+          // flush the store before the StoreScanner updates its scanners from the store.
+          // The current data will be flushed into files, and the memstore will be cleared.
+          // -- phase (4/4)
+          flushStore(store1, tmpId);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+      s.shutdown();
+      try {
+        // wait for the flush; the flush thread will be blocked in
+        // HStore#notifyChangedReadersObservers.
+        s.awaitTermination(500, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException ex) {
+      }
+    });
+    byte[] oldValue = Bytes.toBytes("oldValue");
+    byte[] currentValue = Bytes.toBytes("currentValue");
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    // older data which shouldn't be "seen" by client
+    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize);
+    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize);
+    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize);
+    long snapshotId = id++;
+    // push older data into snapshot -- phase (1/4)
+    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
+    storeFlushCtx.prepare();
+
+    // insert current data into active -- phase (2/4)
+    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize);
+    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize);
+    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
+        new Scan(new Get(row)), quals, seqId + 1)) {
+      // complete the flush -- phase (3/4)
+      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+
+      List<Cell> results = new ArrayList<>();
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+            + ", actual:" + Bytes.toStringBinary(actualValue),
+            Bytes.equals(actualValue, currentValue));
+      }
+    }
+  }
+
+  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(5);
+    return (MyStore) init(methodName, conf, htd, hcd, hook);
+  }
+
+  private static class MyStore extends HStore {
+    private final MyScannerHook hook;
+
+    MyStore(final HRegion region, final HColumnDescriptor family,
+        final Configuration confParam, MyScannerHook hook) throws IOException {
+      super(region, family, confParam);
+      this.hook = hook;
+    }
+
+    @Override
+    public List<KeyValueScanner> getScanners(ScannerTicket ticket, boolean cacheBlocks,
+        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, long readPt)
+        throws IOException {
+      hook.hook(this);
+      return super.getScanners(ticket, cacheBlocks, usePread, isCompaction, matcher, readPt);
+    }
+
+    @Override
+    public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+        boolean includeMemstoreScanner) throws IOException {
+      hook.hook(this);
+      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow,
+          true, stopRow, false, readPt, includeMemstoreScanner);
+    }
+  }
+
+  private interface MyScannerHook {
+    void hook(MyStore store) throws IOException;
+  }
+
+  public static class MyCompactingMemStore extends CompactingMemStore {
+    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
+    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
+    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
+
+    public MyCompactingMemStore(Configuration conf, CellComparator c,
+        HStore store, RegionServicesForStores regionServices,
+        MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, c, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    protected List<KeyValueScanner> createList(int capacity) {
+      if (START_TEST.get()) {
+        try {
+          // let the flush thread run pushActiveToPipeline, and wait until the
+          // snapshot has been taken before building the scanner list
+          getScannerLatch.countDown();
+          snapshotLatch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return new ArrayList<>(capacity);
+    }
+
+    @Override
+    protected void pushActiveToPipeline(MutableSegment active) {
+      if (START_TEST.get()) {
+        try {
+          // wait until the scan thread has entered getScanners()
+          getScannerLatch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      super.pushActiveToPipeline(active);
+      if (START_TEST.get()) {
+        snapshotLatch.countDown();
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 3e2949cc64..ad93171061 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -861,9 +861,9 @@ public class TestStoreScanner {
     // normally cause an NPE because scan.store is null. So as long as we get through these
     // two calls we are good and the bug was quashed.
 
-    scan.updateReaders(new ArrayList<>());
+    scan.updateReaders(null);
 
-    scan.updateReaders(new ArrayList<>());
+    scan.updateReaders(null);
 
     scan.peek();
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index cdf84d2990..70b36fcf48 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -130,7 +130,7 @@ public class TestWideScanner extends HBaseTestCase {
           ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
         while (scanners.hasNext()) {
           StoreScanner ss = (StoreScanner)scanners.next();
-          ss.updateReaders(new ArrayList<>());
+          ss.updateReaders(null);
         }
       } while (more);