diff --git a/conf/hbase-env.sh b/conf/hbase-env.sh index 91aca76..e6d1c9c 100644 --- a/conf/hbase-env.sh +++ b/conf/hbase-env.sh @@ -66,10 +66,12 @@ export HBASE_OPTS="-XX:+UseConcMarkSweepGC" # If FILE-PATH is not replaced, the log file(.gc) would still be generated in the HBASE_LOG_DIR . # export CLIENT_GC_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc: -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=512M" -# Uncomment below if you intend to use the EXPERIMENTAL off heap cache. -# export HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=" -# Set hbase.offheapcache.percentage in hbase-site.xml to a nonzero value. - +# Uncomment below if you intend to use off heap cache. +# export HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=SET_THIS_TO_HOW_MANY_GIGS_OF_OFFHEAP" +# For example, to allocate 8G of offheap, set SET_THIS_TO_HOW_MANY_GIGS_OF_OFFHEAP to 8G as in: +# export HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=8G" +# See the package documentation for org.apache.hadoop.hbase.io.hfile for other configurations +# needed setting up off-heap block caching. # Uncomment and adjust to enable JMX exporting # See jmxremote.password and jmxremote.access in $JRE_HOME/lib/management to configure remote password access. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2851a42..8699d32 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -55,7 +55,7 @@ public final class ByteBufferArray { this.bufferSize = (int) roundUp(capacity / 16, 32768); this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize); LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity) - + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount + ", direct=" + directByteBuffer); buffers = new ByteBuffer[bufferCount + 1]; locks = new Lock[bufferCount + 1]; diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index f0fafd1..c61f347 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -939,8 +939,10 @@ possible configurations would overwhelm and obscure the important. hbase.offheapcache.percentage 0 The percentage of the off heap space (-XX:MaxDirectMemorySize) to be - allocated towards the experimental off heap cache. If you desire the cache to be - disabled, simply set this value to 0. + allocated towards the experimental off heap "SlabCache" (This is different to + the BucketCache -- see the package javadoc for org.apache.hadoop.hbase.io.hfile + for more on your options). If you desire the cache to be disabled, simply set this + value to 0. hbase.data.umask.enable @@ -1130,13 +1132,6 @@ possible configurations would overwhelm and obscure the important. - hbase.rest.filter.classes - org.apache.hadoop.hbase.rest.filter.GzipFilter - - Servlet filters for REST service. 
- - - hbase.master.loadbalancer.class org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java index b98e214..5c7fb45 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.IntegrationTests; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.Before; @@ -59,7 +60,7 @@ import org.junit.experimental.categories.Category; *
"IntegrationTestTableSnapshotInputFormat.table" => the name of the table *
"IntegrationTestTableSnapshotInputFormat.snapshot" => the name of the snapshot *
"IntegrationTestTableSnapshotInputFormat.numRegions" => number of regions in the table - * to be created + * to be created (default, 32). *
"IntegrationTestTableSnapshotInputFormat.tableDir" => temporary directory to restore the * snapshot files * @@ -78,10 +79,22 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase private static final String NUM_REGIONS_KEY = "IntegrationTestTableSnapshotInputFormat.numRegions"; - private static final int DEFAULT_NUM_REGIONS = 32; + private static final String MR_IMPLEMENTATION_KEY = + "IntegrationTestTableSnapshotInputFormat.API"; + private static final String MAPRED_IMPLEMENTATION = "mapred"; + private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce"; + private static final int DEFAULT_NUM_REGIONS = 32; private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir"; + private static final byte[] START_ROW = Bytes.toBytes("bbb"); + private static final byte[] END_ROW = Bytes.toBytes("yyy"); + + // mapred API missing feature pairity with mapreduce. See comments in + // mapred.TestTableSnapshotInputFormat + private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa"); + private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{' + private IntegrationTestingUtility util; @Override @@ -124,17 +137,39 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase tableDir = new Path(tableDirStr); } - /* We create the table using HBaseAdmin#createTable(), which will create the table - * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if - * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we - * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow - * bbb and endRow yyy, so, we expect the first and last region to be filtered out in - * the input format, and we expect numRegions - 2 splits between bbb and yyy. - */ - int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions; - - TestTableSnapshotInputFormat.doTestWithMapReduce(util, tableName, snapshotName, tableDir, - numRegions, expectedNumSplits, false); + final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION); + if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) { + /* + * We create the table using HBaseAdmin#createTable(), which will create the table + * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if + * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we + * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow + * bbb and endRow yyy, so, we expect the first and last region to be filtered out in + * the input format, and we expect numRegions - 2 splits between bbb and yyy. + */ + LOG.debug("Running job with mapreduce API."); + int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions; + + org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util, + tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, + expectedNumSplits, false); + } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) { + /* + * Similar considerations to above. The difference is that mapred API does not support + * specifying start/end rows (or a scan object at all). Thus the omission of first and + * last regions are not performed. See comments in mapred.TestTableSnapshotInputFormat + * for details of how that test works around the problem. This feature should be added + * in follow-on work. 
+ */ + LOG.debug("Running job with mapred API."); + int expectedNumSplits = numRegions; + + org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util, + tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, + expectedNumSplits, false); + } else { + throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr +"."); + } return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 59fd2c0..321685f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.io.hfile.slab.SlabCache; import org.apache.hadoop.hbase.util.DirectMemoryUtils; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; + /** * Stores all of the cache objects and configuration for a single HFile. */ @@ -83,7 +85,7 @@ public class CacheConfig { * to the file that will host the file-based cache. See BucketCache#getIOEngineFromName() for * list of supported ioengine options. * - *

Set this option and a non-zero {@link BUCKET_CACHE_SIZE_KEY} to enable bucket cache. + *

Set this option and a non-zero {@link #BUCKET_CACHE_SIZE_KEY} to enable bucket cache. */ public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine"; @@ -91,10 +93,10 @@ public class CacheConfig { * When using bucket cache, this is a float that EITHER represents a percentage of total heap * memory size to give to the cache (if < 1.0) OR, it is the capacity in megabytes of the cache. * - *

The resultant size is further divided if {@link BUCKET_CACHE_COMBINED_KEY} is set (It is + *

The resultant size is further divided if {@link #BUCKET_CACHE_COMBINED_KEY} is set (It is * set by default. When false, bucket cache serves as an "L2" cache to the "L1" * {@link LruBlockCache}). The percentage is set in - * with {@link BUCKET_CACHE_COMBINED_PERCENTAGE_KEY} float. + * with {@link #BUCKET_CACHE_COMBINED_PERCENTAGE_KEY} float. */ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; @@ -115,7 +117,7 @@ public class CacheConfig { /** * A float which designates how much of the overall cache to give to bucket cache - * and how much to on-heap lru cache when {@link BUCKET_CACHE_COMBINED_KEY} is set. + * and how much to on-heap lru cache when {@link #BUCKET_CACHE_COMBINED_KEY} is set. */ public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = "hbase.bucketcache.percentage.in.combinedcache"; @@ -251,6 +253,7 @@ public class CacheConfig { this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.evictOnClose = evictOnClose; this.cacheCompressed = cacheCompressed; + LOG.info(this); } /** @@ -369,13 +372,13 @@ public class CacheConfig { if (!isBlockCacheEnabled()) { return "CacheConfig:disabled"; } - return "CacheConfig:enabled " + - "[cacheDataOnRead=" + shouldCacheDataOnRead() + "] " + - "[cacheDataOnWrite=" + shouldCacheDataOnWrite() + "] " + - "[cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + "] " + - "[cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + "] " + - "[cacheEvictOnClose=" + shouldEvictOnClose() + "] " + - "[cacheCompressed=" + shouldCacheCompressed() + "]"; + return "blockCache=" + getBlockCache() + + ", cacheDataOnRead=" + shouldCacheDataOnRead() + + ", cacheDataOnWrite=" + shouldCacheDataOnWrite() + + ", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + + ", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + + ", cacheEvictOnClose=" + shouldEvictOnClose() + + ", cacheCompressed=" + shouldCacheCompressed(); } // Static block cache reference and methods @@ -384,7 +387,9 @@ public class CacheConfig { * Static reference to the block cache, or null if no caching should be used * at all. */ - private static BlockCache globalBlockCache; + // Clear this if in tests you'd make more than one block cache instance. + @VisibleForTesting + static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE; /** Boolean whether we have disabled the block cache entirely. */ private static boolean blockCacheDisabled = false; @@ -396,7 +401,7 @@ public class CacheConfig { * @return The block cache or null. 
*/ public static synchronized BlockCache instantiateBlockCache(Configuration conf) { - if (globalBlockCache != null) return globalBlockCache; + if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; if (blockCacheDisabled) return null; float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, @@ -414,10 +419,10 @@ public class CacheConfig { MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); long lruCacheSize = (long) (mu.getMax() * cachePercentage); int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); - long offHeapCacheSize = - (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) * + long slabCacheOffHeapCacheSize = + (long) (conf.getFloat(SLAB_CACHE_OFFHEAP_PERCENTAGE_KEY, (float) 0) * DirectMemoryUtils.getDirectMemorySize()); - if (offHeapCacheSize <= 0) { + if (slabCacheOffHeapCacheSize <= 0) { String bucketCacheIOEngineName = conf.get(BUCKET_CACHE_IOENGINE_KEY, null); float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); // A percentage of max heap size or a absolute value with unit megabytes @@ -452,19 +457,19 @@ public class CacheConfig { throw new RuntimeException(ioex); } } - LOG.info("Allocating LruBlockCache with maximum size " + - StringUtils.humanReadableInt(lruCacheSize) + ", blockSize=" + blockSize); + LOG.info("Allocating LruBlockCache size=" + + StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); LruBlockCache lruCache = new LruBlockCache(lruCacheSize, blockSize); lruCache.setVictimCache(bucketCache); if (bucketCache != null && combinedWithLru) { - globalBlockCache = new CombinedBlockCache(lruCache, bucketCache); + GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(lruCache, bucketCache); } else { - globalBlockCache = lruCache; + GLOBAL_BLOCK_CACHE_INSTANCE = lruCache; } } else { - globalBlockCache = new DoubleBlockCache( - lruCacheSize, offHeapCacheSize, blockSize, blockSize, conf); + GLOBAL_BLOCK_CACHE_INSTANCE = new DoubleBlockCache( + lruCacheSize, slabCacheOffHeapCacheSize, blockSize, blockSize, conf); } - return globalBlockCache; + return GLOBAL_BLOCK_CACHE_INSTANCE; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 37221df..6f23d16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -31,8 +31,9 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; * CombinedBlockCache is an abstraction layer that combines * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used * to cache bloom blocks and index blocks. The larger bucketCache is used to - * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean) reads - * first from the smaller lruCache before looking for the block in the bucketCache. + * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean)}, boolean, boolean) reads + * first from the smaller lruCache before looking for the block in the bucketCache. Blocks evicted + * from lruCache are put into the bucket cache. * Metrics are the combined size and hits and misses of both caches. 
* */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java index 0e32b4b..274f847 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java @@ -63,15 +63,15 @@ public class DoubleBlockCache implements ResizableBlockCache, HeapSize { long onHeapBlockSize, long offHeapBlockSize, Configuration conf) { LOG.info("Creating on-heap cache of size " - + StringUtils.humanReadableInt(onHeapSize) - + "bytes with an average block size of " - + StringUtils.humanReadableInt(onHeapBlockSize) + " bytes."); + + StringUtils.byteDesc(onHeapSize) + + " with an average block size of " + + StringUtils.byteDesc(onHeapBlockSize)); onHeapCache = new LruBlockCache(onHeapSize, onHeapBlockSize, conf); LOG.info("Creating off-heap cache of size " - + StringUtils.humanReadableInt(offHeapSize) - + "bytes with an average block size of " - + StringUtils.humanReadableInt(offHeapBlockSize) + " bytes."); + + StringUtils.byteDesc(offHeapSize) + + "with an average block size of " + + StringUtils.byteDesc(offHeapBlockSize)); offHeapCache = new SlabCache(offHeapSize, offHeapBlockSize); offHeapCache.addSlabByConf(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/package-info.java index 1f44967..be063f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/package-info.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/package-info.java @@ -16,28 +16,104 @@ * limitations under the License. */ /** - * Provides implementations of {@link HFile} and HFile + * Provides implementations of {@link org.apache.hadoop.hbase.io.hfile.HFile} and HFile * {@link org.apache.hadoop.hbase.io.hfile.BlockCache}. Caches are configured (and instantiated) * by {@link org.apache.hadoop.hbase.io.hfile.CacheConfig}. See head of the * {@link org.apache.hadoop.hbase.io.hfile.CacheConfig} class for constants that define * cache options and configuration keys to use setting cache options. Cache implementations - * include the on-heap {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}, + * include the default, native on-heap {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}, * a {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache} that can serve as an L2 for - * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}, and a - * {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} that has a bunch of deploy types - * including L2 for LRUBlockCache or using + * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} (hosted inside the class + * {@link org.apache.hadoop.hbase.io.hfile.DoubleBlockCache} that caches blocks in BOTH L1 and L2, + * and on evict, moves from L1 to L2, etc), and a + * {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} that has a bunch of deploy formats + * including acting as a L2 for LruBlockCache -- when a block is evicted from LruBlockCache, it + * goes to the BucketCache and when we search a block, we look in both places -- or using * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}, as - * host for data blocks with meta blocks in the LRUBlockCache as well as onheap, offheap, and - * file options). 
+ * a host for data blocks with meta blocks in the LRUBlockCache as well as onheap, offheap, and + * file options. * + *

Which BlockCache should I use?

+ * BucketCache has seen more production deploys and has more deploy options. Fetching from + * BucketCache will always be slower than from the on-heap cache, but latencies tend to be less + * erratic over time (roughly because there is less GC). SlabCache tends to cause more GC because + * blocks are always moved between L1 and L2, at least given the way + * {@link org.apache.hadoop.hbase.io.hfile.DoubleBlockCache} currently works. It is tough to do an + * apples-to-apples comparison since their hosting classes, + * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache} for BucketCache vs + * {@link org.apache.hadoop.hbase.io.hfile.DoubleBlockCache} for SlabCache, operate so differently. + * See Nick Dimiduk's + * BlockCache 101 for some numbers. See + * also the description of HBASE-7404, + * where Chunhui Shen lists issues he found with BlockCache (inefficient use of memory, doesn't + * help with GC). + * + *

Enabling {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache}

- * {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache} has seen little use and will likely - * be deprecated in the near future. To enable it, - * set the float hbase.offheapcache.percentage to some value between 0 and 1. This + * {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache} is the original offheap block cache + * but unfortunately has seen little use. It was originally described in + * Caching + * in Apache HBase: SlabCache. To enable it, + * set the float hbase.offheapcache.percentage + * ({@link CacheConfig#SLAB_CACHE_OFFHEAP_PERCENTAGE_KEY}) to some value between 0 and 1 in + * your hbase-site.xml file. This * enables {@link org.apache.hadoop.hbase.io.hfile.DoubleBlockCache}, a facade over * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} and - * {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache}. The value set here will be - * multiplied by whatever the setting for -XX:MaxDirectMemorySize is and this is what + * {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache}. DoubleBlockCache works as follows. + * When caching, it + * "...attempts to cache the block in both caches, while readblock reads first from the faster + * onheap cache before looking for the block in the off heap cache. Metrics are the + * combined size and hits and misses of both caches." The value set in + * hbase.offheapcache.percentage will be + * multiplied by whatever the setting for -XX:MaxDirectMemorySize is in + * your hbase-env.sh configuration file and this is what * will be used by {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache} as its offheap store. + * The onheap store will be whatever the float {@link HConstants#HFILE_BLOCK_CACHE_SIZE_KEY} setting is + * (some value between 0 and 1) times the size of the allocated Java heap. + * + *
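As a rough illustration of the sizing just described (a sketch only; the concrete numbers are example assumptions, not recommendations), assuming -XX:MaxDirectMemorySize=4G, an 8G heap, hbase.offheapcache.percentage=0.6 and hfile.block.cache.size=0.4:

    long maxDirectMemory = 4L * 1024 * 1024 * 1024;   // -XX:MaxDirectMemorySize from hbase-env.sh (example: 4G)
    long maxHeap         = 8L * 1024 * 1024 * 1024;   // -Xmx from hbase-env.sh (example: 8G)
    float offheapPercentage = 0.6f;                   // hbase.offheapcache.percentage (example value)
    float onheapPercentage  = 0.4f;                   // hfile.block.cache.size (example value)

    // SlabCache's offheap store: the configured fraction of MaxDirectMemorySize (~2.4G here).
    long slabCacheSize = (long) (maxDirectMemory * offheapPercentage);
    // LruBlockCache's onheap store: the block cache fraction of the heap (~3.2G here).
    long lruCacheSize  = (long) (maxHeap * onheapPercentage);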

Restart (or rolling restart) your cluster for the configs to take effect. Check logs to + * ensure your configurations came out as expected. + * + *

Enabling {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache}

+ * Ensure the SlabCache config hbase.offheapcache.percentage is not set (or set to 0). + * At this point, it is probably best to read the code to learn the list of bucket cache options + * and how they combine (to be fixed). Read the options and defaults for BucketCache in the + * head of the {@link org.apache.hadoop.hbase.io.hfile.CacheConfig}. + * + *
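The same settings can also be made programmatically, which is sometimes handy in tests. A minimal sketch, assuming the CacheConfig constants in this patch and mirroring the hbase-site.xml example further below (imports elided):

    Configuration conf = HBaseConfiguration.create();
    // IOEngine for the bucket cache; "offheap" here, see BucketCache#getIOEngineFromName() for others.
    conf.set(CacheConfig.BUCKET_CACHE_IOENGINE_KEY, "offheap");            // hbase.bucketcache.ioengine
    // Either a fraction of max heap (values < 1.0) or an absolute capacity in megabytes.
    conf.setFloat(CacheConfig.BUCKET_CACHE_SIZE_KEY, 5120f);               // hbase.bucketcache.size (5G)
    // Share of the above given to the offheap bucket cache when the combined cache is in use.
    conf.setFloat(CacheConfig.BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, 0.8f); // hbase.bucketcache.percentage.in.combinedcache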

Here is a simple example of how to enable a 4G + * offheap bucket cache with 1G onheap cache. + * The onheap/offheap caches + * are managed by {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache} by default. For the + * CombinedBlockCache (from the class comment), "The smaller lruCache is used + * to cache bloom blocks and index blocks, the larger bucketCache is used to + * cache data blocks. getBlock reads first from the smaller lruCache before + * looking for the block in the bucketCache. Metrics are the combined size and + * hits and misses of both caches." To disable CombinedBlockCache and have the BucketCache act + * as a strict L2 cache to the L1 LruBlockCache (i.e. on eviction from L1, blocks go to L2), set + * {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#BUCKET_CACHE_COMBINED_KEY} to false. + * Also by default, unless you change it, + * {@link CacheConfig#BUCKET_CACHE_COMBINED_PERCENTAGE_KEY} defaults to 0.9 (see + * the top of the CacheConfig in the BucketCache defaults section). This means that whatever + * size you set for the bucket cache with + * {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#BUCKET_CACHE_SIZE_KEY}, + * 90% will be used for offheap and 10% of the size will be used + * by the onheap {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}. + *
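In other words, the total set in hbase.bucketcache.size is split by that percentage. A sketch of the arithmetic (assuming CacheConfig keeps computing it this way), using the 5G and 0.8 numbers from the example that follows:

    long totalCacheBytes = 5120L * 1024 * 1024;   // hbase.bucketcache.size of 5120 megabytes (5G)
    float inCombinedCache = 0.8f;                 // hbase.bucketcache.percentage.in.combinedcache (default 0.9)
    long offheapBucketCacheBytes = (long) (totalCacheBytes * inCombinedCache);   // ~4G to the offheap BucketCache
    long onheapLruBytes = (long) (totalCacheBytes * (1 - inCombinedCache));      // ~1G to the onheap LruBlockCache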

Back to the example of setting an onheap cache of 1G and an offheap cache of 4G: in + * hbase-env.sh ensure the Java option -XX:MaxDirectMemorySize is + * enabled and 5G in size, e.g. -XX:MaxDirectMemorySize=5G. Then in + * hbase-site.xml add the following configurations: +

<property>
+  <name>hbase.bucketcache.ioengine</name>
+  <value>offheap</value>
+</property>
+<property>
+  <name>hbase.bucketcache.percentage.in.combinedcache</name>
+  <value>0.8</value>
+</property>
+<property>
+  <name>hbase.bucketcache.size</name>
+  <value>5120</value>
+</property>
. Above we set a cache of 5G, 80% of which will be offheap (4G) and 1G onheap. + * Restart (or rolling restart) your cluster for the configs to take effect. Check logs to ensure + * your configurations came out as expected. + * */ package org.apache.hadoop.hbase.io.hfile; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java index 9b4621e..8397538 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java @@ -50,11 +50,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * SlabCache is composed of multiple SingleSizeCaches. It uses a TreeMap in * order to determine where a given element fits. Redirects gets and puts to the * correct SingleSizeCache. + * + *

It is configured with a call to {@link #addSlab(int, int)} * **/ @InterfaceAudience.Private public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { - private final ConcurrentHashMap backingStore; private final TreeMap sizer; static final Log LOG = LogFactory.getLog(SlabCache.class); @@ -72,12 +73,25 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { SlabCache.class, false); /** + * Key used reading from configuration list of the percentage of our total space we allocate to + * the slabs. Defaults: "0.80", "0.20". + * @see #SLAB_CACHE_SIZES_KEY Must have corresponding number of elements. + */ + static final String SLAB_CACHE_PROPORTIONS_KEY = "hbase.offheapcache.slab.proportions"; + + /** + * Configuration key for list of the blocksize of the slabs in bytes. (E.g. the slab holds + * blocks of this size). Defaults: avgBlockSize * 11 / 10, avgBlockSize * 21 / 10 + * @see #SLAB_CACHE_PROPORTIONS_KEY + */ + static final String SLAB_CACHE_SIZES_KEY = "hbase.offheapcache.slab.sizes"; + + /** * Default constructor, creates an empty SlabCache. * * @param size Total size allocated to the SlabCache. (Bytes) * @param avgBlockSize Average size of a block being cached. **/ - public SlabCache(long size, long avgBlockSize) { this.avgBlockSize = avgBlockSize; this.size = size; @@ -108,9 +122,8 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { */ public void addSlabByConf(Configuration conf) { // Proportions we allocate to each slab of the total size. - String[] porportions = conf.getStrings( - "hbase.offheapcache.slab.proportions", "0.80", "0.20"); - String[] sizes = conf.getStrings("hbase.offheapcache.slab.sizes", + String[] porportions = conf.getStrings(SLAB_CACHE_PROPORTIONS_KEY, "0.80", "0.20"); + String[] sizes = conf.getStrings(SLAB_CACHE_SIZES_KEY, Long.valueOf(avgBlockSize * 11 / 10).toString(), Long.valueOf(avgBlockSize * 21 / 10).toString()); @@ -178,8 +191,8 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { } private void addSlab(int blockSize, int numBlocks) { - LOG.info("Creating a slab of blockSize " + blockSize + " with " + numBlocks - + " blocks, " + StringUtils.humanReadableInt(blockSize * (long) numBlocks) + "bytes."); + LOG.info("Creating slab of blockSize " + blockSize + " with " + numBlocks + + " blocks, " + StringUtils.byteDesc(blockSize * (long) numBlocks) + "bytes."); sizer.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this)); } @@ -325,6 +338,7 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { /* * Statistics thread. Periodically prints the cache statistics to the log. + * TODO: Fix. Just emit to metrics. Don't run a thread just to do a log. 
*/ static class StatisticsThread extends HasThread { SlabCache ourcache; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index 2823415..7e7ba76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Put; @@ -69,7 +70,16 @@ public class TableMapReduceUtil { Class mapper, Class outputKeyClass, Class outputValueClass, JobConf job) { - initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true); + initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, + true, TableInputFormat.class); + } + + public static void initTableMapJob(String table, String columns, + Class mapper, + Class outputKeyClass, + Class outputValueClass, JobConf job, boolean addDependencyJars) { + initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, TableInputFormat.class); } /** @@ -88,9 +98,10 @@ public class TableMapReduceUtil { public static void initTableMapJob(String table, String columns, Class mapper, Class outputKeyClass, - Class outputValueClass, JobConf job, boolean addDependencyJars) { + Class outputValueClass, JobConf job, boolean addDependencyJars, + Class inputFormat) { - job.setInputFormat(TableInputFormat.class); + job.setInputFormat(inputFormat); job.setMapOutputValueClass(outputValueClass); job.setMapOutputKeyClass(outputKeyClass); job.setMapperClass(mapper); @@ -114,6 +125,37 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from a table snapshot. It bypasses hbase servers + * and read directly from snapshot files. + * + * @param snapshotName The name of the snapshot (of a table) to read from. + * @param columns The columns to scan. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restore directory can be deleted. + * @throws IOException When setting up the details fails. 
+ * @see TableSnapshotInputFormat + */ + public static void initTableSnapshotMapJob(String snapshotName, String columns, + Class mapper, + Class outputKeyClass, + Class outputValueClass, JobConf job, + boolean addDependencyJars, Path tmpRestoreDir) + throws IOException { + TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); + initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, TableSnapshotInputFormat.class); + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); + } + + /** * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java new file mode 100644 index 0000000..356c5f7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +public class TableSnapshotInputFormat implements InputFormat { + + static class TableSnapshotRegionSplit implements InputSplit { + private TableSnapshotInputFormatImpl.InputSplit delegate; + + // constructor for mapreduce framework / Writable + public TableSnapshotRegionSplit() { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); + } + + public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) { + this.delegate = delegate; + } + + public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, + List locations) { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); + } + + @Override + public long getLength() throws IOException { + return delegate.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return delegate.getLocations(); + } + + @Override + public void write(DataOutput out) throws IOException { + delegate.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + delegate.readFields(in); + } + } + + static class TableSnapshotRecordReader + implements RecordReader { + + private TableSnapshotInputFormatImpl.RecordReader delegate; + + public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job) + throws IOException { + delegate = new TableSnapshotInputFormatImpl.RecordReader(); + delegate.initialize(split.delegate, job); + } + + @Override + public boolean next(ImmutableBytesWritable key, Result value) throws IOException { + if (!delegate.nextKeyValue()) { + return false; + } + ImmutableBytesWritable currentKey = delegate.getCurrentKey(); + key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength()); + value.copyFrom(delegate.getCurrentValue()); + return true; + } + + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + @Override + public Result createValue() { + return new Result(); + } + + @Override + public long getPos() throws IOException { + return delegate.getPos(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public float getProgress() throws IOException { + return delegate.getProgress(); + } + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List splits = + TableSnapshotInputFormatImpl.getSplits(job); + InputSplit[] results = new InputSplit[splits.size()]; + for (int i = 0; i < splits.size(); i++) { + results[i] = new TableSnapshotRegionSplit(splits.get(i)); + } + return results; + } + + @Override + public RecordReader + getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); + } + + /** + * Configures the job to use TableSnapshotInputFormat to read 
from a snapshot. + * @param job the job to configure + * @param snapshotName the name of the snapshot to read from + * @param restoreDir a temporary directory to restore the snapshot into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restoreDir can be deleted. + * @throws IOException if an error occurs + */ + public static void setInput(JobConf job, String snapshotName, Path restoreDir) + throws IOException { + TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 51aaf44..4a3c31f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.mapreduce.hadoopbackport.JarFinder; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -272,6 +273,21 @@ public class TableMapReduceUtil { } /** + * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on + * direct memory will likely cause the map tasks to OOM when opening the region. This + * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user + * wants to override this behavior in their job. + */ + public static void resetCacheConfig(Configuration conf) { + conf.setFloat( + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + conf.setFloat(CacheConfig.SLAB_CACHE_OFFHEAP_PERCENTAGE_KEY, 0f); + conf.setFloat(CacheConfig.BUCKET_CACHE_SIZE_KEY, 0f); + conf.setFloat("hbase.offheapcache.percentage", 0f); + conf.setFloat("hbase.bucketcache.size", 0f); + } + + /** * Sets up the job for reading from a table snapshot. It bypasses hbase servers * and read directly from snapshot files. * @@ -300,17 +316,7 @@ public class TableMapReduceUtil { TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); - - /* - * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on - * direct memory will likely cause the map tasks to OOM when opening the region. This - * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user - * wants to override this behavior in their job. 
- */ - job.getConfiguration().setFloat( - HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); - job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f); - job.getConfiguration().setFloat("hbase.bucketcache.size", 0f); + resetCacheConfig(job.getConfiguration()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index edf3077..f8d4d18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -18,43 +18,24 @@ package org.apache.hadoop.hbase.mapreduce; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; -import java.util.UUID; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.ClientSideRegionScanner; -import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableSnapshotScanner; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; -import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; -import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.snapshot.ExportSnapshot; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; -import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; -import org.apache.hadoop.hbase.snapshot.SnapshotManifest; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -99,7 +80,7 @@ import com.google.common.annotations.VisibleForTesting; * To read from snapshot files directly from the file system, the user who is running the MR job * must have sufficient permissions to access snapshot and reference files. * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase - * user or the user must have group or other priviledges in the filesystem (See HBASE-8369). + * user or the user must have group or other privileges in the filesystem (See HBASE-8369). * Note that, given other users access to read from snapshot/data files will completely circumvent * the access control enforced by HBase. 
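For orientation, here is a minimal job-setup sketch (the snapshot name, mapper classes and restore directory are made-up examples, imports are elided, and the helper methods are the ones referenced in this patch):

    // mapreduce API: go through TableMapReduceUtil rather than using this class directly.
    Job job = Job.getInstance(HBaseConfiguration.create(), "scan-over-snapshot");
    Scan scan = new Scan();   // start/stop rows, filters etc. as for a normal table scan
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "my_snapshot", scan, MySnapshotMapper.class,
        ImmutableBytesWritable.class, Result.class, job,
        true,                                  // addDependencyJars
        new Path("/tmp/snapshot-restore"));    // writable by the submitting user; not under hbase.rootdir

    // mapred API (added by this patch): analogous, but takes a JobConf and a space-separated
    // list of column families instead of a Scan.
    JobConf jobConf = new JobConf(HBaseConfiguration.create());
    org.apache.hadoop.hbase.mapred.TableMapReduceUtil.initTableSnapshotMapJob(
        "my_snapshot", "cf1 cf2", MyMapredMapper.class,
        ImmutableBytesWritable.class, Result.class, jobConf,
        true, new Path("/tmp/snapshot-restore"));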
* @see TableSnapshotScanner @@ -107,166 +88,94 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Public @InterfaceStability.Evolving public class TableSnapshotInputFormat extends InputFormat { - // TODO: Snapshots files are owned in fs by the hbase user. There is no - // easy way to delegate access. - - private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class); - - /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */ - private static final String LOCALITY_CUTOFF_MULTIPLIER = - "hbase.tablesnapshotinputformat.locality.cutoff.multiplier"; - private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f; - - private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; - private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir"; @VisibleForTesting static class TableSnapshotRegionSplit extends InputSplit implements Writable { - private HTableDescriptor htd; - private HRegionInfo regionInfo; - private String[] locations; + private TableSnapshotInputFormatImpl.InputSplit delegate; // constructor for mapreduce framework / Writable - public TableSnapshotRegionSplit() { } + public TableSnapshotRegionSplit() { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); + } - TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, List locations) { - this.htd = htd; - this.regionInfo = regionInfo; - if (locations == null || locations.isEmpty()) { - this.locations = new String[0]; - } else { - this.locations = locations.toArray(new String[locations.size()]); - } + public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) { + this.delegate = delegate; + } + + public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, + List locations) { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); } + @Override public long getLength() throws IOException, InterruptedException { - //TODO: We can obtain the file sizes of the snapshot here. - return 0; + return delegate.getLength(); } @Override public String[] getLocations() throws IOException, InterruptedException { - return locations; + return delegate.getLocations(); } - // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of - // doing this wrapping with Writables. 
@Override public void write(DataOutput out) throws IOException { - MapReduceProtos.TableSnapshotRegionSplit.Builder builder = - MapReduceProtos.TableSnapshotRegionSplit.newBuilder() - .setTable(htd.convert()) - .setRegion(HRegionInfo.convert(regionInfo)); - - for (String location : locations) { - builder.addLocations(location); - } - - MapReduceProtos.TableSnapshotRegionSplit split = builder.build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - split.writeTo(baos); - baos.close(); - byte[] buf = baos.toByteArray(); - out.writeInt(buf.length); - out.write(buf); + delegate.write(out); } + @Override public void readFields(DataInput in) throws IOException { - int len = in.readInt(); - byte[] buf = new byte[len]; - in.readFully(buf); - MapReduceProtos.TableSnapshotRegionSplit split = - MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf); - this.htd = HTableDescriptor.convert(split.getTable()); - this.regionInfo = HRegionInfo.convert(split.getRegion()); - List locationsList = split.getLocationsList(); - this.locations = locationsList.toArray(new String[locationsList.size()]); + delegate.readFields(in); } } @VisibleForTesting static class TableSnapshotRegionRecordReader extends - RecordReader { - private TableSnapshotRegionSplit split; - private Scan scan; - private Result result = null; - private ImmutableBytesWritable row = null; - private ClientSideRegionScanner scanner; + RecordReader { + private TableSnapshotInputFormatImpl.RecordReader delegate = + new TableSnapshotInputFormatImpl.RecordReader(); private TaskAttemptContext context; private Method getCounter; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - - Configuration conf = context.getConfiguration(); - this.split = (TableSnapshotRegionSplit) split; - HTableDescriptor htd = this.split.htd; - HRegionInfo hri = this.split.regionInfo; - FileSystem fs = FSUtils.getCurrentFileSystem(conf); - - Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root - // directory where snapshot was restored - - // create scan - String scanStr = conf.get(TableInputFormat.SCAN); - if (scanStr == null) { - throw new IllegalArgumentException("A Scan is not configured for this job"); - } - scan = TableMapReduceUtil.convertStringToScan(scanStr); - // region is immutable, this should be fine, - // otherwise we have to set the thread read point - scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - // disable caching of data blocks - scan.setCacheBlocks(false); - - scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null); this.context = context; getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context); + delegate.initialize( + ((TableSnapshotRegionSplit) split).delegate, + context.getConfiguration()); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { - result = scanner.next(); - if (result == null) { - //we are done - return false; - } - - if (this.row == null) { - this.row = new ImmutableBytesWritable(); - } - this.row.set(result.getRow()); - - ScanMetrics scanMetrics = scanner.getScanMetrics(); - if (scanMetrics != null && context != null) { - TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context); + boolean result = delegate.nextKeyValue(); + if (result) { + ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics(); + if (scanMetrics != null && context != null) { + TableRecordReaderImpl.updateCounters(scanMetrics, 
0, getCounter, context); + } } - - return true; + return result; } @Override public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { - return row; + return delegate.getCurrentKey(); } @Override public Result getCurrentValue() throws IOException, InterruptedException { - return result; + return delegate.getCurrentValue(); } @Override public float getProgress() throws IOException, InterruptedException { - return 0; // TODO: use total bytes to estimate + return delegate.getProgress(); } @Override public void close() throws IOException { - if (this.scanner != null) { - this.scanner.close(); - } + delegate.close(); } } @@ -278,88 +187,12 @@ public class TableSnapshotInputFormat extends InputFormat getSplits(JobContext job) throws IOException, InterruptedException { - Configuration conf = job.getConfiguration(); - String snapshotName = getSnapshotName(conf); - - Path rootDir = FSUtils.getRootDir(conf); - FileSystem fs = rootDir.getFileSystem(conf); - - Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); - SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); - SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); - List regionManifests = manifest.getRegionManifests(); - if (regionManifests == null) { - throw new IllegalArgumentException("Snapshot seems empty"); - } - - // load table descriptor - HTableDescriptor htd = manifest.getTableDescriptor(); - - Scan scan = TableMapReduceUtil.convertStringToScan(conf - .get(TableInputFormat.SCAN)); - Path tableDir = new Path(conf.get(TABLE_DIR_KEY)); - - List splits = new ArrayList(); - for (SnapshotRegionManifest regionManifest : regionManifests) { - // load region descriptor - HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo()); - - if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), - hri.getStartKey(), hri.getEndKey())) { - // compute HDFS locations from snapshot files (which will get the locations for - // referred hfiles) - List hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); - - int len = Math.min(3, hosts.size()); - hosts = hosts.subList(0, len); - splits.add(new TableSnapshotRegionSplit(htd, hri, hosts)); - } + List results = new ArrayList(); + for (TableSnapshotInputFormatImpl.InputSplit split : + TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) { + results.add(new TableSnapshotRegionSplit(split)); } - - return splits; - } - - /** - * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take - * weights into account, thus will treat every location passed from the input split as equal. We - * do not want to blindly pass all the locations, since we are creating one split per region, and - * the region's blocks are all distributed throughout the cluster unless favorite node assignment - * is used. On the expected stable case, only one location will contain most of the blocks as - * local. - * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here - * we are doing a simple heuristic, where we will pass all hosts which have at least 80% - * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top - * host with the best locality. 
- */ - @VisibleForTesting - List getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) { - List locations = new ArrayList(3); - - HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); - - if (hostAndWeights.length == 0) { - return locations; - } - - HostAndWeight topHost = hostAndWeights[0]; - locations.add(topHost.getHost()); - - // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality - double cutoffMultiplier - = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); - - double filterWeight = topHost.getWeight() * cutoffMultiplier; - - for (int i = 1; i < hostAndWeights.length; i++) { - if (hostAndWeights[i].getWeight() >= filterWeight) { - locations.add(hostAndWeights[i].getHost()); - } else { - break; - } - } - - return locations; + return results; } /** @@ -371,26 +204,8 @@ public class TableSnapshotInputFormat extends InputFormat locations) { + this.htd = htd; + this.regionInfo = regionInfo; + if (locations == null || locations.isEmpty()) { + this.locations = new String[0]; + } else { + this.locations = locations.toArray(new String[locations.size()]); + } + } + + public long getLength() { + //TODO: We can obtain the file sizes of the snapshot here. + return 0; + } + + public String[] getLocations() { + return locations; + } + + public HTableDescriptor getTableDescriptor() { + return htd; + } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } + + // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of + // doing this wrapping with Writables. + @Override + public void write(DataOutput out) throws IOException { + TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder() + .setTable(htd.convert()) + .setRegion(HRegionInfo.convert(regionInfo)); + + for (String location : locations) { + builder.addLocations(location); + } + + TableSnapshotRegionSplit split = builder.build(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + split.writeTo(baos); + baos.close(); + byte[] buf = baos.toByteArray(); + out.writeInt(buf.length); + out.write(buf); + } + + @Override + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + byte[] buf = new byte[len]; + in.readFully(buf); + TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf); + this.htd = HTableDescriptor.convert(split.getTable()); + this.regionInfo = HRegionInfo.convert(split.getRegion()); + List locationsList = split.getLocationsList(); + this.locations = locationsList.toArray(new String[locationsList.size()]); + } + } + + /** + * Implementation class for RecordReader logic common between mapred and mapreduce. + */ + public static class RecordReader { + private InputSplit split; + private Scan scan; + private Result result = null; + private ImmutableBytesWritable row = null; + private ClientSideRegionScanner scanner; + + public ClientSideRegionScanner getScanner() { + return scanner; + } + + public void initialize(InputSplit split, Configuration conf) throws IOException { + this.split = split; + HTableDescriptor htd = split.htd; + HRegionInfo hri = this.split.getRegionInfo(); + FileSystem fs = FSUtils.getCurrentFileSystem(conf); + + Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root + // directory where snapshot was restored + + // create scan + // TODO: mapred does not support scan as input API. Work around for now. 
+ if (conf.get(TableInputFormat.SCAN) != null) { + scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); + } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) { + String[] columns = + conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" "); + scan = new Scan(); + for (String col : columns) { + scan.addFamily(Bytes.toBytes(col)); + } + } else { + throw new IllegalArgumentException("A Scan is not configured for this job"); + } + + // region is immutable, this should be fine, + // otherwise we have to set the thread read point + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + // disable caching of data blocks + scan.setCacheBlocks(false); + + scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null); + } + + public boolean nextKeyValue() throws IOException { + result = scanner.next(); + if (result == null) { + //we are done + return false; + } + + if (this.row == null) { + this.row = new ImmutableBytesWritable(); + } + this.row.set(result.getRow()); + return true; + } + + public ImmutableBytesWritable getCurrentKey() { + return row; + } + + public Result getCurrentValue() { + return result; + } + + public long getPos() { + return 0; + } + + public float getProgress() { + return 0; // TODO: use total bytes to estimate + } + + public void close() { + if (this.scanner != null) { + this.scanner.close(); + } + } + } + + public static List getSplits(Configuration conf) throws IOException { + String snapshotName = getSnapshotName(conf); + + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + List regionManifests = manifest.getRegionManifests(); + if (regionManifests == null) { + throw new IllegalArgumentException("Snapshot seems empty"); + } + + // load table descriptor + HTableDescriptor htd = manifest.getTableDescriptor(); + + // TODO: mapred does not support scan as input API. Work around for now. 
+    Scan scan = null;
+    if (conf.get(TableInputFormat.SCAN) != null) {
+      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
+      String[] columns =
+        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
+      scan = new Scan();
+      for (String col : columns) {
+        scan.addFamily(Bytes.toBytes(col));
+      }
+    } else {
+      throw new IllegalArgumentException("Unable to create scan");
+    }
+    Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (SnapshotRegionManifest regionManifest : regionManifests) {
+      // load region descriptor
+      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
+
+      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+          hri.getStartKey(), hri.getEndKey())) {
+        // compute HDFS locations from snapshot files (which will get the locations for
+        // referred hfiles)
+        List<String> hosts = getBestLocations(conf,
+          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+        int len = Math.min(3, hosts.size());
+        hosts = hosts.subList(0, len);
+        splits.add(new InputSplit(htd, hri, hosts));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
+   * weights into account, and thus will treat every location passed from the input split as equal.
+   * We do not want to blindly pass all the locations, since we are creating one split per region,
+   * and the region's blocks are all distributed throughout the cluster unless favored node
+   * assignment is used. In the expected stable case, only one location will contain most of the
+   * blocks as local.
+   * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
+   * we use a simple heuristic: we pass all hosts that have at least 80%
+   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
+   * host with the best locality.
+   */
+  public static List<String> getBestLocations(
+      Configuration conf, HDFSBlocksDistribution blockDistribution) {
+    List<String> locations = new ArrayList<String>(3);
+
+    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
+
+    if (hostAndWeights.length == 0) {
+      return locations;
+    }
+
+    HostAndWeight topHost = hostAndWeights[0];
+    locations.add(topHost.getHost());
+
+    // Heuristic: keep every host whose weight is at least cutoffMultiplier of the top host's weight
+    double cutoffMultiplier
+      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
+
+    double filterWeight = topHost.getWeight() * cutoffMultiplier;
+
+    for (int i = 1; i < hostAndWeights.length; i++) {
+      if (hostAndWeights[i].getWeight() >= filterWeight) {
+        locations.add(hostAndWeights[i].getHost());
+      } else {
+        break;
+      }
+    }
+
+    return locations;
+  }
+
+  private static String getSnapshotName(Configuration conf) {
+    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+    if (snapshotName == null) {
+      throw new IllegalArgumentException("Snapshot name must be provided");
+    }
+    return snapshotName;
+  }
+
+  /**
+   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+   * @param conf the job configuration
+   * @param snapshotName the name of the snapshot to read from
+   * @param restoreDir a temporary directory to restore the snapshot into.
Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restoreDir can be deleted. + * @throws IOException if an error occurs + */ + public static void setInput(Configuration conf, String snapshotName, Path restoreDir) + throws IOException { + conf.set(SNAPSHOT_NAME_KEY, snapshotName); + + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + restoreDir = new Path(restoreDir, UUID.randomUUID().toString()); + + // TODO: restore from record readers to parallelize. + RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + + conf.set(TABLE_DIR_KEY, restoreDir.toString()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 20110f2..ca6f444 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -337,8 +337,7 @@ public interface HLog { * able to sync an explicit edit only (the current default implementation syncs up to the time * of the sync call syncing whatever is behind the sync). * @throws IOException - * @deprecated Use - * {@link #appendNoSync(HRegionInfo, HLogKey, WALEdit, HTableDescriptor, AtomicLong, boolean)} + * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)} * instead because you can get back the region edit/sequenceid; it is set into the passed in * key. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index b772223..9e92d5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -131,7 +131,7 @@ import com.google.protobuf.Service; * *

* To perform authorization checks, {@code AccessController} relies on the - * {@link org.apache.hadoop.hbase.ipc.RpcServerEngine} being loaded to provide + * RpcServerEngine being loaded to provide * the user identities for remote requests. *

* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java index 63cdbc4..df3ddac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java @@ -20,13 +20,17 @@ package org.apache.hadoop.hbase.snapshot; import java.io.IOException; import java.io.FileNotFoundException; +import java.net.URI; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -233,6 +237,8 @@ public final class SnapshotInfo extends Configured implements Tool { @Override public int run(String[] args) throws IOException, InterruptedException { + final Configuration conf = getConf(); + boolean listSnapshots = false; String snapshotName = null; boolean showSchema = false; boolean showFiles = false; @@ -251,6 +257,13 @@ public final class SnapshotInfo extends Configured implements Tool { showStats = true; } else if (cmd.equals("-schema")) { showSchema = true; + } else if (cmd.equals("-remote-dir")) { + Path sourceDir = new Path(args[++i]); + URI defaultFs = sourceDir.getFileSystem(conf).getUri(); + FSUtils.setFsDefault(conf, new Path(defaultFs)); + FSUtils.setRootDir(conf, sourceDir); + } else if (cmd.equals("-list-snapshots")) { + listSnapshots = true; } else if (cmd.equals("-h") || cmd.equals("--help")) { printUsageAndExit(); } else { @@ -262,15 +275,28 @@ public final class SnapshotInfo extends Configured implements Tool { } } + // List Available Snapshots + if (listSnapshots) { + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + System.out.printf("%-20s | %-20s | %s%n", "SNAPSHOT", "CREATION TIME", "TABLE NAME"); + for (SnapshotDescription desc: getSnapshotList(conf)) { + System.out.printf("%-20s | %20s | %s%n", + desc.getName(), + df.format(new Date(desc.getCreationTime())), + desc.getTable()); + } + return 0; + } + if (snapshotName == null) { System.err.println("Missing snapshot name!"); printUsageAndExit(); return 1; } - Configuration conf = getConf(); - fs = FileSystem.get(conf); rootDir = FSUtils.getRootDir(conf); + fs = FileSystem.get(rootDir.toUri(), conf); + LOG.debug("fs=" + fs.getUri().toString() + " root=" + rootDir); // Load snapshot information if (!loadSnapshotInfo(snapshotName)) { @@ -398,6 +424,8 @@ public final class SnapshotInfo extends Configured implements Tool { System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName()); System.err.println(" where [options] are:"); System.err.println(" -h|-help Show this help and exit."); + System.err.println(" -remote-dir Root directory that contains the snapshots."); + System.err.println(" -list-snapshots List all the available snapshots and exit."); System.err.println(" -snapshot NAME Snapshot to examine."); System.err.println(" -files Files and logs list."); System.err.println(" -stats Files and logs stats."); @@ -418,7 +446,7 @@ public final class SnapshotInfo extends Configured implements Tool { public static SnapshotStats getSnapshotStats(final Configuration conf, final SnapshotDescription snapshot) throws IOException { Path 
rootDir = FSUtils.getRootDir(conf); - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(rootDir.toUri(), conf); Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir); final SnapshotStats stats = new SnapshotStats(conf, fs, snapshot); SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshot, @@ -440,6 +468,26 @@ public final class SnapshotInfo extends Configured implements Tool { } /** + * Returns the list of available snapshots in the specified location + * @param conf the {@link Configuration} to use + * @return the list of snapshots + */ + public static List getSnapshotList(final Configuration conf) + throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = FileSystem.get(rootDir.toUri(), conf); + Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir); + FileStatus[] snapshots = fs.listStatus(snapshotDir, + new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); + List snapshotLists = + new ArrayList(snapshots.length); + for (FileStatus snapshotDirStat: snapshots) { + snapshotLists.add(SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDirStat.getPath())); + } + return snapshotLists; + } + + /** * The guts of the {@link #main} method. * Call this method to avoid the {@link #main(String[])} System.exit. * @param args diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java index 466a498..f75a5f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java @@ -26,6 +26,11 @@ import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.List; +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -33,11 +38,6 @@ import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Preconditions; -import javax.management.JMException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - /** * Utilities for interacting with and monitoring DirectByteBuffer allocations. */ @@ -71,7 +71,7 @@ public class DirectMemoryUtils { try { a = BEAN_SERVER.getAttribute(NIO_DIRECT_POOL, MEMORY_USED); } catch (JMException e) { - LOG.debug("Failed to retrieve nio.BufferPool direct MemoryUsed attribute.", e); + LOG.debug("Failed to retrieve nio.BufferPool direct MemoryUsed attribute: " + e); } } HAS_MEMORY_USED_ATTRIBUTE = a != null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 2001494..e9e2310 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -1146,6 +1146,8 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); System.err.println(" table Alternate table name. 
Default: 'TestTable'"); + System.err.println(" multiGet If >0, when doing RandomRead, perform multiple gets " + + "instead of single gets. Default: 0"); System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); System.err.println(" flushCommits Used to determine if the test should flush the table. " + "Default: false"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java new file mode 100644 index 0000000..919fa9b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -0,0 +1,176 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mortbay.log.Log; + +/** + * Tests that {@link CacheConfig} does as expected. 
+ */ +@Category(SmallTests.class) +public class TestCacheConfig { + private Configuration conf; + + static class DataCacheEntry implements Cacheable { + private static final int SIZE = 1; + private static DataCacheEntry SINGLETON = new DataCacheEntry(); + + private final CacheableDeserializer deserializer = + new CacheableDeserializer() { + @Override + public int getDeserialiserIdentifier() { + return 0; + } + + @Override + public Cacheable deserialize(ByteBuffer b, boolean reuse) throws IOException { + Log.info("Deserialized " + b + ", reuse=" + reuse); + return SINGLETON; + } + + @Override + public Cacheable deserialize(ByteBuffer b) throws IOException { + Log.info("Deserialized " + b); + return SINGLETON; + } + }; + + public String toString() { + return "size=" + SIZE + ", type=" + getBlockType(); + }; + + @Override + public long heapSize() { + return SIZE; + } + + @Override + public int getSerializedLength() { + return SIZE; + } + + @Override + public void serialize(ByteBuffer destination) { + Log.info("Serialized " + this + " to " + destination); + } + + @Override + public CacheableDeserializer getDeserializer() { + return this.deserializer; + } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } + }; + + static class MetaCacheEntry extends DataCacheEntry { + @Override + public BlockType getBlockType() { + return BlockType.INTERMEDIATE_INDEX; + } + } + + @Before + public void setUp() throws Exception { + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; + this.conf = HBaseConfiguration.create(); + } + + @After + public void tearDown() throws Exception { + // Let go of current block cache. + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; + } + + /** + * @param cc + * @param doubling If true, addition of element ups counter by 2, not 1, because element added + * to onheap and offheap caches. + * @param sizing True if we should run sizing test (doesn't always apply). + */ + private void basicBlockCacheOps(final CacheConfig cc, final boolean doubling, + final boolean sizing) { + assertTrue(cc.isBlockCacheEnabled()); + assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory()); + BlockCache bc = cc.getBlockCache(); + BlockCacheKey bck = new BlockCacheKey("f", 0); + Cacheable c = new DataCacheEntry(); + // Do asserts on block counting. + long initialBlockCount = bc.getBlockCount(); + bc.cacheBlock(bck, c); + assertEquals(doubling? 2: 1, bc.getBlockCount() - initialBlockCount); + bc.evictBlock(bck); + assertEquals(initialBlockCount, bc.getBlockCount()); + // Do size accounting. Do it after the above 'warm-up' because it looks like some + // buffers do lazy allocation so sizes are off on first go around. 
+ if (sizing) { + long originalSize = bc.getCurrentSize(); + bc.cacheBlock(bck, c); + long size = bc.getCurrentSize(); + assertTrue(bc.getCurrentSize() > originalSize); + bc.evictBlock(bck); + size = bc.getCurrentSize(); + assertEquals(originalSize, size); + } + } + + @Test + public void testCacheConfigDefaultLRUBlockCache() { + CacheConfig cc = new CacheConfig(this.conf); + assertTrue(cc.isBlockCacheEnabled()); + assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory()); + basicBlockCacheOps(cc, false, true); + assertTrue(cc.getBlockCache() instanceof LruBlockCache); + } + + @Test + public void testSlabCacheConfig() { + this.conf.setFloat(CacheConfig.SLAB_CACHE_OFFHEAP_PERCENTAGE_KEY, 0.1f); + CacheConfig cc = new CacheConfig(this.conf); + basicBlockCacheOps(cc, true, true); + assertTrue(cc.getBlockCache() instanceof DoubleBlockCache); + // TODO Assert sizes allocated are right. + } + + @Test + public void testBucketCacheConfig() { + this.conf.set(CacheConfig.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + this.conf.setInt(CacheConfig.BUCKET_CACHE_SIZE_KEY, 100); + this.conf.setFloat(CacheConfig.BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, 0.8f); + CacheConfig cc = new CacheConfig(this.conf); + basicBlockCacheOps(cc, false, false); + assertTrue(cc.getBlockCache() instanceof CombinedBlockCache); + // TODO: Assert sizes allocated are right and proportions. + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java new file mode 100644 index 0000000..fb13d70 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapred; + +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Iterator; + +@Category(LargeTests.class) +public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase { + + private static final byte[] aaa = Bytes.toBytes("aaa"); + private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{' + private static final String COLUMNS = + Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]); + + @Override + protected byte[] getStartRow() { + return aaa; + } + + @Override + protected byte[] getEndRow() { + return after_zzz; + } + + static class TestTableSnapshotMapper extends MapReduceBase + implements TableMap { + @Override + public void map(ImmutableBytesWritable key, Result value, + OutputCollector collector, Reporter reporter) + throws IOException { + verifyRowFromMap(key, value); + collector.collect(key, NullWritable.get()); + } + } + + public static class TestTableSnapshotReducer extends MapReduceBase + implements Reducer { + HBaseTestingUtility.SeenRowTracker rowTracker = + new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz); + + @Override + public void reduce(ImmutableBytesWritable key, Iterator values, + OutputCollector collector, Reporter reporter) + throws IOException { + rowTracker.addRow(key.get()); + } + + @Override + public void close() { + rowTracker.validate(); + } + } + + @Test + public void testInitTableSnapshotMapperJobConfig() throws Exception { + setupCluster(); + TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig"); + String snapshotName = "foo"; + + try { + createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1); + JobConf job = new JobConf(UTIL.getConfiguration()); + Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); + + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, + COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + + // TODO: would be better to examine directly the cache instance that results from this + // config. Currently this is not possible because BlockCache initialization is static. 
+ Assert.assertEquals( + "Snapshot job should be configured for default LruBlockCache.", + HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT, + job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01); + Assert.assertEquals( + "Snapshot job should not use SlabCache.", + 0, job.getFloat("hbase.offheapcache.percentage", -1), 0.01); + Assert.assertEquals( + "Snapshot job should not use BucketCache.", + 0, job.getFloat("hbase.bucketcache.size", -1), 0.01); + } finally { + UTIL.getHBaseAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + tearDownCluster(); + } + } + + // TODO: mapred does not support limiting input range by startrow, endrow. + // Thus the following tests must override parameterverification. + + @Test + @Override + public void testWithMockedMapReduceMultiRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10); + } + + @Test + @Override + public void testWithMapReduceMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false); + } + + @Test + @Override + // run the MR job while HBase is offline + public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true); + } + + @Override + protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits) throws Exception { + setupCluster(); + TableName tableName = TableName.valueOf("testWithMockedMapReduce"); + try { + createTableAndSnapshot( + util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); + + JobConf job = new JobConf(util.getConfiguration()); + Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); + + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, + COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + + // mapred doesn't support start and end keys? 
o.O + verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); + + } finally { + util.getHBaseAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + tearDownCluster(); + } + } + + private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits, + byte[] startRow, byte[] stopRow) throws IOException, InterruptedException { + TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); + InputSplit[] splits = tsif.getSplits(job, 0); + + Assert.assertEquals(expectedNumSplits, splits.length); + + HBaseTestingUtility.SeenRowTracker rowTracker = + new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + + for (int i = 0; i < splits.length; i++) { + // validate input split + InputSplit split = splits[i]; + Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit); + + // validate record reader + OutputCollector collector = mock(OutputCollector.class); + Reporter reporter = mock(Reporter.class); + RecordReader rr = tsif.getRecordReader(split, job, reporter); + + // validate we can read all the data back + ImmutableBytesWritable key = rr.createKey(); + Result value = rr.createValue(); + while (rr.next(key, value)) { + verifyRowFromMap(key, value); + rowTracker.addRow(key.copyBytes()); + } + + rr.close(); + } + + // validate all rows are seen + rowTracker.validate(); + } + + @Override + protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, + String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + boolean shutdownCluster) throws Exception { + doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, + numRegions, expectedNumSplits, shutdownCluster); + } + + // this is also called by the IntegrationTestTableSnapshotInputFormat + public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, + String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, + int expectedNumSplits, boolean shutdownCluster) throws Exception { + + //create the table and snapshot + createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); + + if (shutdownCluster) { + util.shutdownMiniHBaseCluster(); + } + + try { + // create the job + JobConf jobConf = new JobConf(util.getConfiguration()); + + jobConf.setJarByClass(util.getClass()); + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf, + TestTableSnapshotInputFormat.class); + + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, jobConf, true, tableDir); + + jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); + jobConf.setNumReduceTasks(1); + jobConf.setOutputFormat(NullOutputFormat.class); + + RunningJob job = JobClient.runJob(jobConf); + Assert.assertTrue(job.isSuccessful()); + } finally { + if (!shutdownCluster) { + util.getHBaseAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java new file mode 100644 index 0000000..e82b357 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public abstract class TableSnapshotInputFormatTestBase { + + protected final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected static final int NUM_REGION_SERVERS = 2; + protected static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")}; + + protected FileSystem fs; + protected Path rootDir; + + public void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(NUM_REGION_SERVERS); + rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); + fs = rootDir.getFileSystem(UTIL.getConfiguration()); + } + + public void tearDownCluster() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private static void setupConf(Configuration conf) { + // Enable snapshot + conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + } + + protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits) throws Exception; + + protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, + String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + boolean shutdownCluster) throws Exception; + + protected abstract byte[] getStartRow(); + + protected abstract byte[] getEndRow(); + + @Test + public void testWithMockedMapReduceSingleRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1); + } + + @Test + public void testWithMockedMapReduceMultiRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8); + } + + @Test + public void testWithMapReduceSingleRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false); + } + + @Test + public void testWithMapReduceMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 
8, false); + } + + @Test + // run the MR job while HBase is offline + public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true); + } + + protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception { + setupCluster(); + util.startMiniMapReduceCluster(); + try { + Path tableDir = util.getDataTestDirOnTestFS(snapshotName); + TableName tableName = TableName.valueOf("testWithMapReduce"); + testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions, + expectedNumSplits, shutdownCluster); + } finally { + util.shutdownMiniMapReduceCluster(); + tearDownCluster(); + } + } + + protected static void verifyRowFromMap(ImmutableBytesWritable key, Result result) + throws IOException { + byte[] row = key.get(); + CellScanner scanner = result.cellScanner(); + while (scanner.advance()) { + Cell cell = scanner.current(); + + //assert that all Cells in the Result have the same key + Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + } + + for (int j = 0; j < FAMILIES.length; j++) { + byte[] actual = result.getValue(FAMILIES[j], null); + Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) + + " ,actual:" + Bytes.toString(actual), row, actual); + } + } + + protected static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName, + String snapshotName, byte[] startRow, byte[] endRow, int numRegions) + throws Exception { + try { + util.deleteTable(tableName); + } catch(Exception ex) { + // ignore + } + + if (numRegions > 1) { + util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions); + } else { + util.createTable(tableName, FAMILIES); + } + HBaseAdmin admin = util.getHBaseAdmin(); + + // put some stuff in the table + HTable table = new HTable(util.getConfiguration(), tableName); + util.loadTable(table, FAMILIES); + + Path rootDir = FSUtils.getRootDir(util.getConfiguration()); + FileSystem fs = rootDir.getFileSystem(util.getConfiguration()); + + SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, + Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true); + + // load different values + byte[] value = Bytes.toBytes("after_snapshot_value"); + util.loadTable(table, FAMILIES, value); + + // cause flush to create new files in the region + admin.flush(tableName.toString()); + table.close(); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index 9f2d390..9e7102d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -22,33 +22,20 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.Arrays; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseTestingUtility; 
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; -import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; -import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -64,34 +51,19 @@ import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; @Category(LargeTests.class) -public class TestTableSnapshotInputFormat { - - private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class); - private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static final int NUM_REGION_SERVERS = 2; - private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat"; - private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")}; - private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR); - public static byte[] bbb = Bytes.toBytes("bbb"); - public static byte[] yyy = Bytes.toBytes("yyy"); - - private FileSystem fs; - private Path rootDir; - - public void setupCluster() throws Exception { - setupConf(UTIL.getConfiguration()); - UTIL.startMiniCluster(NUM_REGION_SERVERS); - rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); - fs = rootDir.getFileSystem(UTIL.getConfiguration()); - } +public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase { + + private static final byte[] bbb = Bytes.toBytes("bbb"); + private static final byte[] yyy = Bytes.toBytes("yyy"); - public void tearDownCluster() throws Exception { - UTIL.shutdownMiniCluster(); + @Override + protected byte[] getStartRow() { + return bbb; } - private static void setupConf(Configuration conf) { - // Enable snapshot - conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + @Override + protected byte[] getEndRow() { + return yyy; } @After @@ -100,7 +72,7 @@ public class TestTableSnapshotInputFormat { @Test public void testGetBestLocations() throws IOException { - TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); + TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl(); Configuration conf = UTIL.getConfiguration(); HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution(); @@ -169,41 +141,6 @@ public class TestTableSnapshotInputFormat { } } - public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName, - String snapshotName, int numRegions) - throws Exception { - try { - util.deleteTable(tableName); - } catch(Exception ex) { - // ignore - } - - if (numRegions > 1) { - util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions); - } else { - util.createTable(tableName, FAMILIES); - } - HBaseAdmin admin = util.getHBaseAdmin(); - - // put some stuff in the table - HTable table = new 
HTable(util.getConfiguration(), tableName); - util.loadTable(table, FAMILIES); - - Path rootDir = FSUtils.getRootDir(util.getConfiguration()); - FileSystem fs = rootDir.getFileSystem(util.getConfiguration()); - - SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, - Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true); - - // load different values - byte[] value = Bytes.toBytes("after_snapshot_value"); - util.loadTable(table, FAMILIES, value); - - // cause flush to create new files in the region - admin.flush(tableName.toString()); - table.close(); - } - @Test public void testInitTableSnapshotMapperJobConfig() throws Exception { setupCluster(); @@ -211,7 +148,7 @@ public class TestTableSnapshotInputFormat { String snapshotName = "foo"; try { - createTableAndSnapshot(UTIL, tableName, snapshotName, 1); + createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1); Job job = new Job(UTIL.getConfiguration()); Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); @@ -238,32 +175,23 @@ public class TestTableSnapshotInputFormat { } } - @Test - public void testWithMockedMapReduceSingleRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1); - } - - @Test - public void testWithMockedMapReduceMultiRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8); - } - public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits) throws Exception { setupCluster(); TableName tableName = TableName.valueOf("testWithMockedMapReduce"); try { - createTableAndSnapshot(util, tableName, snapshotName, numRegions); + createTableAndSnapshot( + util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); Job job = new Job(util.getConfiguration()); Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); - Scan scan = new Scan(bbb, yyy); // limit the scan + Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false, tmpTableDir); - verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy); + verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); } finally { util.getHBaseAdmin().deleteSnapshot(snapshotName); @@ -309,63 +237,21 @@ public class TestTableSnapshotInputFormat { rowTracker.validate(); } - public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) - throws IOException { - byte[] row = key.get(); - CellScanner scanner = result.cellScanner(); - while (scanner.advance()) { - Cell cell = scanner.current(); - - //assert that all Cells in the Result have the same key - Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, - cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); - } - - for (int j = 0; j < FAMILIES.length; j++) { - byte[] actual = result.getValue(FAMILIES[j], null); - Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) - + " ,actual:" + Bytes.toString(actual), row, actual); - } - } - - @Test - public void testWithMapReduceSingleRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false); - } - - @Test - public void testWithMapReduceMultiRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false); - } - - @Test - // 
run the MR job while HBase is offline - public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true); - } - - private void testWithMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception { - setupCluster(); - util.startMiniMapReduceCluster(); - try { - Path tableDir = util.getDataTestDirOnTestFS(snapshotName); - TableName tableName = TableName.valueOf("testWithMapReduce"); - doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions, - expectedNumSplits, shutdownCluster); - } finally { - util.shutdownMiniMapReduceCluster(); - tearDownCluster(); - } + @Override + protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, + String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + boolean shutdownCluster) throws Exception { + doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, + numRegions, expectedNumSplits, shutdownCluster); } // this is also called by the IntegrationTestTableSnapshotInputFormat public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, - String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, - boolean shutdownCluster) throws Exception { + String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, + int expectedNumSplits, boolean shutdownCluster) throws Exception { //create the table and snapshot - createTableAndSnapshot(util, tableName, snapshotName, numRegions); + createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); if (shutdownCluster) { util.shutdownMiniHBaseCluster(); @@ -374,7 +260,7 @@ public class TestTableSnapshotInputFormat { try { // create the job Job job = new Job(util.getConfiguration()); - Scan scan = new Scan(bbb, yyy); // limit the scan + Scan scan = new Scan(startRow, endRow); // limit the scan job.setJarByClass(util.getClass()); TableMapReduceUtil.addDependencyJars(job.getConfiguration(), diff --git a/pom.xml b/pom.xml index e717cca..52a67ec 100644 --- a/pom.xml +++ b/pom.xml @@ -525,7 +525,8 @@ using method parallelization class ! --> ${surefire.testFailureIgnore} ${surefire.timeout} - -enableassertions -Xmx1900m -XX:MaxPermSize=100m -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true + + -enableassertions -XX:MaxDirectMemorySize=1G -Xmx1900m -XX:MaxPermSize=100m -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true ${test.output.tofile} diff --git a/src/main/docbkx/book.xml b/src/main/docbkx/book.xml index 023bd2b..0fb3eba 100644 --- a/src/main/docbkx/book.xml +++ b/src/main/docbkx/book.xml @@ -1590,7 +1590,9 @@ rs.close(); Block Cache Below we describe the default block cache implementation, the LRUBlockCache. Read for an understanding of how it works and an overview of the facility it provides. - Other, off-heap options have since been added. After reading the below, + Other, off-heap options have since been added. These are described in the + javadoc org.apache.hadoop.hbase.io.hfile package description. + After reading the below, be sure to visit the blog series BlockCache 101 by Nick Dimiduk where other Block Cache implementations are described. @@ -1670,6 +1672,12 @@ rs.close(); +
Offheap Block Cache
+      There are a few options for configuring an off-heap cache for blocks read from HDFS.
+      The options and their setup are described in the javadoc for the
+      org.apache.hadoop.hbase.io.hfile package.
+
+
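As a minimal, hedged sketch of what these off-heap options amount to in code, the snippet below wires up the BucketCache-backed CombinedBlockCache using only the CacheConfig keys exercised by TestCacheConfig in this patch. The 100/0.8 values are the test's toy settings rather than sizing guidance, and the class name OffheapCacheConfigSketch is illustrative; note the JVM also needs -XX:MaxDirectMemorySize, as the surefire argLine change in pom.xml above shows.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.io.hfile.BlockCache;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;

  public class OffheapCacheConfigSketch {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Same keys and values as TestCacheConfig#testBucketCacheConfig above.
      conf.set(CacheConfig.BUCKET_CACHE_IOENGINE_KEY, "offheap");
      conf.setInt(CacheConfig.BUCKET_CACHE_SIZE_KEY, 100);
      conf.setFloat(CacheConfig.BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, 0.8f);
      CacheConfig cc = new CacheConfig(conf);
      BlockCache bc = cc.getBlockCache(); // expected: a CombinedBlockCache instance
      System.out.println("Block cache in use: " + bc.getClass().getName());
    }
  }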
diff --git a/src/main/xslt/configuration_to_docbook_section.xsl b/src/main/xslt/configuration_to_docbook_section.xsl index 95f7fd5..57219ab 100644 --- a/src/main/xslt/configuration_to_docbook_section.xsl +++ b/src/main/xslt/configuration_to_docbook_section.xsl @@ -25,7 +25,7 @@ This stylesheet is used making an html version of hbase-default.xml. --> -
HBase Default Configuration - - - -HBase Default Configuration The documentation below is generated using the default hbase configuration file, hbase-default.xml, as source. @@ -47,7 +43,7 @@ The documentation below is generated using the default hbase configuration file, - + @@ -55,7 +51,10 @@ The documentation below is generated using the default hbase configuration file, - Default: + + Default + + @@ -63,6 +62,5 @@ The documentation below is generated using the default hbase configuration file, -
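Finally, to illustrate how the new snapshot input format is meant to be driven end to end, here is a minimal sketch of a mapreduce job that scans a snapshot, mirroring the TableMapReduceUtil.initTableSnapshotMapperJob calls made in the tests above. The snapshot name, restore path, driver class, and MySnapshotMapper are placeholders of this sketch, not part of the patch; the restore directory must be writable by the current user and must not be a subdirectory of hbase.rootdir.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  import org.apache.hadoop.hbase.mapreduce.TableMapper;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

  public class SnapshotScanDriver {

    // Placeholder mapper: just emits the row key for every row read from the snapshot.
    static class MySnapshotMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {
      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws java.io.IOException, InterruptedException {
        context.write(key, NullWritable.get());
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = new Job(conf, "scan-snapshot-mySnapshot");
      job.setJarByClass(SnapshotScanDriver.class);

      // Optionally limit the scan range, as the tests do.
      Scan scan = new Scan(Bytes.toBytes("bbb"), Bytes.toBytes("yyy"));

      // Temporary restore directory: writable by the current user, not under hbase.rootdir;
      // it can be deleted once the job finishes.
      Path restoreDir = new Path("/tmp/snapshot-restore"); // placeholder path

      TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", scan,
          MySnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
          job, true, restoreDir);

      job.setNumReduceTasks(0);
      job.setOutputFormatClass(NullOutputFormat.class);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }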