Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java (revision 1519982)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java (working copy)
@@ -106,6 +106,12 @@
   /** Overhead for timerange */
   public static final int TIMERANGE;
 
+  /** Overhead for TimeRangeTracker */
+  public static final int TIMERANGE_TRACKER;
+
+  /** Overhead for KeyValueSkipListSet */
+  public static final int KEYVALUE_SKIPLIST_SET;
+
   /* Are we running on jdk7? */
   private static final boolean JDK7;
 
   static {
@@ -184,6 +190,10 @@
     COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);
 
     TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
+
+    TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
+
+    KEYVALUE_SKIPLIST_SET = align(OBJECT + REFERENCE);
   }
 
   /**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (revision 1519982)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (working copy)
@@ -21,14 +21,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.util.StringUtils;
@@ -45,7 +43,7 @@
   }
 
   @Override
-  public List flushSnapshot(SortedSet snapshot, long cacheFlushId,
+  public List flushSnapshot(List snapshot, long cacheFlushId,
       TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
       throws IOException {
     ArrayList result = new ArrayList();
@@ -53,8 +51,11 @@
 
     // Use a store scanner to find which rows to flush.
     long smallestReadPoint = store.getSmallestReadPoint();
-    KeyValueScanner memstoreScanner =
-        new CollectionBackedScanner(snapshot, store.getComparator());
+    List scanners = new ArrayList(snapshot.size());
+    for (KeyValueSkipListSet snapshotItem : snapshot) {
+      scanners.add(new CollectionBackedScanner(snapshotItem, store.getComparator()));
+    }
+    KeyValueScanner memstoreScanner = new KeyValueHeap(scanners, store.getComparator());
     InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
     if (scanner == null) {
       scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
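The DefaultStoreFlusher change above stops scanning one snapshot set and instead wraps each snapshot slice in its own CollectionBackedScanner, merging them through a KeyValueHeap so the flushed output stays in KeyValue order. For readers who have not looked at KeyValueHeap, the following self-contained sketch shows the same k-way merge idea with plain JDK types; the class and member names (SliceMergeSketch, Head) are illustrative only and do not appear in the patch.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class SliceMergeSketch {

  /** Merges several individually sorted slices into one sorted list. */
  public static <T> List<T> merge(List<? extends Iterable<T>> slices, final Comparator<T> cmp) {
    // The heap orders the current head element of every non-empty slice iterator,
    // which is essentially what KeyValueHeap does with its per-slice scanners.
    PriorityQueue<Head<T>> heap = new PriorityQueue<Head<T>>(Math.max(1, slices.size()),
        new Comparator<Head<T>>() {
          public int compare(Head<T> a, Head<T> b) {
            return cmp.compare(a.value, b.value);
          }
        });
    for (Iterable<T> slice : slices) {
      Iterator<T> it = slice.iterator();
      if (it.hasNext()) {
        heap.add(new Head<T>(it));
      }
    }
    List<T> merged = new ArrayList<T>();
    while (!heap.isEmpty()) {
      Head<T> lowest = heap.poll();   // slice whose head sorts first
      merged.add(lowest.value);
      if (lowest.advance()) {
        heap.add(lowest);             // re-queue the slice with its new head
      }
    }
    return merged;
  }

  /** An iterator plus the element it is currently positioned on. */
  private static final class Head<T> {
    final Iterator<T> it;
    T value;
    Head(Iterator<T> it) { this.it = it; this.value = it.next(); }
    boolean advance() {
      if (it.hasNext()) { value = it.next(); return true; }
      return false;
    }
  }
}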
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1519982)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy)
@@ -693,7 +693,7 @@
    * @throws IOException
    */
   protected List flushCache(final long logCacheFlushId,
-      SortedSet snapshot,
+      List snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,
       MonitoredTask status) throws IOException {
@@ -817,13 +817,13 @@
    * @return Whether compaction is required.
    */
   private boolean updateStorefiles(
-      final List sfs, final SortedSet set) throws IOException {
+      final List sfs, final List snapshot) throws IOException {
     this.lock.writeLock().lock();
     try {
       for (StoreFile sf : sfs) {
         this.storeEngine.getStoreFileManager().insertNewFile(sf);
       }
-      this.memstore.clearSnapshot(set);
+      this.memstore.clearSnapshot(snapshot);
     } finally {
       // We need the lock, as long as we are updating the storeFiles
       // or changing the memstore. Let us release it before calling
@@ -1794,7 +1794,7 @@
   private class StoreFlusherImpl implements StoreFlushContext {
 
     private long cacheFlushSeqNum;
-    private SortedSet snapshot;
+    private List snapshot;
     private List tempFiles;
     private TimeRangeTracker snapshotTimeRangeTracker;
     private final AtomicLong flushedSize = new AtomicLong();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java (revision 1519982)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java (working copy)
@@ -44,7 +44,7 @@
  * get and set and won't throw ConcurrentModificationException when iterating.
 */
 @InterfaceAudience.Private
-class KeyValueSkipListSet implements NavigableSet {
+public class KeyValueSkipListSet implements NavigableSet {
   private final ConcurrentNavigableMap delegatee;
 
   KeyValueSkipListSet(final KeyValue.KVComparator c) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1519982)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy)
@@ -64,6 +64,7 @@
   static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
+  private static final String MEMSTORE_SLICE_MAXSIZE_KEY = "hbase.hregion.memstore.slice.maxsize";
   private static final boolean USEMSLAB_DEFAULT = true;
 
   private Configuration conf;
@@ -73,10 +74,11 @@
   // whereas the Set will not add new KV if key is same though value might be
   // different. Value is not important -- just make sure always same
   // reference passed.
-  volatile KeyValueSkipListSet kvset;
+  volatile KeyValueSkipListSet curKvset; // The current active kvset to which KVs are getting added.
+  volatile List kvsets;
 
   // Snapshot of memstore. Made for flusher.
-  volatile KeyValueSkipListSet snapshot;
+  volatile List snapshot;
 
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -84,6 +86,7 @@
   // Used to track own heapSize
   final AtomicLong size;
+  final AtomicLong curKvsetSize;
 
   // Used to track when to flush
   volatile long timeOfOldestEdit = Long.MAX_VALUE;
@@ -95,6 +98,8 @@
   volatile MemStoreLAB allocator;
   volatile MemStoreLAB snapshotAllocator;
 
+  private final long sliceMaxSize;
+
   /**
    * Default constructor. Used for tests.
   */
@@ -110,11 +115,14 @@
       final KeyValue.KVComparator c) {
     this.conf = conf;
     this.comparator = c;
-    this.kvset = new KeyValueSkipListSet(c);
-    this.snapshot = new KeyValueSkipListSet(c);
+    this.curKvset = new KeyValueSkipListSet(c);
+    this.kvsets = new ArrayList();
+    this.kvsets.add(curKvset);
+    this.snapshot = new ArrayList(0);
     timeRangeTracker = new TimeRangeTracker();
     snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
+    this.curKvsetSize = new AtomicLong();
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
       this.chunkPool = MemStoreChunkPool.getPool(conf);
       this.allocator = new MemStoreLAB(conf, chunkPool);
@@ -122,20 +130,25 @@
       this.allocator = null;
       this.chunkPool = null;
     }
+    this.sliceMaxSize = conf.getLong(MEMSTORE_SLICE_MAXSIZE_KEY, 0);
   }
 
   void dump() {
-    for (KeyValue kv: this.kvset) {
-      LOG.info(kv);
+    for (KeyValueSkipListSet kvsls : this.kvsets) {
+      for (KeyValue kv : kvsls) {
+        LOG.info(kv);
+      }
     }
-    for (KeyValue kv: this.snapshot) {
-      LOG.info(kv);
+    for (KeyValueSkipListSet kvsls : this.snapshot) {
+      for (KeyValue kv : kvsls) {
+        LOG.info(kv);
+      }
     }
   }
 
   /**
    * Creates a snapshot of the current memstore.
-   * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)}
+   * Snapshot must be cleared by call to {@link #clearSnapshot(List)}
    * To get the snapshot made by this method, use {@link #getSnapshot()}
    */
   void snapshot() {
@@ -147,13 +160,16 @@
       LOG.warn("Snapshot called again without clearing previous. " +
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
-      if (!this.kvset.isEmpty()) {
-        this.snapshot = this.kvset;
-        this.kvset = new KeyValueSkipListSet(this.comparator);
+      if (this.kvsets.size() > 1 || !this.curKvset.isEmpty()) {
+        this.snapshot = this.kvsets;
+        this.curKvset = new KeyValueSkipListSet(this.comparator);
+        this.kvsets = new ArrayList();
+        this.kvsets.add(curKvset);
         this.snapshotTimeRangeTracker = this.timeRangeTracker;
         this.timeRangeTracker = new TimeRangeTracker();
         // Reset heap to not include any keys
         this.size.set(DEEP_OVERHEAD);
+        this.curKvsetSize.set(0);
         this.snapshotAllocator = this.allocator;
         // Reset allocator so we get a fresh buffer for the new memstore
         if (allocator != null) {
@@ -173,9 +189,9 @@
    * call to {@link #snapshot()}
    * @return Return snapshot.
    * @see {@link #snapshot()}
-   * @see {@link #clearSnapshot(SortedSet)}
+   * @see {@link #clearSnapshot(List)}
    */
-  KeyValueSkipListSet getSnapshot() {
+  List getSnapshot() {
    return this.snapshot;
   }
 
@@ -185,7 +201,7 @@
    * @throws UnexpectedException
    * @see {@link #snapshot()}
    */
-  void clearSnapshot(final SortedSet ss)
+  void clearSnapshot(final List ss)
   throws UnexpectedException {
     MemStoreLAB tmpAllocator = null;
     this.lock.writeLock().lock();
@@ -197,7 +213,7 @@
       // OK. Passed in snapshot is same as current snapshot. If not-empty,
       // create a new snapshot and let the old one go.
       if (!ss.isEmpty()) {
-        this.snapshot = new KeyValueSkipListSet(this.comparator);
+        this.snapshot = new ArrayList(0);
         this.snapshotTimeRangeTracker = new TimeRangeTracker();
       }
       if (this.snapshotAllocator != null) {
@@ -232,17 +248,11 @@
   }
 
   private boolean addToKVSet(KeyValue e) {
-    boolean b = this.kvset.add(e);
+    boolean b = this.curKvset.add(e);
     setOldestEditTimeToNow();
     return b;
   }
 
-  private boolean removeFromKVSet(KeyValue e) {
-    boolean b = this.kvset.remove(e);
-    setOldestEditTimeToNow();
-    return b;
-  }
-
   void setOldestEditTimeToNow() {
     if (timeOfOldestEdit == Long.MAX_VALUE) {
       timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
@@ -259,9 +269,33 @@
     long s = heapSizeChange(toAdd, addToKVSet(toAdd));
     timeRangeTracker.includeTimestamp(toAdd);
     this.size.addAndGet(s);
+    checkAndCreateNewKvSet(s);
     return s;
   }
 
+  private void checkAndCreateNewKvSet(long s) {
+    long changedVal = this.curKvsetSize.addAndGet(s);
+    if (this.sliceMaxSize != 0 && this.sliceMaxSize <= changedVal) {
+      // Reached the max size a memstore sub region can have. We create a new CSLM to hold KVs
+      // Adding more KVs into same CSLM will slow down the puts.
+      this.lock.writeLock().lock();
+      try {
+        if (this.sliceMaxSize <= this.curKvsetSize.get()) {
+          this.curKvset = new KeyValueSkipListSet(comparator);
+          this.kvsets.add(curKvset);
+          this.curKvsetSize.set(0);
+          // We created a new KVSLSet and added to kvsets List. Adding the heap size taken by this
+          // into the this.size
+          long newSliceOverhead = ClassSize.REFERENCE + ClassSize.KEYVALUE_SKIPLIST_SET
+              + ClassSize.CONCURRENT_SKIPLISTMAP;
+          this.size.addAndGet(newSliceOverhead);
+        }
+      } finally {
+        this.lock.writeLock().unlock();
+      }
+    }
+  }
+
   private KeyValue maybeCloneWithAllocator(KeyValue kv) {
     if (allocator == null) {
       return kv;
     }
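MemStore.checkAndCreateNewKvSet(), shown above, rolls the active slice over once it accounts for hbase.hregion.memstore.slice.maxsize bytes of heap; the key defaults to 0, which keeps the memstore as a single CSLM. A minimal sketch of turning the feature on follows; the class name and the 8 MB threshold are illustrative choices, not values taken from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class SliceConfigExample {
  public static void main(String[] args) {
    // The property name comes from this patch; the 8 MB value is only an example.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("hbase.hregion.memstore.slice.maxsize", 8L * 1024 * 1024);
    // With a non-zero value, checkAndCreateNewKvSet() starts a fresh
    // KeyValueSkipListSet once the active slice accounts for that much heap,
    // instead of letting a single ConcurrentSkipListMap keep growing.
    System.out.println(conf.getLong("hbase.hregion.memstore.slice.maxsize", 0));
  }
}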
@@ -297,16 +331,24 @@
       // not the snapshot. The flush of this snapshot to disk has not
       // yet started because Store.flush() waits for all rwcc transactions to
       // commit before starting the flush to disk.
-      KeyValue found = this.snapshot.get(kv);
-      if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
-        this.snapshot.remove(kv);
+      for (KeyValueSkipListSet kvset : snapshot) {
+        KeyValue found = kvset.get(kv);
+        if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
+          kvset.remove(kv);
+          break;
+        }
       }
 
       // If the key is in the memstore, delete it. Update this.size.
-      found = this.kvset.get(kv);
-      if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
-        removeFromKVSet(kv);
-        long s = heapSizeChange(kv, true);
-        this.size.addAndGet(-s);
+      for (KeyValueSkipListSet kvset : kvsets) {
+        KeyValue found = kvset.get(kv);
+        if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
+          kvset.remove(kv);
+          setOldestEditTimeToNow();
+          long s = heapSizeChange(kv, true);
+          this.size.addAndGet(-s);
+          this.curKvsetSize.addAndGet(-s);
+          break;
+        }
       }
     } finally {
       this.lock.readLock().unlock();
     }
@@ -320,7 +362,7 @@
    */
   long delete(final KeyValue delete) {
     long s = 0;
-    this.lock.readLock().lock();
+    this.lock.readLock().lock();
     try {
       KeyValue toAdd = maybeCloneWithAllocator(delete);
       s += heapSizeChange(toAdd, addToKVSet(toAdd));
@@ -329,6 +371,7 @@
       this.lock.readLock().unlock();
     }
     this.size.addAndGet(s);
+    checkAndCreateNewKvSet(s);
     return s;
   }
 
@@ -340,7 +383,7 @@
   KeyValue getNextRow(final KeyValue kv) {
     this.lock.readLock().lock();
     try {
-      return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
+      return getLowest(getNextRow(kv, this.kvsets), getNextRow(kv, this.snapshot));
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
@@ -363,22 +406,26 @@
   /*
    * @param key Find row that follows this one. If null, return first.
-   * @param map Set to look in for a row beyond row.
+   * @param sets List of KVSet to look in for a row beyond row.
    * @return Next row or null if none found. If one found, will be a new
    * KeyValue -- can be destroyed by subsequent calls to this method.
    */
   private KeyValue getNextRow(final KeyValue key,
-      final NavigableSet set) {
+      final List sets) {
     KeyValue result = null;
-    SortedSet tail = key == null? set: set.tailSet(key);
-    // Iterate until we fall into the next row; i.e. move off current row
-    for (KeyValue kv: tail) {
-      if (comparator.compareRows(kv, key) <= 0)
-        continue;
-      // Note: Not suppressing deletes or expired cells. Needs to be handled
-      // by higher up functions.
-      result = kv;
-      break;
+    for(KeyValueSkipListSet set:sets){
+      SortedSet tail = key == null? set: set.tailSet(key);
+      // Iterate until we fall into the next row; i.e. move off current row
+      for (KeyValue kv: tail) {
+        if (comparator.compareRows(kv, key) <= 0)
+          continue;
+        // Note: Not suppressing deletes or expired cells. Needs to be handled
+        // by higher up functions.
+        if(result == null || comparator.compareRows(kv, result) <= 0) {
+          result = kv;
+        }
+        break;
+      }
     }
     return result;
   }
 
@@ -389,7 +436,7 @@
   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
     this.lock.readLock().lock();
     try {
-      getRowKeyAtOrBefore(kvset, state);
+      getRowKeyAtOrBefore(kvsets, state);
       getRowKeyAtOrBefore(snapshot, state);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -397,17 +444,23 @@
   }
 
   /*
-   * @param set
+   * @param sets
    * @param state Accumulates deletes and candidates.
    */
-  private void getRowKeyAtOrBefore(final NavigableSet set,
+  private void getRowKeyAtOrBefore(final List sets,
       final GetClosestRowBeforeTracker state) {
-    if (set.isEmpty()) {
+    if (sets.isEmpty()) {
       return;
     }
-    if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
+    boolean result = false;
+    for(KeyValueSkipListSet set : sets){
+      result = result || walkForwardInSingleRow(set, state.getTargetKey(), state);
+    }
+    if (!result) {
       // Found nothing in row. Try backing up.
-      getRowKeyBefore(set, state);
+      for (KeyValueSkipListSet set : sets) {
+        getRowKeyBefore(set, state);
+      }
     }
   }
 
@@ -492,14 +545,17 @@
     KeyValue firstKv = KeyValue.createFirstOnRow(
         row, family, qualifier);
     // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-    SortedSet snSs = snapshot.tailSet(firstKv);
-    if (!snSs.isEmpty()) {
-      KeyValue snKv = snSs.first();
-      // is there a matching KV in the snapshot?
-      if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
-        if (snKv.getTimestamp() == now) {
-          // poop,
-          now += 1;
+    for (KeyValueSkipListSet kvset : snapshot) {
+      SortedSet snSs = kvset.tailSet(firstKv);
+      if (!snSs.isEmpty()) {
+        KeyValue snKv = snSs.first();
+        // is there a matching KV in the snapshot?
+        if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+          if (snKv.getTimestamp() == now) {
+            // poop,
+            now += 1;
+            break;
+          }
         }
       }
     }
@@ -510,21 +566,21 @@
 
     // so we cant add the new KV w/o knowing what's there already, but we also
     // want to take this chance to delete some kvs. So two loops (sad)
 
-    SortedSet ss = kvset.tailSet(firstKv);
-    Iterator it = ss.iterator();
-    while ( it.hasNext() ) {
-      KeyValue kv = it.next();
-
-      // if this isnt the row we are interested in, then bail:
-      if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
-        break; // rows dont match, bail.
+    for (KeyValueSkipListSet kvset : kvsets) {
+      SortedSet ss = kvset.tailSet(firstKv);
+      Iterator it = ss.iterator();
+      while ( it.hasNext() ) {
+        KeyValue kv = it.next();
+        // if this isnt the row we are interested in, then bail:
+        if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
+          break; // rows dont match, bail.
+        }
+        // if the qualifier matches and it's a put, just RM it out of the kvset.
+        if (kv.getType() == KeyValue.Type.Put.getCode() &&
+            kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
+          now = kv.getTimestamp();
+        }
       }
-
-      // if the qualifier matches and it's a put, just RM it out of the kvset.
-      if (kv.getType() == KeyValue.Type.Put.getCode() &&
-          kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
-        now = kv.getTimestamp();
-      }
     }
 
     // create or update (upsert) a new KeyValue with
@@ -598,38 +654,40 @@
         kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
         kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
         kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
-    SortedSet ss = kvset.tailSet(firstKv);
-    Iterator it = ss.iterator();
 
     // versions visible to oldest scanner
     int versionsVisible = 0;
-    while ( it.hasNext() ) {
-      KeyValue cur = it.next();
+    for (KeyValueSkipListSet kvset : this.kvsets) {
+      SortedSet ss = kvset.tailSet(firstKv);
+      Iterator it = ss.iterator();
+      while (it.hasNext()) {
+        KeyValue cur = it.next();
+        if (kv == cur) {
+          // ignore the one just put in
+          continue;
+        }
+        // check that this is the row and column we are interested in, otherwise bail
+        if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+          // only remove Puts that concurrent scanners cannot possibly see
+          if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMvccVersion() <= readpoint) {
+            if (versionsVisible > 1) {
+              // if we get here we have seen at least one version visible to the oldest scanner,
+              // which means we can prove that no scanner will see this version
 
-      if (kv == cur) {
-        // ignore the one just put in
-        continue;
-      }
-      // check that this is the row and column we are interested in, otherwise bail
-      if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
-        // only remove Puts that concurrent scanners cannot possibly see
-        if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMvccVersion() <= readpoint) {
-          if (versionsVisible > 1) {
-            // if we get here we have seen at least one version visible to the oldest scanner,
-            // which means we can prove that no scanner will see this version
-
-            // false means there was a change, so give us the size.
-            long delta = heapSizeChange(cur, true);
-            addedSize -= delta;
-            this.size.addAndGet(-delta);
-            it.remove();
-            setOldestEditTimeToNow();
-          } else {
-            versionsVisible++;
+              // false means there was a change, so give us the size.
+              long delta = heapSizeChange(cur, true);
+              addedSize -= delta;
+              this.size.addAndGet(-delta);
+              this.curKvsetSize.addAndGet(-delta);
+              it.remove();
+              setOldestEditTimeToNow();
+            } else {
+              versionsVisible++;
+            }
           }
+        } else {
+          // past the row or column, done
+          break;
         }
-      } else {
-        // past the row or column, done
-        break;
       }
     }
     return addedSize;
@@ -708,29 +766,12 @@
    * This behaves as if it were a real scanner but does not maintain position.
    */
   protected class MemStoreScanner extends NonLazyKeyValueScanner {
-    // Next row information for either kvset or snapshot
-    private KeyValue kvsetNextRow = null;
-    private KeyValue snapshotNextRow = null;
-
-    // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
-    private KeyValue kvsetItRow = null;
-    private KeyValue snapshotItRow = null;
-
-    // iterator based scanning.
-    private Iterator kvsetIt;
-    private Iterator snapshotIt;
-
-    // The kvset and snapshot at the time of creating this scanner
-    private KeyValueSkipListSet kvsetAtCreation;
-    private KeyValueSkipListSet snapshotAtCreation;
-
-    // the pre-calculated KeyValue to be returned by peek() or next()
-    private KeyValue theNext;
-
     // The allocator and snapshot allocator at the time of creating this scanner
     volatile MemStoreLAB allocatorAtCreation;
     volatile MemStoreLAB snapshotAllocatorAtCreation;
 
+    private List scanners;
+
     /* Some notes...
@@ -754,9 +795,6 @@
     MemStoreScanner() {
       super();
-
-      kvsetAtCreation = kvset;
-      snapshotAtCreation = snapshot;
       if (allocator != null) {
         this.allocatorAtCreation = allocator;
         this.allocatorAtCreation.incScannerCount();
@@ -765,31 +803,13 @@
         this.snapshotAllocatorAtCreation = snapshotAllocator;
         this.snapshotAllocatorAtCreation.incScannerCount();
       }
-    }
-
-    private KeyValue getNext(Iterator it) {
-      long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
-
-      KeyValue v = null;
-      try {
-        while (it.hasNext()) {
-          v = it.next();
-          if (v.getMvccVersion() <= readPoint) {
-            return v;
-          }
-        }
-
-        return null;
-      } finally {
-        if (v != null) {
-          // in all cases, remember the last KV iterated to
-          if (it == snapshotIt) {
-            snapshotItRow = v;
-          } else {
-            kvsetItRow = v;
-          }
-        }
+      this.scanners = new ArrayList(kvsets.size() + snapshot.size());
+      for (KeyValueSkipListSet kvset : kvsets) {
+        scanners.add(new MemStoreSliceScanner(kvset));
       }
+      for (KeyValueSkipListSet kvset : snapshot) {
+        scanners.add(new MemStoreSliceScanner(kvset));
+      }
     }
 
     /**
@@ -806,33 +826,15 @@
         return false;
       }
 
-      // kvset and snapshot will never be null.
-      // if tailSet can't find anything, SortedSet is empty (not null).
-      kvsetIt = kvsetAtCreation.tailSet(key).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(key).iterator();
-      kvsetItRow = null;
-      snapshotItRow = null;
-
-      return seekInSubLists(key);
+      boolean finalResult = false;
+      for (MemStoreSliceScanner kvs : this.scanners) {
+        boolean result = kvs.seek(key);
+        finalResult = finalResult || result;
+      }
+      return finalResult;
     }
 
-    /**
-     * (Re)initialize the iterators after a seek or a reseek.
-     */
-    private synchronized boolean seekInSubLists(KeyValue key){
-      kvsetNextRow = getNext(kvsetIt);
-      snapshotNextRow = getNext(snapshotIt);
-
-      // Calculate the next value
-      theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
-      // has data
-      return (theNext != null);
-    }
-
-
     /**
      * Move forward on the sub-lists set previously by seek.
      * @param key seek value (should be non-null)
     * @return true if there is at least one KV to read, false otherwise
@@ -852,41 +854,39 @@
       the reseeked set to at least that point.
 
       */
-      kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
-
-      return seekInSubLists(key);
+      boolean finalResult = false;
+      for (MemStoreSliceScanner kvs : this.scanners) {
+        boolean result = kvs.reseek(key);
+        finalResult = finalResult || result;
+      }
+      return finalResult;
     }
 
     @Override
     public synchronized KeyValue peek() {
-      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
-      return theNext;
+      KeyValue result = null;
+      for (MemStoreSliceScanner kvs : this.scanners) {
+        result = getLowest(result, kvs.peek());
+      }
+      return result;
     }
 
    @Override
    public synchronized KeyValue next() {
-      if (theNext == null) {
-          return null;
+      KeyValue result = null;
+      MemStoreSliceScanner lowestScanner = null;
+      for (MemStoreSliceScanner kvs : this.scanners) {
+        KeyValue kv = kvs.peek();
+        if (result == null || (kv != null && comparator.compare(result, kv) > 0)) {
+          result = kv;
+          lowestScanner = kvs;
+        }
       }
-
-      final KeyValue ret = theNext;
-
-      // Advance one of the iterators
-      if (theNext == kvsetNextRow) {
-        kvsetNextRow = getNext(kvsetIt);
-      } else {
-        snapshotNextRow = getNext(snapshotIt);
+      if (lowestScanner != null) {
+        lowestScanner.next();
       }
-
-      // Calculate the next value
-      theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
-      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
-      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
-      //    getLowest() + " threadpoint=" + readpoint);
-      return ret;
+      return result;
     }
 
     /*
@@ -905,11 +905,108 @@
       return (first != null ? first : second);
     }
 
-    /*
-     * Returns the higher of the two key values, or null if they are both null.
-     * This uses comparator.compare() to compare the KeyValue using the memstore
-     * comparator.
+    public synchronized void close() {
+      for (MemStoreSliceScanner kvs : this.scanners) {
+        kvs.close();
+      }
+    }
+
+    /**
+     * MemStoreScanner returns max value as sequence id because it will
+     * always have the latest data among all files.
      */
+    @Override
+    public long getSequenceID() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public boolean shouldUseScanner(Scan scan, SortedSet columns,
+        long oldestUnexpiredTS) {
+      return shouldSeek(scan, oldestUnexpiredTS);
+    }
+  }
+
+  // Scanner which can scan a memstore slice. ie. one KeyValueSkipListSet instance, out of many,
+  // which holds the KVs.
+  private class MemStoreSliceScanner extends NonLazyKeyValueScanner {
+    // last iterated KVs (to restore iterator state after reseek)
+    private KeyValue kvsetItRow = null;
+    // iterator based scanning.
+    private Iterator kvsetIt;
+    // The kvset and snapshot at the time of creating this scanner
+    private KeyValueSkipListSet kvset;
+    // the pre-calculated KeyValue to be returned by peek() or next()
+    private KeyValue theNext;
+
+    MemStoreSliceScanner(KeyValueSkipListSet kvset) {
+      super();
+      this.kvset = kvset;
+    }
+
+    @Override
+    public KeyValue peek() {
+      return this.theNext;
+    }
+
+    @Override
+    public KeyValue next() {
+      if (theNext == null) {
+        return null;
+      }
+      final KeyValue ret = theNext;
+      // Advance iterator
+      theNext = getNext();
+      return ret;
+    }
+
+    @Override
+    public synchronized boolean seek(KeyValue key) {
+      if (key == null) {
+        close();
+        return false;
+      }
+      // if tailSet can't find anything, SortedSet is empty (not null).
+      kvsetIt = kvset.tailSet(key).iterator();
+      kvsetItRow = null;
+      return seekInSubLists(key);
+    }
+
+    /**
+     * (Re)initialize the iterators after a seek or a reseek.
+     */
+    private synchronized boolean seekInSubLists(KeyValue key){
+      // Calculate the next value
+      theNext = getNext();
+      // has data
+      return (theNext != null);
+    }
+
+    private KeyValue getNext() {
+      long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
+      KeyValue v = null;
+      try {
+        while (kvsetIt.hasNext()) {
+          v = kvsetIt.next();
+          if (v.getMvccVersion() <= readPoint) {
+            return v;
+          }
+        }
+        return null;
+      } finally {
+        if (v != null) {
+          // in all cases, remember the last KV iterated to
          kvsetItRow = v;
+        }
+      }
+    }
+
+    @Override
+    public boolean reseek(KeyValue key) {
+      kvsetIt = kvset.tailSet(getHighest(key, kvsetItRow)).iterator();
+      return seekInSubLists(key);
+    }
+
     private KeyValue getHighest(KeyValue first, KeyValue second) {
       if (first == null && second == null) {
         return null;
       }
@@ -921,49 +1018,27 @@
       return (first != null ? first : second);
     }
 
-    public synchronized void close() {
-      this.kvsetNextRow = null;
-      this.snapshotNextRow = null;
-
-      this.kvsetIt = null;
-      this.snapshotIt = null;
-
-      if (allocatorAtCreation != null) {
-        this.allocatorAtCreation.decScannerCount();
-        this.allocatorAtCreation = null;
-      }
-      if (snapshotAllocatorAtCreation != null) {
-        this.snapshotAllocatorAtCreation.decScannerCount();
-        this.snapshotAllocatorAtCreation = null;
-      }
-
-      this.kvsetItRow = null;
-      this.snapshotItRow = null;
-    }
-
-    /**
-     * MemStoreScanner returns max value as sequence id because it will
-     * always have the latest data among all files.
-     */
     @Override
     public long getSequenceID() {
      return Long.MAX_VALUE;
    }
 
    @Override
-    public boolean shouldUseScanner(Scan scan, SortedSet columns,
-        long oldestUnexpiredTS) {
-      return shouldSeek(scan, oldestUnexpiredTS);
+    public void close() {
+      this.kvsetIt = null;
+      this.kvsetItRow = null;
    }
  }
 
  public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+      ClassSize.OBJECT + (13 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
 
  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
-      ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
-      (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
+      ClassSize.REENTRANT_LOCK + (2 * ClassSize.ATOMIC_LONG) +
+      (2 * ClassSize.TIMERANGE_TRACKER) +
+      ClassSize.ARRAYLIST + ClassSize.REFERENCE +
+      ClassSize.KEYVALUE_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP +
+      ClassSize.ARRAYLIST);
 
  /** Used for readability when we don't store memstore timestamp in HFile */
  public static final boolean NO_PERSISTENT_TS = false;
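The DEEP_OVERHEAD rewrite above charges the memstore for the kvsets and snapshot lists plus exactly one initial KeyValueSkipListSet and its backing ConcurrentSkipListMap; every further slice is charged separately when checkAndCreateNewKvSet() creates it. The small sketch below only mirrors that per-slice term so the accounting is easy to read in isolation; the class name is illustrative and not part of the patch.

import org.apache.hadoop.hbase.util.ClassSize;

public class SliceOverheadExample {
  public static void main(String[] args) {
    // Mirrors the newSliceOverhead computation in checkAndCreateNewKvSet():
    // one list slot, one KeyValueSkipListSet wrapper, one backing ConcurrentSkipListMap.
    long perSliceOverhead = ClassSize.REFERENCE
        + ClassSize.KEYVALUE_SKIPLIST_SET
        + ClassSize.CONCURRENT_SKIPLISTMAP;
    System.out.println("Heap charged per additional memstore slice: "
        + perSliceOverhead + " bytes");
  }
}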
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (revision 1519982)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (working copy)
@@ -22,11 +22,8 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -62,7 +59,7 @@
    * @param status Task that represents the flush operation and may be updated with status.
    * @return List of files written. Can be empty; must not be null.
    */
-  public abstract List flushSnapshot(SortedSet snapshot, long cacheFlushSeqNum,
+  public abstract List flushSnapshot(List snapshot, long cacheFlushSeqNum,
       TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
       throws IOException;
 
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (revision 1519982)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (working copy)
@@ -47,7 +47,10 @@
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet;
 import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -225,7 +228,23 @@
       assertEquals(expected, actual);
     }
 
+    // TimeRangeTracker
+    cl = TimeRangeTracker.class;
+    expected = ClassSize.estimateBase(cl, false);
+    actual = ClassSize.TIMERANGE_TRACKER;
+    if (expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+    // KeyValueSkipListSet
+    cl = KeyValueSkipListSet.class;
+    expected = ClassSize.estimateBase(cl, false);
+    actual = ClassSize.KEYVALUE_SKIPLIST_SET;
+    if (expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
   }
 
   /**
@@ -287,18 +306,22 @@
     actual = MemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
     expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
-    expected += ClassSize.estimateBase(AtomicLong.class, false);
+    expected += (2 * ClassSize.estimateBase(AtomicLong.class, false));
+    expected += ClassSize.estimateBase(KeyValueSkipListSet.class, false);
     expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
-    expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
-    expected += ClassSize.estimateBase(CopyOnWriteArraySet.class, false);
-    expected += ClassSize.estimateBase(CopyOnWriteArrayList.class, false);
+    expected += (2 * ClassSize.estimateBase(ArrayList.class, false));
+    expected += ClassSize.REFERENCE;
+    expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(KeyValueSkipListSet.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
-      ClassSize.estimateBase(CopyOnWriteArraySet.class, true);
-      ClassSize.estimateBase(CopyOnWriteArrayList.class, true);
+      ClassSize.estimateBase(ArrayList.class, true);
+      ClassSize.estimateBase(TimeRangeTracker.class, true);
+      ClassSize.estimateBase(TimeRangeTracker.class, true);
       assertEquals(expected, actual);
     }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1519982)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy)
@@ -1057,12 +1057,12 @@
     //checkAndPut with wrong value
     HStore store = (HStore) region.getStore(fam1);
-    store.memstore.kvset.size();
+    store.memstore.curKvset.size();
 
     boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
         new BinaryComparator(val1), put, true);
     assertEquals(true, res);
-    store.memstore.kvset.size();
+    store.memstore.curKvset.size();
 
     Get get = new Get(row1);
     get.addColumn(fam2, qf1);
@@ -1577,10 +1577,10 @@
     // extract the key values out the memstore:
     // This is kinda hacky, but better than nothing...
     long now = System.currentTimeMillis();
-    KeyValue firstKv = ((HStore) region.getStore(fam1)).memstore.kvset.first();
+    KeyValue firstKv = ((HStore) region.getStore(fam1)).memstore.curKvset.first();
     assertTrue(firstKv.getTimestamp() <= now);
     now = firstKv.getTimestamp();
-    for (KeyValue kv : ((HStore) region.getStore(fam1)).memstore.kvset) {
+    for (KeyValue kv : ((HStore) region.getStore(fam1)).memstore.curKvset) {
       assertTrue(kv.getTimestamp() <= now);
       now = kv.getTimestamp();
     }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1519982)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy)
@@ -60,9 +60,6 @@
   private static final int ROW_COUNT = 10;
   private static final int QUALIFIER_COUNT = ROW_COUNT;
   private static final byte [] FAMILY = Bytes.toBytes("column");
-  private static final byte [] CONTENTS = Bytes.toBytes("contents");
-  private static final byte [] BASIC = Bytes.toBytes("basic");
-  private static final String CONTENTSTR = "contentstr";
   private MultiVersionConsistencyControl mvcc;
 
   @Override
@@ -79,8 +76,8 @@
     byte [] other = Bytes.toBytes("somethingelse");
     KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
     this.memstore.add(samekey);
-    KeyValue found = this.memstore.kvset.first();
-    assertEquals(1, this.memstore.kvset.size());
+    KeyValue found = this.memstore.curKvset.first();
+    assertEquals(1, this.memstore.curKvset.size());
     assertTrue(Bytes.toString(found.getValue()), CellUtil.matchingValue(samekey, found));
   }
 
@@ -474,7 +471,7 @@
     for (int i = 0; i < snapshotCount; i++) {
       addRows(this.memstore);
       runSnapshot(this.memstore);
-      KeyValueSkipListSet ss = this.memstore.getSnapshot();
+      List ss = this.memstore.getSnapshot();
       assertEquals("History not being cleared", 0, ss.size());
     }
   }
@@ -496,7 +493,7 @@
     m.add(key2);
 
     assertTrue("Expected memstore to hold 3 values, actually has " +
-        m.kvset.size(), m.kvset.size() == 3);
+        m.curKvset.size(), m.curKvset.size() == 3);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -568,12 +565,13 @@
     memstore.add(new KeyValue(row, fam ,qf3, val));
     //Creating a snapshot
     memstore.snapshot();
-    assertEquals(3, memstore.snapshot.size());
+    assertEquals(1, memstore.snapshot.size());
+    assertEquals(3, memstore.snapshot.get(0).size());
     //Adding value to "new" memstore
-    assertEquals(0, memstore.kvset.size());
+    assertEquals(0, memstore.curKvset.size());
     memstore.add(new KeyValue(row, fam ,qf4, val));
     memstore.add(new KeyValue(row, fam ,qf5, val));
-    assertEquals(2, memstore.kvset.size());
+    assertEquals(2, memstore.curKvset.size());
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -595,7 +593,7 @@
     memstore.add(put2);
     memstore.add(put3);
 
-    assertEquals(3, memstore.kvset.size());
+    assertEquals(3, memstore.curKvset.size());
 
     KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
     memstore.delete(del2);
@@ -606,9 +604,9 @@
     expected.add(put2);
     expected.add(put1);
 
-    assertEquals(4, memstore.kvset.size());
+    assertEquals(4, memstore.curKvset.size());
     int i = 0;
-    for(KeyValue kv : memstore.kvset) {
+    for(KeyValue kv : memstore.curKvset) {
       assertEquals(expected.get(i++), kv);
     }
   }
@@ -629,7 +627,7 @@
     memstore.add(put2);
     memstore.add(put3);
 
-    assertEquals(3, memstore.kvset.size());
+    assertEquals(3, memstore.curKvset.size());
 
     KeyValue del2 =
       new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
@@ -642,9 +640,9 @@
     expected.add(put1);
 
-    assertEquals(4, memstore.kvset.size());
+    assertEquals(4, memstore.curKvset.size());
     int i = 0;
-    for (KeyValue kv: memstore.kvset) {
+    for (KeyValue kv: memstore.curKvset) {
       assertEquals(expected.get(i++), kv);
     }
   }
@@ -682,9 +680,9 @@
 
-    assertEquals(5, memstore.kvset.size());
+    assertEquals(5, memstore.curKvset.size());
     int i = 0;
-    for (KeyValue kv: memstore.kvset) {
+    for (KeyValue kv: memstore.curKvset) {
       assertEquals(expected.get(i++), kv);
     }
   }
@@ -698,8 +696,8 @@
     memstore.add(new KeyValue(row, fam, qf, ts, val));
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
     memstore.delete(delete);
-    assertEquals(2, memstore.kvset.size());
-    assertEquals(delete, memstore.kvset.first());
+    assertEquals(2, memstore.curKvset.size());
+    assertEquals(delete, memstore.curKvset.first());
   }
 
   public void testRetainsDeleteVersion() throws IOException {
@@ -711,8 +709,8 @@
         "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.kvset.size());
-    assertEquals(delete, memstore.kvset.first());
+    assertEquals(2, memstore.curKvset.size());
+    assertEquals(delete, memstore.curKvset.first());
   }
 
   public void testRetainsDeleteColumn() throws IOException {
     // add a put to memstore
@@ -723,8 +721,8 @@
         KeyValue.Type.DeleteColumn, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.kvset.size());
-    assertEquals(delete, memstore.kvset.first());
+    assertEquals(2, memstore.curKvset.size());
+    assertEquals(delete, memstore.curKvset.first());
   }
 
   public void testRetainsDeleteFamily() throws IOException {
     // add a put to memstore
@@ -735,8 +733,8 @@
         KeyValue.Type.DeleteFamily, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.kvset.size());
-    assertEquals(delete, memstore.kvset.first());
+    assertEquals(2, memstore.curKvset.size());
+    assertEquals(delete, memstore.curKvset.first());
   }
 
   ////////////////////////////////////
@@ -980,11 +978,19 @@
   private long runSnapshot(final MemStore hmc) throws UnexpectedException {
     // Save off old state.
-    int oldHistorySize = hmc.getSnapshot().size();
+    List snapshot = hmc.getSnapshot();
+    int oldHistorySize = 0;
+    for (KeyValueSkipListSet snapshotItem : snapshot) {
+      oldHistorySize += snapshotItem.size();
+    }
     hmc.snapshot();
-    KeyValueSkipListSet ss = hmc.getSnapshot();
+    List ss = hmc.getSnapshot();
+    int newHistorySize = 0;
+    for (KeyValueSkipListSet snapshotItem : ss) {
+      newHistorySize += snapshotItem.size();
+    }
     // Make some assertions about what just happened.
-    assertTrue("History size has not increased", oldHistorySize < ss.size());
+    assertTrue("History size has not increased", oldHistorySize < newHistorySize);
     long t = memstore.timeOfOldestEdit();
     assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
     hmc.clearSnapshot(ss);
@@ -1005,15 +1011,6 @@
     }
   }
 
-  private KeyValue getDeleteKV(byte [] row) {
-    return new KeyValue(row, Bytes.toBytes("test_col"), null,
-      HConstants.LATEST_TIMESTAMP, KeyValue.Type.Delete, null);
-  }
-
-  private KeyValue getKV(byte [] row, byte [] value) {
-    return new KeyValue(row, Bytes.toBytes("test_col"), null,
-      HConstants.LATEST_TIMESTAMP, value);
-  }
   private static void addRows(int count, final MemStore mem) {
     long nanos = System.nanoTime();
 
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java (revision 1519982)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java (working copy)
@@ -116,14 +116,15 @@
 
     // Creating a snapshot
     memstore.snapshot();
-    KeyValueSkipListSet snapshot = memstore.getSnapshot();
-    assertEquals(3, memstore.snapshot.size());
+    List snapshot = memstore.getSnapshot();
+    assertEquals(1, snapshot.size());
+    assertEquals(3, snapshot.get(0).size());
 
     // Adding value to "new" memstore
-    assertEquals(0, memstore.kvset.size());
+    assertEquals(0, memstore.curKvset.size());
     memstore.add(new KeyValue(row, fam, qf4, val));
     memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.kvset.size());
+    assertEquals(2, memstore.curKvset.size());
     memstore.clearSnapshot(snapshot);
 
     int chunkCount = chunkPool.getPoolSize();
@@ -154,14 +155,15 @@
 
     // Creating a snapshot
     memstore.snapshot();
-    KeyValueSkipListSet snapshot = memstore.getSnapshot();
-    assertEquals(3, memstore.snapshot.size());
+    List snapshot = memstore.getSnapshot();
+    assertEquals(1, snapshot.size());
+    assertEquals(3, snapshot.get(0).size());
 
     // Adding value to "new" memstore
-    assertEquals(0, memstore.kvset.size());
+    assertEquals(0, memstore.curKvset.size());
     memstore.add(new KeyValue(row, fam, qf4, val));
     memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.kvset.size());
+    assertEquals(2, memstore.curKvset.size());
 
     // opening scanner before clear the snapshot
     List scanners = memstore.getScanners();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 1519982)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (working copy)
@@ -436,7 +436,7 @@
     this.store.snapshot();
     flushStore(store, id++);
     assertEquals(storeFilessize, this.store.getStorefiles().size());
-    assertEquals(0, this.store.memstore.kvset.size());
+    assertEquals(0, this.store.memstore.curKvset.size());
   }
 
   private void assertCheck() {
@@ -480,7 +480,7 @@
     flushStore(store, id++);
     assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
-    assertEquals(2, this.store.memstore.kvset.size());
+    assertEquals(2, this.store.memstore.curKvset.size());
 
     // how many key/values for this row are there?
     Get get = new Get(row);
@@ -554,7 +554,7 @@
     }
 
     long computedSize=0;
-    for (KeyValue kv : this.store.memstore.kvset) {
+    for (KeyValue kv : this.store.memstore.curKvset) {
       long kvsize = this.store.memstore.heapSizeChange(kv, true);
       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
       computedSize += kvsize;
@@ -585,7 +585,7 @@
     // then flush.
     flushStore(store, id++);
     assertEquals(1, this.store.getStorefiles().size());
-    assertEquals(1, this.store.memstore.kvset.size());
+    assertEquals(1, this.store.memstore.curKvset.size());
 
     // now increment again:
     newValue += 1;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1519982)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy)
@@ -66,6 +66,7 @@
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -563,7 +564,7 @@
       super(conf, store);
     }
     @Override
-    public List flushSnapshot(SortedSet snapshot, long cacheFlushId,
+    public List flushSnapshot(List snapshot, long cacheFlushId,
         TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
         throws IOException {
       if (throwExceptionWhenFlushing.get()) {