 .../hbase/regionserver/DefaultStoreEngine.java     |   7 +
 .../regionserver/DefaultStoreFileManager.java      |  35 +++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  46 +++++
 .../apache/hadoop/hbase/regionserver/Store.java    |  28 +++
 .../hbase/regionserver/StoreFileManager.java       |   6 +
 .../hadoop/hbase/regionserver/StoreScanner.java    |  26 +--
 .../hadoop/hbase/regionserver/TestStore.java       | 208 +++++++++++++++++++--
 7 files changed, 318 insertions(+), 38 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 5c7b817..41d19a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Default StoreEngine creates the default compactor, policy, and store file manager, or
  * their derivatives.
@@ -68,6 +70,11 @@ public class DefaultStoreEngine extends StoreEngine<
     createCompactor(conf, store);
     createCompactionPolicy(conf, store);
     createStoreFlusher(conf, store);
+    createStoreFileManager(conf, kvComparator);
+  }
+
+  @VisibleForTesting
+  protected void createStoreFileManager(Configuration conf, CellComparator kvComparator) {
     storeFileManager = new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID,
         conf, compactionPolicy.getConf());
   }
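A note on the DefaultStoreEngine change: createStoreFileManager is pulled out as a protected @VisibleForTesting hook so a test engine can substitute its own StoreFileManager, which is what DummyStoreEngine in TestStore (further down) relies on. The hook only takes effect once the custom engine is wired in through the store engine configuration key; a minimal sketch of that wiring, mirroring the new test:

    // Sketch: activating a custom store engine in a test. DummyStoreEngine is
    // defined in TestStore below; any DefaultStoreEngine subclass works here.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());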
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index f4f9aa6..0703bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -44,7 +45,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
  * Default implementation of StoreFileManager. Not thread-safe.
  */
 @InterfaceAudience.Private
-class DefaultStoreFileManager implements StoreFileManager {
+@VisibleForTesting
+public class DefaultStoreFileManager implements StoreFileManager {
   private static final Log LOG = LogFactory.getLog(DefaultStoreFileManager.class);
 
   private final CellComparator kvComparator;
@@ -117,14 +119,17 @@
   }
 
   @Override
+  public final int getCompactedfilesCount() {
+    if (compactedfiles == null) {
+      return 0;
+    }
+    return compactedfiles.size();
+  }
+
+  @Override
   public void addCompactionResults(
       Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
-    ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
-    newStoreFiles.removeAll(newCompactedfiles);
-    if (!results.isEmpty()) {
-      newStoreFiles.addAll(results);
-    }
-    sortAndSetStoreFiles(newStoreFiles);
+    updateStoreFilesAfterCompaction(newCompactedfiles, results);
     ArrayList<StoreFile> updatedCompactedfiles = null;
     if (this.compactedfiles != null) {
       updatedCompactedfiles = new ArrayList<>(this.compactedfiles);
@@ -136,6 +141,17 @@
     this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
   }
 
+  @VisibleForTesting // added just for testing purposes
+  protected void updateStoreFilesAfterCompaction(Collection<StoreFile> newCompactedfiles,
+      Collection<StoreFile> results) {
+    ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
+    newStoreFiles.removeAll(newCompactedfiles);
+    if (!results.isEmpty()) {
+      newStoreFiles.addAll(results);
+    }
+    sortAndSetStoreFiles(newStoreFiles);
+  }
+
   // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
   // Let a background thread close the actual reader on these compacted files and also
   // ensure to evict the blocks from block cache so that they are no longer in
@@ -241,5 +257,10 @@
   public Comparator<StoreFile> getStoreFileComparator() {
     return storeFileComparator;
   }
+
+  @VisibleForTesting
+  public CompactionConfiguration getCompactionConfiguration() {
+    return this.comConf;
+  }
 }
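A note on the DefaultStoreFileManager change: addCompactionResults now swaps the active file list through the extracted updateStoreFilesAfterCompaction and only afterwards moves the compacted files onto the compactedfiles list. That ordering is the seam the new test exploits: a subclass can stall inside the hook while a concurrent scanner re-resolves its files. A minimal sketch of such a subclass, as test-only code (TestStore's DummyStoreFileManager below is the real version):

    // Test-only sketch: widen the race window between the active-file swap and
    // whatever the caller does next. StallingStoreFileManager is hypothetical.
    public static class StallingStoreFileManager extends DefaultStoreFileManager {
      public StallingStoreFileManager(CellComparator kvComparator,
          Comparator<StoreFile> storeFileComparator, Configuration conf,
          CompactionConfiguration comConf) {
        super(kvComparator, storeFileComparator, conf, comConf);
      }

      @Override
      protected void updateStoreFilesAfterCompaction(Collection<StoreFile> newCompactedfiles,
          Collection<StoreFile> results) {
        super.updateStoreFilesAfterCompaction(newCompactedfiles, results);
        try {
          Thread.sleep(50); // hold the compacting thread inside the window
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }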
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ed0f201..a70f86a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -727,6 +728,11 @@
     return this.storeEngine.getStoreFileManager().getStorefiles();
   }
 
+  @Override
+  public Collection<StoreFile> getCompactedfiles() {
+    return this.storeEngine.getStoreFileManager().getCompactedfiles();
+  }
+
   /**
    * This throws a WrongRegionException if the HFile does not fit in this region, or an
    * InvalidHFileException if the HFile is not valid.
@@ -1928,6 +1934,41 @@
   }
 
   @Override
+  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow,
+      long readPt, boolean includeMemstoreScanner) throws IOException {
+    this.lock.readLock().lock();
+    try {
+      Map<String, StoreFile> name2File =
+          new HashMap<>(getStorefilesCount() + getCompactedfilesCount());
+      for (StoreFile file : getStorefiles()) {
+        name2File.put(file.getFileInfo().getActiveFileName(), file);
+      }
+/*      if (getCompactedfiles() != null) {
+        for (StoreFile file : getCompactedfiles()) {
+          name2File.put(file.getFileInfo().getActiveFileName(), file);
+        }
+      }*/
+      List<StoreFile> filesToReopen = new ArrayList<>();
+      for (KeyValueScanner kvs : currentFileScanners) {
+        assert kvs.isFileScanner();
+        if (kvs.peek() == null) {
+          continue;
+        }
+        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
+      }
+      if (filesToReopen.isEmpty()) {
+        return null;
+      }
+      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
+          includeStartRow, stopRow, includeStopRow, readPt, false);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public String toString() {
     return this.getColumnFamilyName();
   }
@@ -1938,6 +1979,11 @@
   }
 
   @Override
+  public int getCompactedfilesCount() {
+    return this.storeEngine.getStoreFileManager().getCompactedfilesCount();
+  }
+
+  @Override
   public long getMaxStoreFileAge() {
     long earliestTS = Long.MAX_VALUE;
     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
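A note on recreateScanners: under the store read lock it re-resolves the store files behind the supplied file scanners by active file name, skips scanners that are already exhausted (kvs.peek() == null), and returns null when nothing is left to reopen, so callers must treat null as "keep the scanners you already have". The caller-side pattern, condensed from the StoreScanner change below (scannersToClose, matcher, scan and readPt are assumed to come from the surrounding scanner state):

    List<KeyValueScanner> fileScanners = ((HStore) store).recreateScanners(scannersToClose,
        cacheBlocks, false, false, matcher, scan.getStartRow(), scan.includeStartRow(),
        scan.getStopRow(), scan.includeStopRow(), readPt, false);
    if (fileScanners == null) {
      return; // nothing readable left in the files; stay on the current scanners
    }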
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f5e90eb..2e8aa2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -61,6 +61,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver
 
   Collection<StoreFile> getStorefiles();
 
+  Collection<StoreFile> getCompactedfiles();
+
   /**
    * Close all the readers We don't need to worry about subsequent requests because the Region
    * holds a write lock that will prevent any more reads or writes.
@@ -116,6 +118,27 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver
       boolean includeStopRow, long readPt) throws IOException;
 
   /**
+   * Recreates the scanners on the current list of active store file scanners.
+   * @param currentFileScanners the current set of active store file scanners
+   * @param cacheBlocks cache the blocks or not
+   * @param usePread use pread or not
+   * @param isCompaction is the scanner for compaction
+   * @param matcher the scan query matcher
+   * @param startRow the scan's start row
+   * @param includeStartRow should the scan include the start row
+   * @param stopRow the scan's stop row
+   * @param includeStopRow should the scan include the stop row
+   * @param readPt the read point of the current scan
+   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
+   * @return list of scanners recreated on the current file scanners
+   * @throws IOException
+   */
+  List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow,
+      long readPt, boolean includeMemstoreScanner) throws IOException;
+
+  /**
    * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
    * (that happens further down the line).
    * @param files the list of files on which the scanners has to be created
@@ -367,6 +390,11 @@
   int getStorefilesCount();
 
   /**
+   * @return Count of compacted store files
+   */
+  int getCompactedfilesCount();
+
+  /**
    * @return Max age of store files in this store
    */
   long getMaxStoreFileAge();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 933849c..df6251d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -104,6 +104,12 @@
   int getStorefileCount();
 
   /**
+   * Returns the number of compacted files.
+   * @return The number of files.
+   */
+  int getCompactedfilesCount();
+
+  /**
    * Gets the store files to scan for a Scan or Get request.
    * @param startRow Start row of the request.
    * @param stopRow Stop row of the request.
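A note ahead of the StoreScanner change: trySwitchToStreamRead only fires once bytesRead crosses preadMaxBytes, so a test can force the pread-to-stream switch on the very first read by zeroing the threshold, exactly as the new test does:

    // STORESCANNER_PREAD_MAX_BYTES backs the "hbase.storescanner.pread.max.bytes" setting.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);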
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 9849c93..8266797 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -966,7 +966,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return heap.reseek(kv);
   }
 
-  private void trySwitchToStreamRead() {
+  @VisibleForTesting
+  void trySwitchToStreamRead() {
     if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
         || bytesRead < preadMaxBytes) {
       return;
@@ -977,34 +978,27 @@
     }
     scanUsePread = false;
     Cell lastTop = heap.peek();
-    Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
-    for (StoreFile file : store.getStorefiles()) {
-      name2File.put(file.getFileInfo().getActiveFileName(), file);
-    }
-    List<StoreFile> filesToReopen = new ArrayList<>();
     List<KeyValueScanner> memstoreScanners = new ArrayList<>();
     List<KeyValueScanner> scannersToClose = new ArrayList<>();
     for (KeyValueScanner kvs : currentScanners) {
       if (!kvs.isFileScanner()) {
+        // collect memstore scanners here
         memstoreScanners.add(kvs);
       } else {
         scannersToClose.add(kvs);
-        if (kvs.peek() == null) {
-          continue;
-        }
-        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
       }
     }
-    if (filesToReopen.isEmpty()) {
-      return;
-    }
     List<KeyValueScanner> fileScanners = null;
     List<KeyValueScanner> newCurrentScanners;
     KeyValueHeap newHeap;
     try {
-      fileScanners =
-          store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
-              scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
+      // recreate the scanners on the current file scanners
+      fileScanners = ((HStore) store).recreateScanners(scannersToClose, cacheBlocks, false, false,
+          matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
+          scan.includeStopRow(), readPt, false);
+      if (fileScanners == null) {
+        return;
+      }
       seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
       newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
       newCurrentScanners.addAll(fileScanners);
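A note on the test changes below: the new test races trySwitchToStreamRead against addCompactionResults with a volatile flag plus a CountDownLatch. The scanner thread sets the flag once it is inside getStorefiles() and parks on the latch; the compacting side spins on the flag inside updateStoreFilesAfterCompaction and then counts the latch down. Stripped of the HBase types, the handshake looks like this (a self-contained sketch; all names are hypothetical):

    import java.util.concurrent.CountDownLatch;

    public class HandshakeSketch {
      private static volatile boolean inWindow = false;
      private static final CountDownLatch LATCH = new CountDownLatch(1);

      public static void main(String[] args) throws InterruptedException {
        Thread scanner = new Thread(() -> {
          inWindow = true; // signal: the racy window has been entered
          try {
            LATCH.await(); // park until the other side finishes its update
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        Thread discharger = new Thread(() -> {
          while (!inWindow) {
            // spin until the scanner reaches the window
          }
          LATCH.countDown(); // release the scanner
        });
        scanner.start();
        discharger.start();
        scanner.join();
        discharger.join();
      }
    }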
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 22539c5..ab4e1b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -34,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
@@ -68,10 +70,13 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -137,6 +142,8 @@ public class TestStore {
   private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
 
+  private static CountDownLatch latch;
+  private static volatile boolean switchToPread = false;
 
   /**
@@ -184,6 +191,11 @@ public class TestStore {
   @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
+    return init(methodName, conf, htd, hcd, hook, false);
+  }
+
+  @SuppressWarnings("deprecation")
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -198,7 +210,8 @@ public class TestStore {
     } else {
       htd.addFamily(hcd);
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+        MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, basedir);
@@ -208,7 +221,7 @@ public class TestStore {
     if (hook == null) {
       store = new HStore(region, hcd, conf);
     } else {
-      store = new MyStore(region, hcd, conf, hook);
+      store = new MyStore(region, hcd, conf, hook, switchToPread);
     }
     return store;
   }
@@ -590,6 +603,8 @@ public class TestStore {
 
   @After
   public void tearDown() throws Exception {
+    latch = null;
+    switchToPread = false;
     EnvironmentEdgeManagerTestHelper.reset();
   }
 
@@ -833,12 +848,21 @@ public class TestStore {
   public static class DummyStoreEngine extends DefaultStoreEngine {
     public static DefaultCompactor lastCreatedCompactor = null;
+
     @Override
-    protected void createComponents(
-        Configuration conf, Store store, CellComparator comparator) throws IOException {
+    protected void createComponents(Configuration conf, Store store, CellComparator comparator)
+        throws IOException {
       super.createComponents(conf, store, comparator);
       lastCreatedCompactor = this.compactor;
     }
+
+    @Override
+    protected void createStoreFileManager(Configuration conf, CellComparator kvComparator) {
+      super.createStoreFileManager(conf, kvComparator);
+      DefaultStoreFileManager temp = storeFileManager;
+      storeFileManager = new DummyStoreFileManager(kvComparator, temp.getStoreFileComparator(),
+          conf, temp.getCompactionConfiguration());
+    }
   }
 
   @Test
@@ -1039,6 +1063,13 @@ public class TestStore {
     return c;
   }
 
+  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
+      throws IOException {
+    Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+    CellUtil.setSequenceId(c, sequenceId);
+    return c;
+  }
+
   @Test
   public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
     Configuration conf = HBaseConfiguration.create();
@@ -1119,7 +1150,7 @@
         s.awaitTermination(3, TimeUnit.SECONDS);
       } catch (InterruptedException ex) {
       }
-    });
+    }, false);
     byte[] oldValue = Bytes.toBytes("oldValue");
     byte[] currentValue = Bytes.toBytes("currentValue");
     MemstoreSize memStoreSize = new MemstoreSize();
@@ -1269,35 +1300,182 @@ public class TestStore {
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }
 
-  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook,
+      boolean switchToPread) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setMaxVersions(5);
-    return (MyStore) init(methodName, conf, htd, hcd, hook);
+    return (MyStore) init(methodName, conf, htd, hcd, hook, switchToPread);
   }
 
   private static class MyStore extends HStore {
     private final MyScannerHook hook;
-    MyStore(final HRegion region, final HColumnDescriptor family,
-        final Configuration confParam, MyScannerHook hook) throws IOException {
+    private final boolean switchToPread;
+
+    MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
+        MyScannerHook hook, boolean switchToPread) throws IOException {
       super(region, family, confParam);
       this.hook = hook;
+      this.switchToPread = switchToPread;
     }
 
     @Override
     public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
-        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
-        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
-        boolean includeMemstoreScanner) throws IOException {
-      hook.hook(this);
-      return super.getScanners(files, cacheBlocks, usePread,
-          isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
+        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+        boolean includeMemstoreScanner) throws IOException {
+      if (!switchToPread) {
+        hook.hook(this);
+      }
+      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
+          stopRow, false, readPt, includeMemstoreScanner);
+    }
+
+    @Override
+    public int getCompactedfilesCount() {
+      if (switchToPread) {
+        if (latch == null) {
+          // set the latch here
+          latch = new CountDownLatch(1);
+        }
+      }
+      return super.getCompactedfilesCount();
+    }
+
+    @Override
+    public Collection<StoreFile> getStorefiles() {
+      try {
+        if (switchToPread && latch != null) {
+          TestStore.switchToPread = true;
+          latch.await();
+        }
+      } catch (InterruptedException e) {
+      }
+      return super.getStorefiles();
     }
   }
 
   private interface MyScannerHook {
     void hook(MyStore store) throws IOException;
   }
 
+  @Test
+  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
+    // A zero pread threshold makes the scanner attempt the stream switch on its first read
+    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+    MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
+      @Override
+      public void hook(MyStore store) throws IOException {
+      }
+    }, true);
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = System.currentTimeMillis();
+    long seqID = 1L;
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+          memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    for (int i = 11; i < 20; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+          memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    for (int i = 21; i < 30; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+          memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+
+    assertEquals(3, store.getStorefilesCount());
+    ScanInfo scanInfo = store.getScanInfo();
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    Collection<StoreFile> storefiles2 = store.getStorefiles();
+    ArrayList<StoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
+    StoreScanner storeScanner =
+        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
+    // get the current heap
+    KeyValueHeap heap = storeScanner.heap;
+    // create more store files
+    for (int i = 31; i < 40; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+          memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+
+    for (int i = 41; i < 50; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+          memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    storefiles2 = store.getStorefiles();
+    ArrayList<StoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
+    actualStorefiles1.removeAll(actualStorefiles);
+    // Do compaction
+    List<Exception> exceptions = new ArrayList<>();
+    MyThread thread = new MyThread(storeScanner);
+    thread.start();
+    store.getStoreEngine().getStoreFileManager().addCompactionResults(actualStorefiles,
+        actualStorefiles1);
+    thread.join();
+    KeyValueHeap heap2 = thread.getHeap();
+    assertFalse(heap.equals(heap2));
+  }
+
+  private static class MyThread extends Thread {
+    private StoreScanner scanner;
+    private KeyValueHeap heap;
+
+    public MyThread(StoreScanner scanner) {
+      this.scanner = scanner;
+    }
+
+    public KeyValueHeap getHeap() {
+      return this.heap;
+    }
+
+    public void run() {
+      scanner.trySwitchToStreamRead();
+      heap = scanner.heap;
+    }
+  }
+
+  public static class DummyStoreFileManager extends DefaultStoreFileManager {
+    public DummyStoreFileManager(CellComparator kvComparator,
+        Comparator<StoreFile> storeFileComparator, Configuration conf,
+        CompactionConfiguration comConf) {
+      super(kvComparator, storeFileComparator, conf, comConf);
+    }
+
+    @Override
+    protected void updateStoreFilesAfterCompaction(Collection<StoreFile> newCompactedfiles,
+        Collection<StoreFile> results) {
+      super.updateStoreFilesAfterCompaction(newCompactedfiles, results);
+      // Hold the compacting thread here until the scanner thread has entered
+      // the pread-to-stream switch, then release it via the latch.
+      do {
+        try {
+          Thread.sleep(20);
+        } catch (InterruptedException e) {
+        }
+      } while (!TestStore.switchToPread);
+      if (latch != null) {
+        latch.countDown();
+      }
+    }
+  }
+
   private static class MyMemStoreCompactor extends MemStoreCompactor {
     private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
     private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);