From 6a102f98a100e4cdc72908d17d74b1c9f3e29a3f Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Sun, 8 Dec 2013 16:03:23 -0800
Subject: [PATCH] HBASE-9806 Add PerfEval tool for BlockCache

Initial sketch for a perf tool.

$ ./bin/hbase org.apache.hadoop.hbase.BlockCachePerformanceEvaluation --help
usage: bin/hbase org.apache.hadoop.hbase.BlockCachePerformanceEvaluation
Options:
 -b,--blocksize    [optional] Run test with a custom blocksize (default:
                   65536).
 -h,--help         Show usage
 -i,--iterations   [optional] Number of times to run a test, more is better
                   (default: 100).
 -r,--rows         [optional] Number of rows to generate (1 row ~= 1k). Leave
                   unset to automatically select based on BlockCache size (not
                   very reliable).
 -s,--seed         [optional] Seed value for the random generator.
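An example invocation (flag values here are illustrative only, not a
recommendation):

$ ./bin/hbase org.apache.hadoop.hbase.BlockCachePerformanceEvaluation -b 16384 -i 10 -s 42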
---
 .../hbase/BlockCachePerformanceEvaluation.java     | 519 +++++++++++++++++++++
 1 file changed, 519 insertions(+)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/BlockCachePerformanceEvaluation.java

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/BlockCachePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/BlockCachePerformanceEvaluation.java
new file mode 100644
index 0000000..eef2e72
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/BlockCachePerformanceEvaluation.java
@@ -0,0 +1,519 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.apache.hadoop.util.StringUtils.humanReadableInt;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.random.RandomData;
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+
+/**
+ * This class runs performance benchmarks for {@link BlockCache}. It attempts
+ * to measure only the bytes that the user program requests to be loaded or
+ * stored, and reports results in MB/s, following the same convention as
+ * STREAM. In this way, it can be used to test the effectiveness of different
+ * combinations of {@link BlockCache} and {@link DataBlockEncoder}
+ * implementations.
+ *
+ * @see <a href="http://www.cs.virginia.edu/stream/">STREAM</a>
+ */
+public class BlockCachePerformanceEvaluation extends AbstractHBaseTool {
+
+  private static final int KEY_LENGTH = 10;
+  private static final String KEY_BASE = Strings.repeat('0', KEY_LENGTH);
+  private static final int VALUE_LENGTH = 1000;
+  private static final BigDecimal ROW_SIZE_BYTES = BigDecimal.valueOf(KEY_LENGTH + VALUE_LENGTH);
+  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
+  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
+  private static final MathContext CXT = MathContext.DECIMAL64;
+  private static final DecimalFormat FMT = new DecimalFormat("0.##");
+
+  private static final int DEFAULT_ITERATIONS = 100;
+  private static final int DEFAULT_BLOCKSIZE = HConstants.DEFAULT_BLOCKSIZE;
+  private static final String OPT_ITERATIONS = "iterations";
+  private static final String OPT_BLOCKSIZE = "blocksize";
+  private static final String OPT_SEED = "seed";
+  private static final String OPT_ROWS = "rows";
+  static final Log LOG = LogFactory.getLog(BlockCachePerformanceEvaluation.class.getName());
+
+  private int blockSize;
+  private int iterations;
+  private int rows;
+  private RandomDataImpl rand;
+  private Map<String, long[]> results;
+
+  static byte [] format(final int i) {
+    String v = Integer.toString(i);
+    return Bytes.toBytes(KEY_BASE.substring(v.length()) + v);
+  }
+
+  protected void logMemoryStats() {
+    MemoryMXBean memStats = ManagementFactory.getMemoryMXBean();
+    long totalHeap = memStats.getHeapMemoryUsage().getMax();
+    long totalOffHeap = memStats.getNonHeapMemoryUsage().getMax();
+
+    LOG.info("Memory stats ["
+        + " totalHeap: " + humanReadableInt(totalHeap)
+        + " (" + totalHeap + ")"
+        + " totalOffHeap: " + humanReadableInt(totalOffHeap)
+        + " (" + totalOffHeap + ")"
+        + "]");
+  }
+
+  protected void logResults() {
+    for (Map.Entry<String, long[]> e : results.entrySet()) {
+      BigDecimal totalRows =
+          BigDecimal.valueOf(e.getValue().length).multiply(BigDecimal.valueOf(rows), CXT);
+      BigDecimal totalTimeMS = BigDecimal.ZERO;
+      for (long val : e.getValue()) totalTimeMS = totalTimeMS.add(BigDecimal.valueOf(val));
+      // divide with an explicit MathContext; a non-terminating quotient would
+      // otherwise throw ArithmeticException.
+      BigDecimal avg = totalTimeMS.divide(BigDecimal.valueOf(e.getValue().length), CXT);
+      // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
+      //        * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
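+      // Illustrative arithmetic only (not from a real run): 100 iterations of
+      // 1,000,000 rows is 1e8 rows * 1010 bytes/row = 1.01e11 bytes; if those
+      // iterations sum to 40,000 ms, throughput is
+      // 2.525e6 B/ms * 1000 / 1,048,576 ~= 2408 MB/s.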
+      BigDecimal mbps = totalRows.multiply(ROW_SIZE_BYTES, CXT).divide(totalTimeMS, CXT)
+          .multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
+      StringBuilder msg = new StringBuilder()
+          .append(e.getKey()).append(": ")
+          .append(Arrays.toString(e.getValue()))
+          .append(", avg: ").append(avg).append(" ms")
+          .append(", ").append(FMT.format(mbps)).append(" MB/s.");
+      LOG.info(msg);
+    }
+  }
+
+  @Override
+  public int doWork() throws Exception {
+    logMemoryStats();
+    if (results == null) {
+      results = new HashMap<String, long[]>();
+    }
+
+    // populate the cache while data is generated.
+    conf.setBoolean("hbase.rs.cacheblocksonwrite", true);
+    final FileSystem fs = FileSystem.get(conf);
+    final Path mf = fs.makeQualified(new Path("performanceevaluation.blockcache"));
+    try {
+      if (fs.exists(mf)) {
+        fs.delete(mf, true);
+      }
+
+      // populate the cache
+      final DataWriter wb = new DataWriter(conf, mf, rand, rows, blockSize);
+      wb.run();
+      // just in case we write fewer than requested.
+      rows = wb.getRowsWritten();
+      LOG.info("Populated the HFile with " + rows + " rows.");
+
+      RowOrientedBenchmark[] suite = new RowOrientedBenchmark[] {
+          new SequentialReadBenchmark(conf, mf, rand, rows),
+          new UniformRandomSmallScanBenchmark(conf, mf, rand, rows),
+          new UniformRandomReadBenchmark(conf, mf, rand, rows),
+          new GaussianRandomReadBenchmark(conf, mf, rand, rows),
+      };
+
+      for (final RowOrientedBenchmark b : suite) {
+        runBenchmark(b);
+      }
+    } finally {
+      if (fs.exists(mf)) {
+        fs.delete(mf, true);
+      }
+    }
+
+    logResults();
+    return 0;
+  }
+
+  protected void runBenchmark(RowOrientedBenchmark benchmark) throws Exception {
+    String name = benchmark.getClass().getSimpleName();
+    if (!results.containsKey(name)) {
+      results.put(name, new long[iterations]);
+    }
+
+    LOG.info("Running " + name + " " + iterations + " time(s).");
+    for (int i = 0; i < iterations; i++) {
+      long elapsedTime = benchmark.run();
+      results.get(name)[i] = elapsedTime;
+      LOG.debug(name + "[" + (i + 1) + "/" + iterations + "] took " + elapsedTime + "ms.");
+    }
+  }
+
+  static abstract class RowOrientedBenchmark {
+
+    protected final Configuration conf;
+    protected final Path mf;
+    protected final RandomData rand;
+    protected final CacheConfig cacheConfig;
+    protected final BlockCache cache;
+
+    public RowOrientedBenchmark(Configuration conf, Path mf, RandomData rand) {
+      this.conf = conf;
+      this.mf = mf;
+      this.rand = rand;
+      this.cacheConfig = new CacheConfig(this.conf);
+      this.cache = this.cacheConfig.getBlockCache();
+    }
+
+    protected void logCacheStats() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("BlockCache stats ["
+            + " currentSize: " + humanReadableInt(cache.getCurrentSize())
+            + " (" + cache.getCurrentSize() + ")"
+            + " freeSize: " + humanReadableInt(cache.getFreeSize())
+            + " (" + cache.getFreeSize() + ")"
+            + " size: " + cache.size()
+            + " blockCount: " + cache.getBlockCount()
+            + " evictedCount: " + cache.getEvictedCount()
+            + "]");
+      }
+    }
+
+    void setUp() throws Exception {
+      logCacheStats();
+    }
+
+    abstract void doRow(int i) throws Exception;
+
+    void tearDown() throws Exception {
+      logCacheStats();
+    }
+
+    /** Run benchmark */
+    abstract long run() throws Exception;
+
+    long logInterval(long totalRecords) {
+      // never return 0; callers use this as a modulus and would otherwise
+      // throw ArithmeticException when totalRecords < 10.
+      return Math.max(1, totalRecords / 10);
+    }
+  }
+
+  /** Use me to populate the cache */
+  static class DataWriter extends RowOrientedBenchmark {
+    protected HFile.Writer writer;
+    private byte[] bytes = new byte[VALUE_LENGTH];
+    private int blockSize;
+    private long rows = -1;
+    private int rowsWritten = -1;
+
+    public DataWriter(Configuration conf, Path mf, RandomData rand, long rows, int blockSize) {
+      super(conf, mf, rand);
+      this.blockSize = blockSize;
+      this.rows = rows;
+    }
+
+    @Override
+    void setUp() throws Exception {
+      super.setUp();
+      FileSystem fs = FileSystem.get(conf);
+      HFileContext hFileContext = new HFileContextBuilder().withBlockSize(blockSize).build();
+      writer =
+          HFile.getWriterFactory(conf, cacheConfig)
+              .withPath(fs, mf)
+              .withFileContext(hFileContext)
+              .withComparator(new KeyValue.RawBytesComparator())
+              .create();
+    }
+
+    /** Run benchmark */
+    @Override
+    long run() throws Exception {
+      long elapsedTime;
+      setUp();
+      int i = 0;
+      long rowsToWrite = -1 != rows
+          ? rows
+          // TODO: cache implementations have inconsistent implementations of getFreeSize();
+          // this calculation probably doesn't work for anything but LruBlockCache.
+          // TODO: this estimate is not quite accurate.
+          : (long) (cache.getFreeSize() * 0.90 / (KEY_LENGTH + VALUE_LENGTH));
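+      // For illustration only: a fresh 1 GB LruBlockCache reports roughly
+      // 1,073,741,824 free bytes, so this estimates
+      // 1,073,741,824 * 0.90 / 1010 ~= 956,000 rows.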
+      long logInterval = logInterval(rowsToWrite);
+      LOG.info("Populating cache with " + rowsToWrite + " rows.");
+      long startTime = System.currentTimeMillis();
+      try {
+        // write blocks until cache is full.
+        for (; i < rowsToWrite; i++) {
+          if (i > 0 && i % logInterval == 0) {
+            LOG.debug("Processed " + humanReadableInt(i) + " rows.");
+          }
+          doRow(i);
+          assert i >= 0 : "rowcounter overflow!";
+        }
+        elapsedTime = System.currentTimeMillis() - startTime;
+      } finally {
+        rowsWritten = i;
+        tearDown();
+      }
+      return elapsedTime;
+    }
+
+    @Override
+    void doRow(int i) throws Exception {
+      writer.append(format(i), generateValue());
+    }
+
+    private byte[] generateValue() {
+      for (int i = 0; i < bytes.length; i++) {
+        bytes[i] = (byte) rand.nextInt(0, Byte.MAX_VALUE);
+      }
+      return bytes;
+    }
+
+    @Override
+    void tearDown() throws Exception {
+      writer.close();
+      super.tearDown();
+    }
+
+    public int getRowsWritten() {
+      assert rowsWritten > 0 : "Test hasn't been run yet.";
+      return rowsWritten;
+    }
+  }
+
+  static abstract class ReadBenchmark extends RowOrientedBenchmark {
+
+    protected final int totalRows;
+    protected HFile.Reader reader;
+
+    public ReadBenchmark(Configuration conf, Path mf, RandomData rand, int totalRows) {
+      super(conf, mf, rand);
+      this.totalRows = totalRows;
+    }
+
+    @Override
+    void setUp() throws Exception {
+      super.setUp();
+      FileSystem fs = FileSystem.get(conf);
+      reader = HFile.createReader(fs, mf, cacheConfig, conf);
+      reader.loadFileInfo();
+    }
+
+    @Override
+    long run() throws Exception {
+      long elapsedTime;
+      setUp();
+      long logInterval = logInterval(totalRows);
+      long startTime = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < totalRows; i++) {
+          if (i > 0 && i % logInterval == 0) {
+            LOG.debug("Processed " + humanReadableInt(i) + " rows.");
+          }
+          doRow(i);
+        }
+        elapsedTime = System.currentTimeMillis() - startTime;
+      } finally {
+        tearDown();
+      }
+      return elapsedTime;
+    }
+
+    @Override
+    void tearDown() throws Exception {
+      reader.close();
+      super.tearDown();
+    }
+  }
+
+  static class SequentialReadBenchmark extends ReadBenchmark {
+    private HFileScanner scanner;
+    private boolean hasMore = true;
+
+    public SequentialReadBenchmark(Configuration conf, Path mf, RandomData rand, int totalRows) {
+      super(conf, mf, rand, totalRows);
+    }
+
+    @Override
+    void setUp() throws Exception {
+      super.setUp();
+      scanner = this.reader.getScanner(true, false);
+      hasMore = this.scanner.seekTo();
+    }
+
+    @Override
+    void doRow(int i) throws Exception {
+      byte[] key = format(i);
+      if (!hasMore)
+        throw new AssertionError("End of file reached while reading " + Bytes.toString(key));
+
+      ByteBuffer k = scanner.getKey();
+      PerformanceEvaluationCommons.assertKey(key, k);
+      ByteBuffer v = scanner.getValue();
+      PerformanceEvaluationCommons.assertValueSize(v.limit(), VALUE_LENGTH);
+      hasMore = scanner.next();
+    }
+  }
+
+  static class UniformRandomReadBenchmark extends ReadBenchmark {
+
+    public UniformRandomReadBenchmark(Configuration conf, Path mf, RandomData rand,
+        int totalRows) {
+      super(conf, mf, rand, totalRows);
+    }
+
+    @Override
+    void doRow(int i) throws Exception {
+      HFileScanner scanner = reader.getScanner(true, true);
+      byte [] b = getRandomRow();
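+      // seekTo contract: returns -1 when the requested key sorts before the
+      // first key in the file, 0 on an exact match, and 1 when left positioned
+      // at the greatest key less than the one requested.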
+      if (scanner.seekTo(b) < 0) {
+        LOG.info("Not able to seekTo " + new String(b));
+        return;
+      }
+      ByteBuffer k = scanner.getKey();
+      PerformanceEvaluationCommons.assertKey(b, k);
+      ByteBuffer v = scanner.getValue();
+      PerformanceEvaluationCommons.assertValueSize(v.limit(), VALUE_LENGTH);
+    }
+
+    private byte [] getRandomRow() {
+      assert totalRows > 0;
+      int row = rand.nextInt(0, totalRows - 1);
+      if (row >= 0)
+        return format(row);
+
+      // nextInt has been observed to produce an out-of-range value; warn and retry.
+      LOG.warn("rand produced a value out of bounds.");
+      return getRandomRow();
+    }
+  }
+
+  static class UniformRandomSmallScanBenchmark extends ReadBenchmark {
+
+    public UniformRandomSmallScanBenchmark(Configuration conf, Path mf, RandomData rand,
+        int totalRows) {
+      super(conf, mf, rand, totalRows / 10);
+    }
+
+    @Override
+    void doRow(int i) throws Exception {
+      HFileScanner scanner = reader.getScanner(true, false);
+      byte [] b = getRandomRow();
+      if (scanner.seekTo(b) != 0) {
+        LOG.info("Nonexistent row: " + new String(b));
+        return;
+      }
+      ByteBuffer k = scanner.getKey();
+      PerformanceEvaluationCommons.assertKey(b, k);
+      LOG.trace("Found row: " + new String(b));
+      for (int ii = 0; ii < 30; ii++) {
+        if (!scanner.next()) {
+          LOG.info("NOTHING FOLLOWS");
+          return;
+        }
+        ByteBuffer v = scanner.getValue();
+        PerformanceEvaluationCommons.assertValueSize(v.limit(), VALUE_LENGTH);
+      }
+    }
+
+    private byte [] getRandomRow() {
+      return format(rand.nextInt(0, totalRows - 1));
+    }
+  }
+
+  static class GaussianRandomReadBenchmark extends ReadBenchmark {
+
+    private RandomData randomData = new RandomDataImpl();
+
+    public GaussianRandomReadBenchmark(Configuration conf, Path mf, RandomData rand,
+        int totalRows) {
+      super(conf, mf, rand, totalRows);
+    }
+
+    @Override
+    void doRow(int i) throws Exception {
+      HFileScanner scanner = reader.getScanner(false, true);
+      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
+      if (scanner.seekTo(gaussianRandomRowBytes) < 0) {
+        LOG.info("Not able to seekTo " + new String(gaussianRandomRowBytes));
+        return;
+      }
+      for (int ii = 0; ii < 30; ii++) {
+        if (!scanner.next()) {
+          LOG.info("NOTHING FOLLOWS");
+          return;
+        }
+        scanner.getKey();
+        scanner.getValue();
+      }
+    }
+
+    private byte [] getGaussianRandomRowBytes() {
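+      // Reads cluster around the middle of the keyspace: mean totalRows/2,
+      // standard deviation totalRows/10. E.g. (illustrative numbers) with
+      // 1,000,000 rows, ~99.7% of seeks land between rows 200,000 and 800,000.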
+      int r = (int) randomData.nextGaussian((double) totalRows / 2.0, (double) totalRows / 10.0);
+      return format(r);
+    }
+  }
+
+  @Override
+  protected void addOptions() {
+    addOptWithArg("r", OPT_ROWS, "[optional] Number of rows to generate (1 row"
+        + " ~= 1k). Leave unset to automatically select based on BlockCache size"
+        + " (not very reliable).");
+    addOptWithArg("i", OPT_ITERATIONS, "[optional] Number of times to run a"
+        + " test, more is better (default: " + DEFAULT_ITERATIONS + ").");
+    addOptWithArg("b", OPT_BLOCKSIZE, "[optional] Run test with a custom"
+        + " blocksize (default: " + DEFAULT_BLOCKSIZE + ").");
+    addOptWithArg("s", OPT_SEED, "[optional] Seed value for the random generator.");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    iterations = cmd.hasOption(OPT_ITERATIONS)
+        ? Integer.parseInt(cmd.getOptionValue(OPT_ITERATIONS))
+        : DEFAULT_ITERATIONS;
+    blockSize = cmd.hasOption(OPT_BLOCKSIZE)
+        ? Integer.parseInt(cmd.getOptionValue(OPT_BLOCKSIZE))
+        : DEFAULT_BLOCKSIZE;
+    long seed = cmd.hasOption(OPT_SEED)
+        ? Long.parseLong(cmd.getOptionValue(OPT_SEED))
+        : System.currentTimeMillis();
+    LOG.info("Using random seed: " + seed);
+    rand = new RandomDataImpl();
+    rand.reSeed(seed);
+    rows = cmd.hasOption(OPT_ROWS)
+        ? Integer.parseInt(cmd.getOptionValue(OPT_ROWS))
+        : -1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    new BlockCachePerformanceEvaluation().doStaticMain(args);
+  }
+}
-- 
1.8.4.2