diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index ee3e847..f097296 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -146,6 +146,53 @@ public interface MetricsRegionServerSource extends BaseSource { */ void updateFlushTime(long t); + /** + * Update the flush memstore size histogram + * @param bytes the number of bytes in the memstore + */ + void updateFlushMemstoreSize(long bytes); + + /** + * Update the flush output file size histogram + * @param bytes the number of bytes in the output file + */ + void updateFlushOutputSize(long bytes); + + /** + * Update the compaction time histogram, both major and minor + * @param isMajor whether compaction is a major compaction + * @param t time it took, in milliseconds + */ + void updateCompactionTime(boolean isMajor, long t); + + /** + * Update the compaction input number of files histogram + * @param isMajor whether compaction is a major compaction + * @param c number of files + */ + void updateCompactionInputFileCount(boolean isMajor, long c); + + /** + * Update the compaction total input file size histogram + * @param isMajor whether compaction is a major compaction + * @param bytes the number of bytes of the compaction input file + */ + void updateCompactionInputSize(boolean isMajor, long bytes); + + /** + * Update the compaction output number of files histogram + * @param isMajor whether compaction is a major compaction + * @param c number of files + */ + void updateCompactionOutputFileCount(boolean isMajor, long c); + + /** + * Update the compaction total output file size + * @param isMajor whether compaction is a major compaction + * @param bytes the number of bytes of the compaction input file + */ + void updateCompactionOutputSize(boolean isMajor, long bytes); + // Strings used for exporting to metrics system. String REGION_COUNT = "regionCount"; String REGION_COUNT_DESC = "Number of regions"; @@ -212,6 +259,10 @@ public interface MetricsRegionServerSource extends BaseSource { String LARGE_COMPACTION_QUEUE_LENGTH = "largeCompactionQueueLength"; String SMALL_COMPACTION_QUEUE_LENGTH = "smallCompactionQueueLength"; String COMPACTION_QUEUE_LENGTH_DESC = "Length of the queue for compactions."; + String LARGE_COMPACTION_QUEUE_LENGTH_DESC = "Length of the queue for compactions with input size " + + "larger than throttle threshold (2.5GB by default)"; + String SMALL_COMPACTION_QUEUE_LENGTH_DESC = "Length of the queue for compactions with input size " + + "smaller than throttle threshold (2.5GB by default)"; String FLUSH_QUEUE_LENGTH = "flushQueueLength"; String FLUSH_QUEUE_LENGTH_DESC = "Length of the queue for region flushes"; String BLOCK_CACHE_FREE_SIZE = "blockCacheFreeSize"; @@ -345,7 +396,61 @@ public interface MetricsRegionServerSource extends BaseSource { String SPLIT_REQUEST_DESC = "Number of splits requested"; String SPLIT_SUCCESS_KEY = "splitSuccessCount"; String SPLIT_SUCCESS_DESC = "Number of successfully executed splits"; - String FLUSH_KEY = "flushTime"; + + String FLUSH_TIME = "flushTime"; + String FLUSH_TIME_DESC = "Histogram for the time in millis for memstore flush"; + String FLUSH_MEMSTORE_SIZE = "flushMemstoreSize"; + String FLUSH_MEMSTORE_SIZE_DESC = "Histogram for number of bytes in the memstore for a flush"; + String FLUSH_OUTPUT_SIZE = "flushOutputSize"; + String FLUSH_OUTPUT_SIZE_DESC = "Histogram for number of bytes in the resulting file for a flush"; + String FLUSHED_OUTPUT_BYTES = "flushedOutputBytes"; + String FLUSHED_OUTPUT_BYTES_DESC = "Total number of bytes written from flush"; + String FLUSHED_MEMSTORE_BYTES = "flushedMemstoreBytes"; + String FLUSHED_MEMSTORE_BYTES_DESC = "Total number of bytes of cells in memstore from flush"; + + String COMPACTION_TIME = "compactionTime"; + String COMPACTION_TIME_DESC + = "Histogram for the time in millis for compaction, both major and minor"; + String COMPACTION_INPUT_FILE_COUNT = "compactionInputFileCount"; + String COMPACTION_INPUT_FILE_COUNT_DESC + = "Histogram for the compaction input number of files, both major and minor"; + String COMPACTION_INPUT_SIZE = "compactionInputSize"; + String COMPACTION_INPUT_SIZE_DESC + = "Histogram for the compaction total input file sizes, both major and minor"; + String COMPACTION_OUTPUT_FILE_COUNT = "compactionOutputFileCount"; + String COMPACTION_OUTPUT_FILE_COUNT_DESC + = "Histogram for the compaction output number of files, both major and minor"; + String COMPACTION_OUTPUT_SIZE = "compactionOutputSize"; + String COMPACTION_OUTPUT_SIZE_DESC + = "Histogram for the compaction total output file sizes, both major and minor"; + String COMPACTED_INPUT_BYTES = "compactedInputBytes"; + String COMPACTED_INPUT_BYTES_DESC + = "Total number of bytes that is read for compaction, both major and minor"; + String COMPACTED_OUTPUT_BYTES = "compactedOutputBytes"; + String COMPACTED_OUTPUT_BYTES_DESC + = "Total number of bytes that is output from compaction, both major and minor"; + + String MAJOR_COMPACTION_TIME = "majorCompactionTime"; + String MAJOR_COMPACTION_TIME_DESC + = "Histogram for the time in millis for compaction, major only"; + String MAJOR_COMPACTION_INPUT_FILE_COUNT = "majorCompactionInputFileCount"; + String MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC + = "Histogram for the compaction input number of files, major only"; + String MAJOR_COMPACTION_INPUT_SIZE = "majorCompactionInputSize"; + String MAJOR_COMPACTION_INPUT_SIZE_DESC + = "Histogram for the compaction total input file sizes, major only"; + String MAJOR_COMPACTION_OUTPUT_FILE_COUNT = "majorCompactionOutputFileCount"; + String MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC + = "Histogram for the compaction output number of files, major only"; + String MAJOR_COMPACTION_OUTPUT_SIZE = "majorCompactionOutputSize"; + String MAJOR_COMPACTION_OUTPUT_SIZE_DESC + = "Histogram for the compaction total output file sizes, major only"; + String MAJOR_COMPACTED_INPUT_BYTES = "majorCompactedInputBytes"; + String MAJOR_COMPACTED_INPUT_BYTES_DESC + = "Total number of bytes that is read for compaction, major only"; + String MAJOR_COMPACTED_OUTPUT_BYTES = "majorCompactedOutputBytes"; + String MAJOR_COMPACTED_OUTPUT_BYTES_DESC + = "Total number of bytes that is output from compaction, major only"; String RPC_GET_REQUEST_COUNT = "rpcGetRequestCount"; String RPC_GET_REQUEST_COUNT_DESC = "Number of rpc get requests this region server has answered."; diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index f869397..0c24cb4 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -35,7 +35,6 @@ import org.apache.hadoop.metrics2.lib.MutableFastCounter; public class MetricsRegionServerSourceImpl extends BaseSourceImpl implements MetricsRegionServerSource { - final MetricsRegionServerWrapper rsWrap; private final MetricHistogram putHisto; private final MetricHistogram deleteHisto; @@ -55,7 +54,30 @@ public class MetricsRegionServerSourceImpl private final MutableFastCounter splitSuccess; private final MetricHistogram splitTimeHisto; + + // flush related metrics private final MetricHistogram flushTimeHisto; + private final MetricHistogram flushMemstoreSizeHisto; + private final MetricHistogram flushOutputSizeHisto; + private final MutableFastCounter flushedMemstoreBytes; + private final MutableFastCounter flushedOutputBytes; + + // compaction related metrics + private final MetricHistogram compactionTimeHisto; + private final MetricHistogram compactionInputFileCountHisto; + private final MetricHistogram compactionInputSizeHisto; + private final MetricHistogram compactionOutputFileCountHisto; + private final MetricHistogram compactionOutputSizeHisto; + private final MutableFastCounter compactedInputBytes; + private final MutableFastCounter compactedOutputBytes; + + private final MetricHistogram majorCompactionTimeHisto; + private final MetricHistogram majorCompactionInputFileCountHisto; + private final MetricHistogram majorCompactionInputSizeHisto; + private final MetricHistogram majorCompactionOutputFileCountHisto; + private final MetricHistogram majorCompactionOutputSizeHisto; + private final MutableFastCounter majorCompactedInputBytes; + private final MutableFastCounter majorCompactedOutputBytes; public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) { this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap); @@ -83,14 +105,52 @@ public class MetricsRegionServerSourceImpl appendHisto = getMetricsRegistry().newTimeHistogram(APPEND_KEY); slowAppend = getMetricsRegistry().newCounter(SLOW_APPEND_KEY, SLOW_APPEND_DESC, 0L); - + replayHisto = getMetricsRegistry().newTimeHistogram(REPLAY_KEY); scanSizeHisto = getMetricsRegistry().newSizeHistogram(SCAN_SIZE_KEY); scanTimeHisto = getMetricsRegistry().newTimeHistogram(SCAN_TIME_KEY); - splitTimeHisto = getMetricsRegistry().newTimeHistogram(SPLIT_KEY); - flushTimeHisto = getMetricsRegistry().newTimeHistogram(FLUSH_KEY); + flushTimeHisto = getMetricsRegistry().newTimeHistogram(FLUSH_TIME, FLUSH_TIME_DESC); + flushMemstoreSizeHisto = getMetricsRegistry() + .newSizeHistogram(FLUSH_MEMSTORE_SIZE, FLUSH_MEMSTORE_SIZE_DESC); + flushOutputSizeHisto = getMetricsRegistry().newSizeHistogram(FLUSH_OUTPUT_SIZE, + FLUSH_OUTPUT_SIZE_DESC); + flushedOutputBytes = getMetricsRegistry().newCounter(FLUSHED_OUTPUT_BYTES, + FLUSHED_OUTPUT_BYTES_DESC, 0L); + flushedMemstoreBytes = getMetricsRegistry().newCounter(FLUSHED_MEMSTORE_BYTES, + FLUSHED_MEMSTORE_BYTES_DESC, 0L); + + compactionTimeHisto = getMetricsRegistry() + .newTimeHistogram(COMPACTION_TIME, COMPACTION_TIME_DESC); + compactionInputFileCountHisto = getMetricsRegistry() + .newHistogram(COMPACTION_INPUT_FILE_COUNT, COMPACTION_INPUT_FILE_COUNT_DESC); + compactionInputSizeHisto = getMetricsRegistry() + .newSizeHistogram(COMPACTION_INPUT_SIZE, COMPACTION_INPUT_SIZE_DESC); + compactionOutputFileCountHisto = getMetricsRegistry() + .newHistogram(COMPACTION_OUTPUT_FILE_COUNT, COMPACTION_OUTPUT_FILE_COUNT_DESC); + compactionOutputSizeHisto = getMetricsRegistry() + .newSizeHistogram(COMPACTION_OUTPUT_SIZE, COMPACTION_OUTPUT_SIZE_DESC); + compactedInputBytes = getMetricsRegistry() + .newCounter(COMPACTED_INPUT_BYTES, COMPACTED_INPUT_BYTES_DESC, 0L); + compactedOutputBytes = getMetricsRegistry() + .newCounter(COMPACTED_OUTPUT_BYTES, COMPACTED_OUTPUT_BYTES_DESC, 0L); + + majorCompactionTimeHisto = getMetricsRegistry() + .newTimeHistogram(MAJOR_COMPACTION_TIME, MAJOR_COMPACTION_TIME_DESC); + majorCompactionInputFileCountHisto = getMetricsRegistry() + .newHistogram(MAJOR_COMPACTION_INPUT_FILE_COUNT, MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC); + majorCompactionInputSizeHisto = getMetricsRegistry() + .newSizeHistogram(MAJOR_COMPACTION_INPUT_SIZE, MAJOR_COMPACTION_INPUT_SIZE_DESC); + majorCompactionOutputFileCountHisto = getMetricsRegistry() + .newHistogram(MAJOR_COMPACTION_OUTPUT_FILE_COUNT, MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC); + majorCompactionOutputSizeHisto = getMetricsRegistry() + .newSizeHistogram(MAJOR_COMPACTION_OUTPUT_SIZE, MAJOR_COMPACTION_OUTPUT_SIZE_DESC); + majorCompactedInputBytes = getMetricsRegistry() + .newCounter(MAJOR_COMPACTED_INPUT_BYTES, MAJOR_COMPACTED_INPUT_BYTES_DESC, 0L); + majorCompactedOutputBytes = getMetricsRegistry() + .newCounter(MAJOR_COMPACTED_OUTPUT_BYTES, MAJOR_COMPACTED_OUTPUT_BYTES_DESC, 0L); + splitTimeHisto = getMetricsRegistry().newTimeHistogram(SPLIT_KEY); splitRequest = getMetricsRegistry().newCounter(SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L); splitSuccess = getMetricsRegistry().newCounter(SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L); } @@ -180,6 +240,62 @@ public class MetricsRegionServerSourceImpl flushTimeHisto.add(t); } + @Override + public void updateFlushMemstoreSize(long bytes) { + flushMemstoreSizeHisto.add(bytes); + flushedMemstoreBytes.incr(bytes); + } + + @Override + public void updateFlushOutputSize(long bytes) { + flushOutputSizeHisto.add(bytes); + flushedOutputBytes.incr(bytes); + } + + @Override + public void updateCompactionTime(boolean isMajor, long t) { + compactionTimeHisto.add(t); + if (isMajor) { + majorCompactionTimeHisto.add(t); + } + } + + @Override + public void updateCompactionInputFileCount(boolean isMajor, long c) { + compactionInputFileCountHisto.add(c); + if (isMajor) { + majorCompactionInputFileCountHisto.add(c); + } + } + + @Override + public void updateCompactionInputSize(boolean isMajor, long bytes) { + compactionInputSizeHisto.add(bytes); + compactedInputBytes.incr(bytes); + if (isMajor) { + majorCompactionInputSizeHisto.add(bytes); + majorCompactedInputBytes.incr(bytes); + } + } + + @Override + public void updateCompactionOutputFileCount(boolean isMajor, long c) { + compactionOutputFileCountHisto.add(c); + if (isMajor) { + majorCompactionOutputFileCountHisto.add(c); + } + } + + @Override + public void updateCompactionOutputSize(boolean isMajor, long bytes) { + compactionOutputSizeHisto.add(bytes); + compactedOutputBytes.incr(bytes); + if (isMajor) { + majorCompactionOutputSizeHisto.add(bytes); + majorCompactedOutputBytes.incr(bytes); + } + } + /** * Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all * expectations of java programmers. Instead of returning anything Hadoop metrics expects @@ -252,6 +368,12 @@ public class MetricsRegionServerSourceImpl rsWrap.getSplitQueueSize()) .addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC), rsWrap.getCompactionQueueSize()) + .addGauge(Interns.info(SMALL_COMPACTION_QUEUE_LENGTH, SMALL_COMPACTION_QUEUE_LENGTH_DESC), + rsWrap.getSmallCompactionQueueSize()) + .addGauge(Interns.info(LARGE_COMPACTION_QUEUE_LENGTH, LARGE_COMPACTION_QUEUE_LENGTH_DESC), + rsWrap.getLargeCompactionQueueSize()) + .addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC), + rsWrap.getCompactionQueueSize()) .addGauge(Interns.info(FLUSH_QUEUE_LENGTH, FLUSH_QUEUE_LENGTH_DESC), rsWrap.getFlushQueueSize()) .addGauge(Interns.info(BLOCK_CACHE_FREE_SIZE, BLOCK_CACHE_FREE_DESC), diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 5723919..9898bb9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -66,6 +66,7 @@ public class FlushTableSubprocedure extends Subprocedure { try { LOG.debug("Flush region " + region.toString() + " started..."); region.flush(true); + // TODO: flush result is not checked? } finally { LOG.debug("Closing region operation on " + region); region.closeRegionOperation(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index e73a456..1a059d7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -120,12 +120,6 @@ public class DefaultStoreEngine extends StoreEngine< } @Override - public List compact(ThroughputController throughputController) - throws IOException { - return compact(throughputController, null); - } - - @Override public List compact(ThroughputController throughputController, User user) throws IOException { return compactor.compact(request, throughputController, user); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a35b9f1..92bb551 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1147,6 +1147,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return memstoreSize.get(); } + @Override public RegionServicesForStores getRegionServicesForStores() { return regionServicesForStores; } @@ -2437,6 +2438,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Otherwise, the snapshot content while backed up in the wal, it will not // be part of the current running servers state. boolean compactionRequested = false; + long flushedOutputFileSize = 0; try { // A. Flush memstore to all the HStores. // Keep running vector of all store files that includes both old and the @@ -2463,6 +2465,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) { totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName); } + flushedOutputFileSize += flush.getOutputFileSize(); } storeFlushCtxs.clear(); @@ -2548,10 +2551,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.info(msg); status.setStatus(msg); + if (rsServices != null) { + rsServices.getMetrics().updateFlush(time - startTime, + totalFlushableSizeOfFlushableStores, flushedOutputFileSize); + } + return new FlushResultImpl(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED : - FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, - flushOpSeqId); + FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4ab2693..1476190 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -620,6 +620,7 @@ public class HRegionServer extends HasThread implements if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { + @Override public void handle(Signal signal) { getConfiguration().reloadConfiguration(); configurationManager.notifyAllObservers(getConfiguration()); @@ -3418,4 +3419,9 @@ public class HRegionServer extends HasThread implements } this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); } + + @Override + public MetricsRegionServer getMetrics() { + return metricsRegionServer; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 8961d17..282c2bd 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -184,6 +184,7 @@ public class HStore implements Store { private volatile long compactedCellsCount = 0; private volatile long majorCompactedCellsCount = 0; private volatile long flushedCellsSize = 0; + private volatile long flushedOutputFileSize = 0; private volatile long compactedCellsSize = 0; private volatile long majorCompactedCellsSize = 0; @@ -1210,6 +1211,7 @@ public class HStore implements Store { // Commence the compaction. List newFiles = compaction.compact(throughputController, user); + long outputBytes = 0L; // TODO: get rid of this! if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { LOG.warn("hbase.hstore.compaction.complete is set to false"); @@ -1235,10 +1237,22 @@ public class HStore implements Store { compactedCellsCount += getCompactionProgress().totalCompactingKVs; compactedCellsSize += getCompactionProgress().totalCompactedSize; } + + for (StoreFile sf : sfs) { + outputBytes += sf.getReader().length(); + } + // At this point the store will use new files for all new scanners. completeCompaction(filesToCompact); // update store size. - logCompactionEndMessage(cr, sfs, compactionStartTime); + long now = EnvironmentEdgeManager.currentTime(); + if (region.getRegionServerServices() != null) { + region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(), + now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), + outputBytes); + } + + logCompactionEndMessage(cr, sfs, now, compactionStartTime); return sfs; } finally { finishCompactionRequest(cr); @@ -1330,8 +1344,7 @@ public class HStore implements Store { * @param compactionStartTime Start time. */ private void logCompactionEndMessage( - CompactionRequest cr, List sfs, long compactionStartTime) { - long now = EnvironmentEdgeManager.currentTime(); + CompactionRequest cr, List sfs, long now, long compactionStartTime) { StringBuilder message = new StringBuilder( "Completed" + (cr.isMajor() ? " major" : "") + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in " @@ -2129,6 +2142,7 @@ public class HStore implements Store { private List committedFiles; private long cacheFlushCount; private long cacheFlushSize; + private long outputFileSize; private StoreFlusherImpl(long cacheFlushSeqNum) { this.cacheFlushSeqNum = cacheFlushSeqNum; @@ -2163,7 +2177,9 @@ public class HStore implements Store { List storeFiles = new ArrayList(this.tempFiles.size()); for (Path storeFilePath : tempFiles) { try { - storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status)); + StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); + outputFileSize += sf.getReader().length(); + storeFiles.add(sf); } catch (IOException ex) { LOG.error("Failed to commit store file " + storeFilePath, ex); // Try to delete the files we have committed before. @@ -2189,12 +2205,18 @@ public class HStore implements Store { HStore.this.flushedCellsCount += cacheFlushCount; HStore.this.flushedCellsSize += cacheFlushSize; + HStore.this.flushedOutputFileSize += outputFileSize; // Add new file to store files. Clear snapshot too while we have the Store write lock. return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); } @Override + public long getOutputFileSize() { + return outputFileSize; + } + + @Override public List getCommittedFiles() { return committedFiles; } @@ -2257,7 +2279,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2305,6 +2327,11 @@ public class HStore implements Store { } @Override + public long getFlushedOutputFileSize() { + return flushedOutputFileSize; + } + + @Override public long getCompactedCellsCount() { return compactedCellsCount; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 40c5046..a69d8c0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -486,25 +486,16 @@ class MemStoreFlusher implements FlushRequester { */ private boolean flushRegion(final Region region, final boolean emergencyFlush, boolean forceFlushAllStores) { - long startTime = 0; synchronized (this.regionsInQueue) { FlushRegionEntry fqe = this.regionsInQueue.remove(region); // Use the start time of the FlushRegionEntry if available - if (fqe != null) { - startTime = fqe.createTime; - } if (fqe != null && emergencyFlush) { // Need to remove from region from delay queue. When NOT an // emergencyFlush, then item was removed via a flushQueue.poll. flushQueue.remove(fqe); - } - } - if (startTime == 0) { - // Avoid getting the system time unless we don't have a FlushRegionEntry; - // shame we can't capture the time also spent in the above synchronized - // block - startTime = EnvironmentEdgeManager.currentTime(); + } } + lock.readLock().lock(); try { notifyFlushRequest(region, emergencyFlush); @@ -518,10 +509,6 @@ class MemStoreFlusher implements FlushRequester { server.compactSplitThread.requestSystemCompaction( region, Thread.currentThread().getName()); } - if (flushResult.isFlushSucceeded()) { - long endTime = EnvironmentEdgeManager.currentTime(); - server.metricsRegionServer.updateFlushTime(endTime - startTime); - } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical // section, we get a DroppedSnapshotException and a replay of wal diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index 7ff9bed..8bca6c5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -117,7 +117,18 @@ public class MetricsRegionServer { serverSource.incrSplitSuccess(); } - public void updateFlushTime(long t) { + public void updateFlush(long t, long memstoreSize, long fileSize) { serverSource.updateFlushTime(t); + serverSource.updateFlushMemstoreSize(memstoreSize); + serverSource.updateFlushOutputSize(fileSize); + } + + public void updateCompaction(boolean isMajor, long t, int inputFileCount, int outputFileCount, + long inputBytes, long outputBytes) { + serverSource.updateCompactionTime(isMajor, t); + serverSource.updateCompactionInputFileCount(isMajor, inputFileCount); + serverSource.updateCompactionOutputFileCount(isMajor, outputFileCount); + serverSource.updateCompactionInputSize(isMajor, inputBytes); + serverSource.updateCompactionOutputSize(isMajor, outputBytes); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f4a2574..2d27219 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -166,7 +166,6 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Leases.Lease; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; -import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; @@ -1421,14 +1420,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (shouldFlush) { boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; - long startTime = EnvironmentEdgeManager.currentTime(); // Go behind the curtain so we can manage writing of the flush WAL marker HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl) ((HRegion)region).flushcache(true, writeFlushWalMarker); - if (flushResult.isFlushSucceeded()) { - long endTime = EnvironmentEdgeManager.currentTime(); - regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); - } boolean compactionNeeded = flushResult.isCompactionNeeded(); if (compactionNeeded) { regionServer.compactSplitThread.requestSystemCompaction(region, @@ -1567,18 +1561,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } LOG.info("Receiving merging request for " + regionA + ", " + regionB + ",forcible=" + forcible); - long startTime = EnvironmentEdgeManager.currentTime(); - FlushResult flushResult = regionA.flush(true); - if (flushResult.isFlushSucceeded()) { - long endTime = EnvironmentEdgeManager.currentTime(); - regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); - } - startTime = EnvironmentEdgeManager.currentTime(); - flushResult = regionB.flush(true); - if (flushResult.isFlushSucceeded()) { - long endTime = EnvironmentEdgeManager.currentTime(); - regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); - } + regionA.flush(true); + regionB.flush(true); regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, masterSystemTime, RpcServer.getRequestUser()); return MergeRegionsResponse.newBuilder().build(); @@ -1991,12 +1975,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, + "Replicas are auto-split when their primary is split."); } LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString()); - long startTime = EnvironmentEdgeManager.currentTime(); - FlushResult flushResult = region.flush(true); - if (flushResult.isFlushSucceeded()) { - long endTime = EnvironmentEdgeManager.currentTime(); - regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); - } + region.flush(true); byte[] splitPoint = null; if (request.hasSplitPoint()) { splitPoint = request.getSplitPoint().toByteArray(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 2a71629..c6689a9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -244,4 +244,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * global memstore size already exceeds lower limit. */ double getFlushPressure(); + + /** + * @return the metrics tracker for the region server + */ + MetricsRegionServer getMetrics(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index c167535..1cffaad 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -124,7 +124,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf List getScanners(List files, boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException; - + ScanInfo getScanInfo(); /** @@ -439,6 +439,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf long getFlushedCellsSize(); /** + * @return The total size of out output files on disk, in bytes + */ + long getFlushedOutputFileSize(); + + /** * @return The number of cells processed during minor compactions */ long getCompactedCellsCount(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java index f4f25dd..f62e96e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java @@ -85,4 +85,9 @@ interface StoreFlushContext { * @return a list of Paths for new files */ List getCommittedFiles(); + + /** + * @return the total file size for flush output files, in bytes + */ + long getOutputFileSize(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 1e16ca8..9255634 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -100,13 +100,6 @@ public class StripeStoreEngine extends StoreEngine compact(ThroughputController throughputController) - throws IOException { - Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); - return this.stripeRequest.execute(compactor, throughputController, null); - } - - @Override public List compact(ThroughputController throughputController, User user) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java index 6902c40..ef6d7cb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java @@ -66,13 +66,6 @@ public abstract class CompactionContext { this.request = request; } - /** - * Runs the compaction based on current selection. select/forceSelect must have been called. - * @return The new file paths resulting from compaction. - */ - public abstract List compact(ThroughputController throughputController) - throws IOException; - public abstract List compact(ThroughputController throughputController, User user) throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index d52077ba..268bb09 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -103,6 +103,7 @@ public class CompactionRequest implements Comparable { this.regionName = other.regionName; this.storeName = other.storeName; this.totalSize = other.totalSize; + recalculateSize(); return this; } @@ -225,10 +226,12 @@ public class CompactionRequest implements Comparable { Collections2.transform(Collections2.filter( this.getFiles(), new Predicate() { + @Override public boolean apply(StoreFile sf) { return sf.getReader() != null; } }), new Function() { + @Override public String apply(StoreFile sf) { return StringUtils.humanReadableInt( (sf.getReader() == null) ? 0 : sf.getReader().length()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index e634327..6cd1963 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -319,6 +320,7 @@ public class MockRegionServerServices implements RegionServerServices { return null; } + @Override public ThroughputController getFlushThroughputController() { return null; } @@ -327,4 +329,9 @@ public class MockRegionServerServices implements RegionServerServices { public double getFlushPressure() { return 0; } + + @Override + public MetricsRegionServer getMetrics() { + return null; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 4de4a5f..69f2e35 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -664,6 +665,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { return null; } + @Override public ThroughputController getFlushThroughputController() { return null; } @@ -672,4 +674,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public double getFlushPressure() { return 0; } + + @Override + public MetricsRegionServer getMetrics() { + return null; + } } \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 06b4c46..72dcbe7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -86,7 +86,7 @@ public class TestCompaction { @Rule public TestName name = new TestName(); private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU(); protected Configuration conf = UTIL.getConfiguration(); - + private HRegion r = null; private HTableDescriptor htd = null; private static final byte [] COLUMN_FAMILY = fam1; @@ -158,6 +158,7 @@ public class TestCompaction { HRegion spyR = spy(r); doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { r.writestate.writesEnabled = false; return invocation.callRealMethod(); @@ -363,12 +364,6 @@ public class TestCompaction { } @Override - public List compact(ThroughputController throughputController) - throws IOException { - return compact(throughputController, null); - } - - @Override public List compact(ThroughputController throughputController, User user) throws IOException { finishCompaction(this.selectedFiles); @@ -421,12 +416,6 @@ public class TestCompaction { } @Override - public List compact(ThroughputController throughputController) - throws IOException { - return compact(throughputController, null); - } - - @Override public List compact(ThroughputController throughputController, User user) throws IOException { try { @@ -467,6 +456,7 @@ public class TestCompaction { @Override public void cancelCompaction(Object object) {} + @Override public int getPriority() { return Integer.MIN_VALUE; // some invalid value, see createStoreMock } @@ -511,9 +501,10 @@ public class TestCompaction { when( r.compact(any(CompactionContext.class), any(Store.class), any(ThroughputController.class), any(User.class))).then(new Answer() { + @Override public Boolean answer(InvocationOnMock invocation) throws Throwable { invocation.getArgumentAt(0, CompactionContext.class).compact( - invocation.getArgumentAt(2, ThroughputController.class)); + invocation.getArgumentAt(2, ThroughputController.class), null); return true; } }); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java index 4bfa64d..5f56ba9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java @@ -138,5 +138,69 @@ public class TestMetricsRegionServer { HELPER.assertCounter("slowIncrementCount", 15, serverSource); HELPER.assertCounter("slowPutCount", 16, serverSource); } + + String FLUSH_TIME = "flushTime"; + String FLUSH_TIME_DESC = "Histogram for the time in millis for memstore flush"; + String FLUSH_MEMSTORE_SIZE = "flushMemstoreSize"; + String FLUSH_MEMSTORE_SIZE_DESC = "Histogram for number of bytes in the memstore for a flush"; + String FLUSH_FILE_SIZE = "flushFileSize"; + String FLUSH_FILE_SIZE_DESC = "Histogram for number of bytes in the resulting file for a flush"; + String FLUSHED_OUTPUT_BYTES = "flushedOutputBytes"; + String FLUSHED_OUTPUT_BYTES_DESC = "Total number of bytes written from flush"; + String FLUSHED_MEMSTORE_BYTES = "flushedMemstoreBytes"; + String FLUSHED_MEMSTORE_BYTES_DESC = "Total number of bytes of cells in memstore from flush"; + + @Test + public void testFlush() { + rsm.updateFlush(1, 2, 3); + HELPER.assertCounter("flushTime_num_ops", 1, serverSource); + HELPER.assertCounter("flushMemstoreSize_num_ops", 1, serverSource); + HELPER.assertCounter("flushOutputSize_num_ops", 1, serverSource); + HELPER.assertCounter("flushedMemstoreBytes", 2, serverSource); + HELPER.assertCounter("flushedOutputBytes", 3, serverSource); + + rsm.updateFlush(10, 20, 30); + HELPER.assertCounter("flushTimeNumOps", 2, serverSource); + HELPER.assertCounter("flushMemstoreSize_num_ops", 2, serverSource); + HELPER.assertCounter("flushOutputSize_num_ops", 2, serverSource); + HELPER.assertCounter("flushedMemstoreBytes", 22, serverSource); + HELPER.assertCounter("flushedOutputBytes", 33, serverSource); + } + + @Test + public void testCompaction() { + rsm.updateCompaction(false, 1, 2, 3, 4, 5); + HELPER.assertCounter("compactionTime_num_ops", 1, serverSource); + HELPER.assertCounter("compactionInputFileCount_num_ops", 1, serverSource); + HELPER.assertCounter("compactionInputSize_num_ops", 1, serverSource); + HELPER.assertCounter("compactionOutputFileCount_num_ops", 1, serverSource); + HELPER.assertCounter("compactedInputBytes", 4, serverSource); + HELPER.assertCounter("compactedoutputBytes", 5, serverSource); + + rsm.updateCompaction(false, 10, 20, 30, 40, 50); + HELPER.assertCounter("compactionTime_num_ops", 2, serverSource); + HELPER.assertCounter("compactionInputFileCount_num_ops", 2, serverSource); + HELPER.assertCounter("compactionInputSize_num_ops", 2, serverSource); + HELPER.assertCounter("compactionOutputFileCount_num_ops", 2, serverSource); + HELPER.assertCounter("compactedInputBytes", 44, serverSource); + HELPER.assertCounter("compactedoutputBytes", 55, serverSource); + + // do major compaction + rsm.updateCompaction(true, 100, 200, 300, 400, 500); + + HELPER.assertCounter("compactionTime_num_ops", 3, serverSource); + HELPER.assertCounter("compactionInputFileCount_num_ops", 3, serverSource); + HELPER.assertCounter("compactionInputSize_num_ops", 3, serverSource); + HELPER.assertCounter("compactionOutputFileCount_num_ops", 3, serverSource); + HELPER.assertCounter("compactedInputBytes", 444, serverSource); + HELPER.assertCounter("compactedoutputBytes", 555, serverSource); + + HELPER.assertCounter("majorCompactionTime_num_ops", 1, serverSource); + HELPER.assertCounter("majorCompactionInputFileCount_num_ops", 1, serverSource); + HELPER.assertCounter("majorCompactionInputSize_num_ops", 1, serverSource); + HELPER.assertCounter("majorCompactionOutputFileCount_num_ops", 1, serverSource); + HELPER.assertCounter("majorCompactedInputBytes", 400, serverSource); + HELPER.assertCounter("majorCompactedoutputBytes", 500, serverSource); + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index 6b641c1..635e5b4 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -96,7 +96,7 @@ public class TestStripeStoreEngine { assertEquals(2, compaction.getRequest().getFiles().size()); assertFalse(compaction.getRequest().getFiles().contains(sf)); // Make sure the correct method it called on compactor. - compaction.compact(NoLimitThroughputController.INSTANCE); + compaction.compact(NoLimitThroughputController.INSTANCE, null); verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, NoLimitThroughputController.INSTANCE, null);