Index: src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java (revision 1531202)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java (working copy)
@@ -67,8 +67,8 @@
Path path = new Path("/Users/ryan/rfile.big.txt");
long start = System.currentTimeMillis();
SimpleBlockCache cache = new SimpleBlockCache();
- CacheConfig cacheConf = new CacheConfig(cache, true, false, false, false,
- false, false, false);
+ CacheConfig cacheConf = new CacheConfig.CacheConfigBuilder().withBlockCache(cache)
+ .withCacheDataOnRead(true).build();
Reader reader = HFile.createReader(lfs, path, cacheConf);
reader.loadFileInfo();
Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (revision 1531202)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (working copy)
@@ -152,8 +152,7 @@
}
missCount += 1;
- prevBlock = realReader.readBlockData(offset, onDiskSize,
- -1, pread);
+ prevBlock = realReader.readBlockData(offset, onDiskSize, -1, pread, false);
prevOffset = offset;
prevOnDiskSize = onDiskSize;
prevPread = pread;
@@ -167,8 +166,8 @@
LOG.info("Size of " + path + ": " + fileSize);
FSDataInputStream istream = fs.open(path);
- HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream,
- compr, fs.getFileStatus(path).getLen());
+ HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream, compr, fs.getFileStatus(
+ path).getLen());
BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
HFileBlockIndex.BlockIndexReader indexReader =
@@ -221,8 +220,8 @@
HFile.DEFAULT_CHECKSUM_TYPE,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
FSDataOutputStream outputStream = fs.create(path);
- HFileBlockIndex.BlockIndexWriter biw =
- new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
+ HFileBlockIndex.BlockIndexWriter biw = new HFileBlockIndex.BlockIndexWriter(hbw, null, null,
+ null);
for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
hbw.startWriting(BlockType.DATA).write(
Index: src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (working copy)
@@ -0,0 +1,276 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static junit.framework.Assert.assertNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestBucketCache {
+
+ private static final Log LOG = LogFactory.getLog(TestBucketCache.class);
+
+ private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
+
+ private static final int CACHE_SIZE = 1000000;
+ private static final int NUM_BLOCKS = 100;
+ private static final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
+ private static final int NUM_THREADS = 1000;
+ private static final int NUM_QUERIES = 10000;
+
+ private final String ioEngineName;
+
+ private BucketCache cache;
+
+
+
+ public TestBucketCache(String ioEngineName) {
+ this.ioEngineName = ioEngineName;
+ LOG.info("Running with ioEngineName = " + ioEngineName);
+ }
+
+ @Parameterized.Parameters
+ public static Collection
+ * TODO (avf): remove all singleton functionality once Guice or other DI
+ * mechanism is available
+ */
+public class LruBlockCacheFactory implements BlockCacheFactory {
+
+ private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
+
+ /** Cached singleton instance or null if not initialized */
+ private static LruBlockCacheFactory instance;
+
+ private LruBlockCacheFactory() { } // Private as this is a singleton
+
+ public synchronized static LruBlockCacheFactory getInstance() {
+ if (instance == null) {
+ instance = new LruBlockCacheFactory();
+ }
+ return instance;
+ }
+
+ /** Cached instance of the BlockCache or null if not initialized */
+ private LruBlockCache blockCache;
+
+ // Allows short-circuiting getBlockCache() calls if the cache is disabled
+ private boolean blockCacheDisabled;
+
+ /**
+ * Returns the current underlying block cache instance or null if
+ * block cache is disabled or not initialized. Used by
+ * {@link SchemaMetrics}
+ * @return The current block cache instance, or null if disabled or
+ * not initialized
+ */
+ public synchronized LruBlockCache getCurrentBlockCacheInstance() {
+ return blockCache;
+ }
+
+ /**
+ * Returns a cached initialized instance of {@link LruBlockCache}.
+ * Follows the lazy initialization pattern: the first invocation creates and
+ * initializes the instance; subsequent invocations return the cached
+ * instance. This replaces the static "instantiateBlockCache()" method
+ * formerly in CacheConfig.
+ * @param conf The HBase configuration needed to create the instance
+ * @return The {@link LruBlockCache} instance, or null if the block cache is disabled.
+ */
+ @Override
+ public synchronized BlockCache getBlockCache(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+
+ if (blockCache != null)
+ return blockCache;
+ if (blockCacheDisabled)
+ return null;
+
+ float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
+ HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+ if (cachePercentage == 0L) {
+ blockCacheDisabled = true;
+ return null;
+ }
+ if (cachePercentage > 1.0) {
+ throw new IllegalArgumentException(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY +
+ " must be between 0.0 and 1.0, not > 1.0");
+ }
+
+ // Calculate the amount of heap to give the block cache.
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ long cacheSize = (long)(mu.getMax() * cachePercentage);
+ LOG.info("Allocating LruBlockCache with maximum size " +
+ StringUtils.humanReadableInt(cacheSize));
+ blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+ return blockCache;
+ }
+}
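For reference, a short usage sketch (not part of the patch): the factory hands out a single shared LruBlockCache sized from hfile.block.cache.size, so repeated calls from different CacheConfig instances see the same cache. The class and package names below assume the factory lives in org.apache.hadoop.hbase.io.hfile, as the CacheConfig changes suggest.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.LruBlockCacheFactory;

    public class LruFactorySketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // First call creates and sizes the cache from hfile.block.cache.size;
        // later calls return the same instance, or null if caching is disabled.
        BlockCache cache = LruBlockCacheFactory.getInstance().getBlockCache(conf);
        BlockCache again = LruBlockCacheFactory.getInstance().getBlockCache(conf);
        assert cache == again;
      }
    }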
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (working copy)
@@ -171,4 +171,7 @@
return onHeapCache.getBlockCount() + offHeapCache.getBlockCount();
}
+ @Override
+ public void clearCache() {
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (working copy)
@@ -37,7 +37,7 @@
ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) +
ClassSize.STRING + ClassSize.BYTE_BUFFER);
- static enum BlockPriority {
+ public static enum BlockPriority {
/**
* Accessed a single time (used for scan-resistance)
*/
@@ -50,7 +50,7 @@
* Block from in-memory store
*/
MEMORY
- };
+ }
private final BlockCacheKey cacheKey;
private final Cacheable buf;
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (working copy)
@@ -95,4 +95,8 @@
public DataBlockEncoding getDataBlockEncoding() {
return encoding;
}
+
+ public long getOffset() {
+ return offset;
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (working copy)
@@ -134,5 +134,8 @@
return 0;
}
+ @Override
+ public void clearCache() {
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (working copy)
@@ -17,18 +17,16 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryUsage;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.hfile.bucket.IOEngine;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.util.Strings;
/**
* Stores all of the cache objects and configuration for a single HFile.
@@ -70,6 +68,86 @@
public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
"hbase.rs.evictblocksonclose";
+ /**
+ * IO engine to be used by the L2 cache. Currently accepted settings:
+ * "heap" and "offheap". If omitted, the L2 cache will be disabled.
+ * @see IOEngine
+ */
+ public static final String L2_BUCKET_CACHE_IOENGINE_KEY =
+ "hfile.l2.bucketcache.ioengine";
+
+ /**
+ * Size of the L2 cache. If < 1.0, this is taken as a fraction of the available
+ * direct or heap memory (depending on the configured IO engine); otherwise it
+ * is taken as an absolute size in KB. If omitted or set to 0, the L2 cache
+ * will be disabled.
+ */
+ public static final String L2_BUCKET_CACHE_SIZE_KEY =
+ "hfile.l2.bucketcache.size";
+
+ /**
+ * Number of writer threads for the BucketCache instance used for the L2
+ * cache.
+ * @see L2BucketCache
+ */
+ public static final String L2_BUCKET_CACHE_WRITER_THREADS_KEY =
+ "hfile.l2.bucketcache.writer.threads";
+
+ /**
+ * Maximum length of the write queue for the BucketCache instance used for
+ * the L2 cache.
+ * @see L2BucketCache
+ */
+ public static final String L2_BUCKET_CACHE_QUEUE_KEY =
+ "hfile.l2.bucketcache.queue.length";
+
+ /**
+ * If L2 cache is enabled, this configuration key controls whether or not
+ * blocks are cached upon writes. Default: true if L2 cache is enabled.
+ */
+ public static final String L2_CACHE_BLOCKS_ON_FLUSH_KEY =
+ "hfile.l2.cacheblocksonwrite";
+
+ /**
+ * Configuration key to enable evicting all L2-cached blocks associated with
+ * an hfile when that hfile is closed (e.g., after it has been compacted).
+ * Default: true.
+ */
+ public static final String L2_EVICT_ON_CLOSE_KEY =
+ "hfile.l2.evictblocksonclose";
+
+ /**
+ * Configuration key to evict blocks from the L2 cache once they have been
+ * cached in the L1 cache (i.e., the regular Lru Block Cache). Default: false.
+ * Not yet implemented!
+ *
+ * TODO (avf): needs to be implemented to be honoured correctly
+ */
+ public static final String L2_EVICT_ON_PROMOTION_KEY =
+ "hfile.l2.evictblocksonpromotion";
+
+ /**
+ * Duration for which the BucketCache instance used by the L2 cache will
+ * tolerate IO errors before the cache is disabled. Default: 1 minute.
+ */
+ public static final String L2_BUCKET_CACHE_IOENGINE_ERRORS_TOLERATED_DURATION_KEY =
+ "hfile.l2.bucketcache.ioengine.errors.tolerated.duration";
+
+ /**
+ * Size of individual buckets for the bucket allocator. This is expected to be
+ * a small (e.g., 5-15 item) list of comma-separated integer values
+ * centered around the size (in bytes) of an average compressed block.
+ */
+ public static final String L2_BUCKET_CACHE_BUCKET_SIZES_KEY =
+ "hfile.l2.bucketcache.bucket.sizes";
+
+ /**
+ * Size (in bytes) of an individual buffer inside ByteBufferArray which is used
+ * by the ByteBufferIOEngine. Default is 4 MB. Larger byte buffers might mean
+ * greater lock contention; smaller byte buffers mean more operations per
+ * write and higher memory overhead for maintaining per-buffer locks.
+ */
+ public static final String L2_BUCKET_CACHE_BUFFER_SIZE_KEY =
+ "hfile.l2.bucketcache.buffer.size";
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -79,10 +157,21 @@
public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
public static final boolean DEFAULT_COMPRESSED_CACHE = false;
+ public static final boolean DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH = true;
+ public static final boolean DEFAULT_L2_EVICT_ON_PROMOTION = false;
+ public static final boolean DEFAULT_L2_EVICT_ON_CLOSE = true;
+ public static final int[] DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES = { 4 * 1024 + 1024,
+ 8 * 1024 + 1024, 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
+ 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, 192 * 1024 + 1024,
+ 256 * 1024 + 1024, 384 * 1024 + 1024, 512 * 1024 + 1024 };
+ public static final int DEFAULT_L2_BUCKET_CACHE_BUFFER_SIZE = 4 * 1024 * 1024;
/** Local reference to the block cache, null if completely disabled */
private final BlockCache blockCache;
+ /** Local reference to the l2 cache, null if disabled */
+ private final L2Cache l2Cache;
+
/**
* Whether blocks should be cached on read (default is on if there is a
* cache but this can be turned off on a per-family or per-request basis)
@@ -107,28 +196,67 @@
/** Whether data blocks should be stored in compressed form in the cache */
private final boolean cacheCompressed;
+ /** Whether to cache all blocks on write in the L2 cache */
+ private final boolean l2CacheDataOnWrite;
+
/**
+ * TODO (avf): once implemented, whether to evict blocks from the L2
+ * cache once they have been cached in the L1 cache
+ */
+ private final boolean l2EvictOnPromotion;
+
+ /**
+ * Whether blocks of a file should be evicted from the L2 cache when the file
+ * is closed.
+ */
+ private final boolean l2EvictOnClose;
+
+ /**
+ * Parses a comma-separated list of bucket sizes and returns a sorted array of
+ * integers representing those sizes. Bucket sizes should roughly correspond
+ * to the block sizes encountered in production.
+ * @param conf The configuration
+ * @return A sorted array of bucket sizes in bytes
+ */
+ public static int[] getL2BucketSizes(Configuration conf) {
+ String[] bucketSizesStr = conf.getStrings(L2_BUCKET_CACHE_BUCKET_SIZES_KEY);
+ if (bucketSizesStr == null) {
+ return DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES;
+ }
+ int[] retVal = new int[bucketSizesStr.length];
+ for (int i = 0; i < bucketSizesStr.length; ++i) {
+ try {
+ retVal[i] = Integer.valueOf(bucketSizesStr[i]);
+ } catch (NumberFormatException nfe) {
+ LOG.fatal("Error parsing value " + bucketSizesStr[i], nfe);
+ throw nfe;
+ }
+ }
+ Arrays.sort(retVal);
+ return retVal;
+ }
+
+ /**
* Create a cache configuration using the specified configuration object and
* family descriptor.
* @param conf hbase configuration
* @param family column family configuration
*/
public CacheConfig(Configuration conf, HColumnDescriptor family) {
- this(CacheConfig.instantiateBlockCache(conf),
- family.isBlockCacheEnabled(),
- family.isInMemory(),
- // For the following flags we enable them regardless of per-schema settings
- // if they are enabled in the global configuration.
- conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY,
- DEFAULT_CACHE_DATA_ON_WRITE) || family.shouldCacheDataOnWrite(),
- conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
- DEFAULT_CACHE_INDEXES_ON_WRITE) || family.shouldCacheIndexesOnWrite(),
- conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
- DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(),
- conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY,
- DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(),
- conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE)
- );
+ this(LruBlockCacheFactory.getInstance().getBlockCache(conf), L2BucketCacheFactory.getInstance()
+ .getL2Cache(conf), family.isBlockCacheEnabled(), family.isInMemory(),
+ // For the following flags we enable them regardless of per-schema settings
+ // if they are enabled in the global configuration.
+ conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE)
+ || family.shouldCacheDataOnWrite(), conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+ DEFAULT_CACHE_INDEXES_ON_WRITE) || family.shouldCacheIndexesOnWrite(), conf.getBoolean(
+ CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE)
+ || family.shouldCacheBloomsOnWrite(), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY,
+ DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), conf.getBoolean(
+ CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), conf.getBoolean(
+ L2_CACHE_BLOCKS_ON_FLUSH_KEY, DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH), conf.getBoolean(
+ L2_EVICT_ON_PROMOTION_KEY, DEFAULT_L2_EVICT_ON_PROMOTION), conf.getBoolean(
+ L2_EVICT_ON_CLOSE_KEY, DEFAULT_L2_EVICT_ON_CLOSE));
}
/**
@@ -137,19 +265,18 @@
* @param conf hbase configuration
*/
public CacheConfig(Configuration conf) {
- this(CacheConfig.instantiateBlockCache(conf),
- DEFAULT_CACHE_DATA_ON_READ,
- DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set
- // strictly from conf
- conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE),
- conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
- DEFAULT_CACHE_INDEXES_ON_WRITE),
- conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
- DEFAULT_CACHE_BLOOMS_ON_WRITE),
- conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE),
- conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY,
- DEFAULT_COMPRESSED_CACHE)
- );
+ this(LruBlockCacheFactory.getInstance().getBlockCache(conf), L2BucketCacheFactory.getInstance()
+ .getL2Cache(conf), DEFAULT_CACHE_DATA_ON_READ, DEFAULT_IN_MEMORY, // This is a family-level
+ // setting so can't be set
+ // strictly from conf
+ conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), conf.getBoolean(
+ CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE), conf.getBoolean(
+ CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE), conf.getBoolean(
+ EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), conf.getBoolean(
+ CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), conf.getBoolean(
+ L2_CACHE_BLOCKS_ON_FLUSH_KEY, DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH), conf.getBoolean(
+ L2_EVICT_ON_PROMOTION_KEY, DEFAULT_L2_EVICT_ON_PROMOTION), conf.getBoolean(
+ L2_EVICT_ON_CLOSE_KEY, DEFAULT_L2_EVICT_ON_CLOSE));
}
/**
@@ -164,12 +291,14 @@
* @param evictOnClose whether blocks should be evicted when HFile is closed
* @param cacheCompressed whether to store blocks as compressed in the cache
*/
- CacheConfig(final BlockCache blockCache,
+ CacheConfig(final BlockCache blockCache, final L2Cache l2Cache,
final boolean cacheDataOnRead, final boolean inMemory,
final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
final boolean cacheBloomsOnWrite, final boolean evictOnClose,
- final boolean cacheCompressed) {
+ final boolean cacheCompressed, final boolean l2CacheDataOnWrite,
+ final boolean l2EvictOnPromotion, final boolean l2EvictOnClose) {
this.blockCache = blockCache;
+ this.l2Cache = l2Cache;
this.cacheDataOnRead = cacheDataOnRead;
this.inMemory = inMemory;
this.cacheDataOnWrite = cacheDataOnWrite;
@@ -177,6 +306,9 @@
this.cacheBloomsOnWrite = cacheBloomsOnWrite;
this.evictOnClose = evictOnClose;
this.cacheCompressed = cacheCompressed;
+ this.l2CacheDataOnWrite = l2CacheDataOnWrite;
+ this.l2EvictOnPromotion = l2EvictOnPromotion;
+ this.l2EvictOnClose = l2EvictOnClose;
}
/**
@@ -184,10 +316,10 @@
* @param cacheConf
*/
public CacheConfig(CacheConfig cacheConf) {
- this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
- cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
- cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
- cacheConf.cacheCompressed);
+ this(cacheConf.blockCache, cacheConf.l2Cache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
+ cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite,
+ cacheConf.evictOnClose, cacheConf.cacheCompressed, cacheConf.l2CacheDataOnWrite,
+ cacheConf.l2EvictOnPromotion, cacheConf.l2EvictOnClose);
}
/**
@@ -290,70 +422,202 @@
return isBlockCacheEnabled() && this.cacheCompressed;
}
+ public boolean shouldL2CacheDataOnWrite() {
+ return l2CacheDataOnWrite;
+ }
+
+ public boolean shouldL2EvictOnPromotion() {
+ return l2EvictOnPromotion;
+ }
+
+ public boolean shouldL2EvictOnClose() {
+ return l2EvictOnClose;
+ }
+
@Override
public String toString() {
- if (!isBlockCacheEnabled()) {
+ if (!isBlockCacheEnabled() && !isL2CacheEnabled()) {
return "CacheConfig:disabled";
}
- return "CacheConfig:enabled " +
- "[cacheDataOnRead=" + shouldCacheDataOnRead() + "] " +
- "[cacheDataOnWrite=" + shouldCacheDataOnWrite() + "] " +
- "[cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + "] " +
- "[cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + "] " +
- "[cacheEvictOnClose=" + shouldEvictOnClose() + "] " +
- "[cacheCompressed=" + shouldCacheCompressed() + "]";
+ StringBuilder sb = new StringBuilder("CacheConfig:enabled");
+ if (isL2CacheEnabled()) {
+ sb = Strings.appendKeyValue(sb, "l2CacheDataOnWrite",
+ shouldL2CacheDataOnWrite());
+ sb = Strings.appendKeyValue(sb, "l2EvictOnClose", shouldL2EvictOnClose());
+ sb = Strings.appendKeyValue(sb, "l2EvictOnPromotion",
+ shouldL2EvictOnPromotion());
+ }
+ if (isBlockCacheEnabled()) {
+ sb = Strings.appendKeyValue(sb, "cacheDataOnRead", shouldCacheDataOnRead());
+ sb = Strings.appendKeyValue(sb, "cacheDataOnWrite", shouldCacheDataOnWrite());
+ sb = Strings.appendKeyValue(sb, "cacheIndexesOnWrite", shouldCacheIndexesOnWrite());
+ sb = Strings.appendKeyValue(sb, "cacheBloomsOnWrite", shouldCacheBloomsOnWrite());
+ sb = Strings.appendKeyValue(sb, "cacheEvictOnClose", shouldEvictOnClose());
+ sb = Strings.appendKeyValue(sb, "cacheCompressed", shouldCacheCompressed());
+ }
+ return sb.toString();
}
- // Static block cache reference and methods
+ public boolean isL2CacheEnabled() {
+ return l2Cache != null;
+ }
+ public L2Cache getL2Cache() {
+ return l2Cache;
+ }
+
/**
- * Static reference to the block cache, or null if no caching should be used
- * at all.
+ * A builder for the CacheConfig class. Used to avoid having multiple
+ * constructors.
*/
- private static BlockCache globalBlockCache;
+ public static class CacheConfigBuilder {
+ private final L2CacheFactory l2CacheFactory;
+ private final BlockCacheFactory blockCacheFactory;
+ private final Configuration conf;
- /** Boolean whether we have disabled the block cache entirely. */
- private static boolean blockCacheDisabled = false;
+ private BlockCache blockCache;
+ private L2Cache l2Cache;
+ private boolean cacheDataOnRead = DEFAULT_CACHE_DATA_ON_READ;
+ private boolean inMemory = DEFAULT_IN_MEMORY;
+ private boolean cacheDataOnFlush;
+ private boolean cacheIndexesOnWrite;
+ private boolean cacheBloomsOnWrite;
+ private boolean evictOnClose;
+ private boolean cacheCompressed;
+ private boolean l2CacheDataOnWrite;
+ private boolean l2EvictOnPromotion;
+ private boolean l2EvictOnClose;
- /**
- * Returns the block cache or null in case none should be used.
- *
- * @param conf The current configuration.
- * @return The block cache or null.
- */
- private static synchronized BlockCache instantiateBlockCache(
- Configuration conf) {
- if (globalBlockCache != null) return globalBlockCache;
- if (blockCacheDisabled) return null;
+ /**
+ * Creates an empty builder with no configuration and no cache factories,
+ * to be used for unit tests that inject caches directly via
+ * {@link #withBlockCache(BlockCache)} and/or {@link #withL2Cache(L2Cache)}.
+ */
+ public CacheConfigBuilder() {
+ // Assign the final fields directly; there is no Configuration to read
+ // defaults from, so delegating to the Configuration-based constructor
+ // would fail.
+ this.conf = null;
+ this.blockCacheFactory = null;
+ this.l2CacheFactory = null;
+ }
- float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
- HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
- if (cachePercentage == 0L) {
- blockCacheDisabled = true;
- return null;
+ /**
+ * Use {@link LruBlockCacheFactory} to construct the block cache and
+ * {@link L2BucketCacheFactory} to construct the L2 cache. Reads all
+ * other cache configuration from the specified configuration object.
+ * @param conf The HBase configuration that contains cache related settings
+ */
+ public CacheConfigBuilder(Configuration conf) {
+ this(conf, LruBlockCacheFactory.getInstance(),
+ L2BucketCacheFactory.getInstance());
}
- if (cachePercentage > 1.0) {
- throw new IllegalArgumentException(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY +
- " must be between 0.0 and 1.0, and not > 1.0");
+
+ /**
+ * Uses the specified {@link BlockCacheFactory} and {@link L2CacheFactory}
+ * to construct the respective caches. Reads all cache configuration
+ * from the specified configuration object.
+ * @param conf The HBase configuration that contains cache related settings
+ * @param blockCacheFactory The factory used to construct the block cache
+ * @param l2CacheFactory The factory used to construct the l2 cache
+ */
+ public CacheConfigBuilder(Configuration conf,
+ BlockCacheFactory blockCacheFactory, L2CacheFactory l2CacheFactory) {
+ this.blockCacheFactory = blockCacheFactory;
+ this.l2CacheFactory = l2CacheFactory;
+ this.conf = conf;
+ cacheDataOnFlush = conf.getBoolean(
+ CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
+ cacheIndexesOnWrite = conf.getBoolean(
+ CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE);
+ cacheBloomsOnWrite = conf.getBoolean(
+ CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE);
+ evictOnClose = conf.getBoolean(
+ EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
+ cacheCompressed = conf.getBoolean(
+ CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE);
+ l2CacheDataOnWrite = conf.getBoolean(
+ L2_CACHE_BLOCKS_ON_FLUSH_KEY, DEFAULT_L2_CACHE_BLOCKS_ON_FLUSH);
+ l2EvictOnPromotion = conf.getBoolean(
+ L2_EVICT_ON_PROMOTION_KEY, DEFAULT_L2_EVICT_ON_PROMOTION);
+ l2EvictOnClose = conf.getBoolean(
+ L2_EVICT_ON_CLOSE_KEY, DEFAULT_L2_EVICT_ON_CLOSE);
}
- // Calculate the amount of heap to give the heap.
- MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
- long cacheSize = (long)(mu.getMax() * cachePercentage);
- int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
- HFile.DEFAULT_BLOCKSIZE);
- long offHeapCacheSize =
- (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
- DirectMemoryUtils.getDirectMemorySize());
- LOG.info("Allocating LruBlockCache with maximum size " +
- StringUtils.humanReadableInt(cacheSize));
- if (offHeapCacheSize <= 0) {
- globalBlockCache = new LruBlockCache(cacheSize,
- StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
- } else {
- globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
- StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
+ public CacheConfigBuilder withBlockCache(BlockCache blockCache) {
+ this.blockCache = blockCache;
+ return this;
}
- return globalBlockCache;
+
+ public CacheConfigBuilder withL2Cache(L2Cache l2Cache) {
+ this.l2Cache = l2Cache;
+ return this;
+ }
+
+ public CacheConfigBuilder withCacheDataOnRead(boolean cacheDataOnRead) {
+ this.cacheDataOnRead = cacheDataOnRead;
+ return this;
+ }
+
+ public CacheConfigBuilder withInMemory(boolean inMemory) {
+ this.inMemory = inMemory;
+ return this;
+ }
+
+ public CacheConfigBuilder withCacheDataOnFlush(boolean cacheDataOnFlush) {
+ this.cacheDataOnFlush = cacheDataOnFlush;
+ return this;
+ }
+
+ public CacheConfigBuilder withCacheIndexesOnWrite(boolean cacheIndexesOnWrite) {
+ this.cacheIndexesOnWrite = cacheIndexesOnWrite;
+ return this;
+ }
+
+ public CacheConfigBuilder withCacheBloomsOnWrite(boolean cacheBloomsOnWrite) {
+ this.cacheBloomsOnWrite = cacheBloomsOnWrite;
+ return this;
+ }
+
+ public CacheConfigBuilder withEvictOnClose(boolean evictOnClose) {
+ this.evictOnClose = evictOnClose;
+ return this;
+ }
+
+ public CacheConfigBuilder withCacheCompressed(boolean cacheCompressed){
+ this.cacheCompressed = cacheCompressed;
+ return this;
+ }
+
+ public CacheConfigBuilder withL2CacheDataOnWrite(boolean l2CacheDataOnFlush) {
+ this.l2CacheDataOnWrite = l2CacheDataOnFlush;
+ return this;
+ }
+
+ public CacheConfigBuilder withL2EvictOnPromotion(boolean l2EvictOnPromotion) {
+ this.l2EvictOnPromotion = l2EvictOnPromotion;
+ return this;
+ }
+
+ public CacheConfigBuilder withL2EvictOnClose(boolean l2EvictOnClose){
+ this.l2EvictOnClose = l2EvictOnClose;
+ return this;
+ }
+
+ /**
+ * Create a cache configuration based on the current state of this
+ * builder object. If the block cache and/or L2 cache have not been
+ * explicitly passed in using the builder methods, use the respective
+ * factories to create these caches.
+ * @return The fully instantiated and configured {@link CacheConfig}
+ * instance
+ */
+ public CacheConfig build() {
+ if (blockCache == null && blockCacheFactory != null) {
+ blockCache = blockCacheFactory.getBlockCache(conf);
+ }
+ if (l2Cache == null && l2CacheFactory != null) {
+ l2Cache = l2CacheFactory.getL2Cache(conf);
+ }
+ return new CacheConfig(blockCache, l2Cache,
+ cacheDataOnRead, inMemory,
+ cacheDataOnFlush, cacheIndexesOnWrite,
+ cacheBloomsOnWrite, evictOnClose,
+ cacheCompressed, l2CacheDataOnWrite,
+ l2EvictOnPromotion, l2EvictOnClose);
+ }
}
}
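For illustration, a sketch (not part of the patch) of the two intended ways to build a CacheConfig: production code that relies on the singleton factories plus configuration, and tests that inject a cache directly. The concrete values, the class name, and the exact interpretation of hfile.l2.bucketcache.size by the (not shown) L2BucketCacheFactory are assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.SimpleBlockCache;

    public class CacheConfigBuilderSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Enable the L2 bucket cache: off-heap IO engine, size given as a fraction.
        conf.set(CacheConfig.L2_BUCKET_CACHE_IOENGINE_KEY, "offheap");
        conf.setFloat(CacheConfig.L2_BUCKET_CACHE_SIZE_KEY, 0.2f);

        // Production path: the factories create the L1/L2 caches lazily in build().
        CacheConfig cacheConf = new CacheConfig.CacheConfigBuilder(conf)
            .withCacheDataOnRead(true)
            .withL2CacheDataOnWrite(true)
            .build();
        System.out.println(cacheConf);

        // Test path: no factories, inject the block cache directly.
        CacheConfig testConf = new CacheConfig.CacheConfigBuilder()
            .withBlockCache(new SimpleBlockCache())
            .withCacheDataOnRead(true)
            .build();
        System.out.println(testConf);
      }
    }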
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java (working copy)
@@ -0,0 +1,103 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+
+/**
+ * An implementation of L2 cache based on {@link BucketCache}
+ */
+public class L2BucketCache implements L2Cache {
+
+ private static final Log LOG = LogFactory.getLog(L2BucketCache.class);
+
+ private final BucketCache bucketCache;
+
+ public L2BucketCache(BucketCache bucketCache) {
+ this.bucketCache = bucketCache;
+ }
+
+ @Override
+ public byte[] getRawBlock(String hfileName, long dataBlockOffset) {
+ long startTimeNs = System.nanoTime();
+ BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset);
+ byte[] fromCache = bucketCache.getBlock(cacheKey, true);
+ if (LOG.isTraceEnabled()) {
+ // Log elapsed time to retrieve a block from the cache
+ long elapsedNs = System.nanoTime() - startTimeNs;
+ LOG.trace("getRawBlock() " + (fromCache == null ?"MISS" : "HIT") +
+ " on hfileName=" + hfileName + ", offset=" + dataBlockOffset +
+ " in " + elapsedNs + " ns.");
+ }
+ return fromCache;
+ }
+
+ @Override
+ public void cacheRawBlock(String hfileName, long dataBlockOffset, byte[] block) {
+ long startTimeNs = System.nanoTime();
+ BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset);
+ bucketCache.cacheBlock(cacheKey, block);
+ if (LOG.isTraceEnabled()) {
+ long elapsedNs = System.nanoTime() - startTimeNs;
+ LOG.trace("cacheRawBlock() on hfileName=" + hfileName + ", offset=" +
+ dataBlockOffset + " in " + elapsedNs + " ns.");
+ }
+ }
+
+ @Override
+ public int evictBlocksByHfileName(String hfileName) {
+ long startTimeNs = System.nanoTime(); // Measure eviction perf
+ int numEvicted = bucketCache.evictBlocksByHfileName(hfileName);
+ if (LOG.isTraceEnabled()) {
+ long elapsedNs = System.nanoTime() - startTimeNs;
+ LOG.trace("evictBlockByHfileName() on hfileName=" + hfileName + ": " +
+ numEvicted + " blocks evicted in " + elapsedNs + " ns.");
+ }
+ return numEvicted;
+ }
+
+ @Override
+ public void shutdown() {
+ bucketCache.shutdown();
+ }
+
+ public CacheStats getStats() {
+ return bucketCache.getStats();
+ }
+
+ public long size() {
+ return bucketCache.getBlockCount();
+ }
+
+ public long getFreeSize() {
+ return bucketCache.getFreeSize();
+ }
+
+ public long getCurrentSize() {
+ return bucketCache.size();
+ }
+
+ public long getEvictedCount() {
+ return bucketCache.getEvictedCount();
+ }
+}
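The reader-side integration is not part of this hunk, but the intended read path follows from the L2Cache methods above: on an L1 miss, look up the raw on-disk block bytes by (hfile name, offset), fall back to the filesystem on an L2 miss, then back-fill the L2 cache. A minimal sketch, with the hypothetical readFromDisk() standing in for the real fsBlockReader.readBlockData() call:

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.hfile.L2Cache;

    /** Hypothetical helper; not part of the patch. */
    public abstract class L2ReadPathSketch {
      private final L2Cache l2Cache;

      protected L2ReadPathSketch(L2Cache l2Cache) {
        this.l2Cache = l2Cache;
      }

      /** Returns the raw (still on-disk format) block bytes, trying the L2 cache first. */
      public byte[] getRawBlock(String hfileName, long offset) throws IOException {
        byte[] raw = l2Cache.getRawBlock(hfileName, offset);  // L2 lookup by (file, offset)
        if (raw == null) {
          raw = readFromDisk(hfileName, offset);               // L2 miss: read from HDFS
          l2Cache.cacheRawBlock(hfileName, offset, raw);       // back-fill the L2 cache
        }
        return raw;
      }

      /** Placeholder for the actual filesystem read. */
      protected abstract byte[] readFromDisk(String hfileName, long offset) throws IOException;
    }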
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java (working copy)
@@ -423,4 +423,7 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public void clearCache() {
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java (working copy)
@@ -354,4 +354,9 @@
+ ClassSize.ATOMIC_LONG);
}
}
+
+ @Override
+ public void clearCache() {
+ backingMap.clear();
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (working copy)
@@ -238,9 +238,8 @@
// Cache Miss, please load.
}
- HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
- nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
- true);
+ HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset,
+ metaBlockIndexReader.getRootBlockDataSize(block), true, false);
passSchemaMetricsTo(hfileBlock);
hfileBlock.expectType(BlockType.META);
@@ -314,8 +313,8 @@
nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1);
}
- HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
- - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
+ HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset,
+ dataBlockIndexReader.getRootBlockDataSize(block), pread, false);
passSchemaMetricsTo(hfileBlock);
hfileBlock.expectType(BlockType.DATA);
@@ -389,6 +388,12 @@
getSchemaMetrics().flushMetrics();
}
+ @Override
+ public void close(boolean evictL1OnClose, boolean evictL2OnClose)
+ throws IOException {
+ close(evictL1OnClose); // HFileReaderV1 does not support L2 cache
+ }
+
protected abstract static class AbstractScannerV1
extends AbstractHFileReader.Scanner {
protected int currBlock;
Index: src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (working copy)
@@ -0,0 +1,182 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class manages an array of ByteBuffers, each with a default size of 4MB.
+ * The buffers are sequential and can be treated as one large buffer. It supports
+ * reading/writing data from this large buffer at a given position and offset.
+ */
+public class ByteBufferArray {
+
+ private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
+
+ private final ByteBuffer buffers[];
+ private final Lock locks[];
+ private final int bufferSize;
+ private final int bufferCount;
+
+ /**
+ * We allocate a number of byte buffers to make up the requested capacity. In
+ * order not to step out of the array bounds for the last byte (see
+ * {@link ByteBufferArray#multiple}), we allocate one additional buffer with
+ * capacity 0.
+ * @param capacity total size of the byte buffer array
+ * @param bufferSize size of an individual buffer
+ * @param directByteBuffer true to allocate direct (off-heap) byte buffers
+ */
+ public ByteBufferArray(long capacity, int bufferSize, boolean directByteBuffer) {
+ if (bufferSize > (capacity / 16)) {
+ bufferSize = (int) roundUp(capacity / 16, 32768);
+ }
+ this.bufferSize = bufferSize;
+ this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
+ LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
+ + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+ + bufferCount);
+ buffers = new ByteBuffer[bufferCount + 1];
+ locks = new Lock[bufferCount + 1];
+ for (int i = 0; i <= bufferCount; i++) {
+ locks[i] = new ReentrantLock();
+ if (i < bufferCount) {
+ buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize) :
+ ByteBuffer.allocate(bufferSize);
+ } else {
+ buffers[i] = ByteBuffer.allocate(0);
+ }
+
+ }
+ }
+
+ private static long roundUp(long n, long to) {
+ return ((n + to - 1) / to) * to;
+ }
+
+ /**
+ * Transfers bytes from this buffer array into the given destination array
+ * @param start start offset of this buffer array
+ * @param len The maximum number of bytes to be written to the given array
+ * @param dstArray The array into which bytes are to be written
+ * @param dstOffset The offset within the given array of the first byte to be
+ * written
+ */
+ public void getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
+ multiple(start, len, dstArray, dstOffset, new Visitor() {
+ public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+ bb.get(array, arrayIdx, len);
+ }
+ });
+ }
+
+ /**
+ * Transfers bytes from the given source array into this buffer array
+ * @param start start offset of this buffer array
+ * @param len The maximum number of bytes to be read from the given array
+ * @param srcArray The array from which bytes are to be read
+ * @param srcOffset The offset within the given array of the first byte to be
+ * read
+ */
+ public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
+ multiple(start, len, srcArray, srcOffset, new Visitor() {
+ public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+ bb.put(array, arrayIdx, len);
+ }
+ });
+ }
+
+ private interface Visitor {
+ /**
+ * Visit the given byte buffer. For a read action, bytes are transferred from
+ * the buffer to the destination array; for a write action, bytes are
+ * transferred from the source array to the buffer.
+ * @param bb byte buffer
+ * @param array a source or destination byte array
+ * @param arrayOffset offset of the byte array
+ * @param len read/write length
+ */
+ void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
+ }
+
+ /**
+ * Access (read or write) this buffer array at the given position and length,
+ * using the given array as the source or destination. Only one buffer is
+ * locked at a time, even if the access needs to visit several buffers;
+ * overall consistency is guaranteed by the caller.
+ * @param start start offset of this buffer array
+ * @param len The maximum number of bytes to be accessed
+ * @param array The array from/to which bytes are to be read/written
+ * @param arrayOffset The offset within the given array of the first byte to
+ * be read or written
+ * @param visitor implement of how to visit the byte buffer
+ */
+ void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
+ Preconditions.checkArgument(len >= 0);
+ Preconditions.checkArgument(len + arrayOffset <= array.length);
+ long end = start + len;
+ int startBuffer = Preconditions.checkPositionIndex(
+ (int) (start / bufferSize), bufferCount);
+ int endBuffer = (int) (end / bufferSize);
+ int endOffset = (int) (end % bufferSize);
+ Preconditions.checkArgument(endBuffer >= 0 && endBuffer < bufferCount
+ || (endBuffer == bufferCount && endOffset == 0));
+
+ int startOffset = (int) (start % bufferSize);
+ if (startBuffer >= locks.length || startBuffer < 0) {
+ String msg = "Failed multiple, start=" + start + ",startBuffer="
+ + startBuffer + ",bufferSize=" + bufferSize;
+ LOG.error(msg);
+ throw new RuntimeException(msg);
+ }
+ int srcIndex = 0, cnt = -1;
+ for (int i = startBuffer; i <= endBuffer; ++i) {
+ Lock lock = locks[i];
+ lock.lock();
+ try {
+ ByteBuffer bb = buffers[i];
+ if (i == startBuffer) {
+ cnt = bufferSize - startOffset;
+ if (cnt > len) {
+ cnt = len;
+ }
+ bb.limit(startOffset + cnt).position(startOffset);
+ } else if (i == endBuffer) {
+ cnt = endOffset;
+ bb.limit(cnt).position(0);
+ } else {
+ cnt = bufferSize;
+ bb.limit(cnt).position(0);
+ }
+ visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
+ srcIndex += cnt;
+ } finally {
+ lock.unlock();
+ }
+ }
+ if (srcIndex != len) {
+ throw new IllegalStateException("srcIndex(" + srcIndex + ") != len(" +
+ len + ")!");
+ }
+ }
+}
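A small sketch (not part of the patch) of the intended ByteBufferArray round trip; the write deliberately straddles a buffer boundary, which is the case multiple() handles by visiting two internal buffers:

    import java.util.Arrays;

    import org.apache.hadoop.hbase.util.ByteBufferArray;

    public class ByteBufferArraySketch {
      public static void main(String[] args) {
        // 64 MB capacity split into 4 MB on-heap buffers (pass true for direct buffers).
        ByteBufferArray bba = new ByteBufferArray(64L * 1024 * 1024, 4 * 1024 * 1024, false);

        byte[] src = new byte[8 * 1024];
        Arrays.fill(src, (byte) 0x5A);

        // Start 4 KB before a buffer boundary so the 8 KB write spans two buffers.
        long offset = 4L * 1024 * 1024 - 4 * 1024;
        bba.putMultiple(offset, src.length, src, 0);

        byte[] dst = new byte[src.length];
        bba.getMultiple(offset, dst.length, dst, 0);
        System.out.println("round trip ok: " + Arrays.equals(src, dst));
      }
    }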
Index: src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java (working copy)
@@ -0,0 +1,62 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Extends the patterns in {@link Preconditions}
+ */
+public class ConditionUtil {
+
+ /**
+ * Checks if a specified offset is >= 0
+ * @param offset The offset to check
+ * @return The specified offset if it is >= 0
+ * @throws IndexOutOfBoundsException If specified offset is negative
+ */
+ public static long checkPositiveOffset(long offset) {
+ return checkOffset(offset, -1);
+ }
+
+ /**
+ * Check if an offset is >= 0 but less than a maximum limit (if one is
+ * specified).
+ * @see Preconditions#checkPositionIndex(int, int)
+ * @param offset The offset to check
+ * @param limit The maximum limit, or -1 if none
+ * @return The specified offset if it is non-negative and, when a limit is
+ * specified, lower than that limit.
+ * @throws IndexOutOfBoundsException If the offset is negative, or if a limit
+ * is specified and the offset is greater than or equal to the limit.
+ */
+ public static long checkOffset(long offset, long limit) {
+ if (offset < 0) {
+ throw new IndexOutOfBoundsException("Negative offset: " + offset);
+ }
+ if (limit != -1 && offset >= limit) {
+ throw new IndexOutOfBoundsException("Offset (" + offset +
+ ") is greater than limit (" + limit + ")");
+ }
+ return offset;
+ }
+}
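A brief usage sketch (not part of the patch), e.g., validating a block offset against a known file size; the values and class name are illustrative:

    import org.apache.hadoop.hbase.util.ConditionUtil;

    public class ConditionUtilSketch {
      public static void main(String[] args) {
        long fileSize = 1024;
        long ok = ConditionUtil.checkOffset(512, fileSize);   // returns 512: 0 <= 512 < 1024
        long any = ConditionUtil.checkPositiveOffset(42);     // returns 42: no upper limit
        System.out.println(ok + " " + any);
        ConditionUtil.checkOffset(fileSize, fileSize);        // throws IndexOutOfBoundsException
      }
    }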
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -115,6 +115,7 @@
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.L2BucketCache;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -823,6 +824,10 @@
cacheConfig.getBlockCache().shutdown();
}
+ if (cacheConfig.isL2CacheEnabled()) {
+ cacheConfig.getL2Cache().shutdown();
+ }
+
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
@@ -1633,6 +1638,20 @@
percent = (int) (ratio * 100);
this.metrics.blockCacheHitCachingRatioPastNPeriods.set(percent);
}
+
+ L2BucketCache l2BucketCache = (L2BucketCache)cacheConfig.getL2Cache();
+ if (l2BucketCache != null) {
+ this.metrics.l2CacheCount.set(l2BucketCache.size());
+ this.metrics.l2CacheFree.set(l2BucketCache.getFreeSize());
+ this.metrics.l2CacheSize.set(l2BucketCache.getCurrentSize());
+ this.metrics.l2CacheEvictedCount.set(l2BucketCache.getEvictedCount());
+ CacheStats l2CacheStats = l2BucketCache.getStats();
+ this.metrics.l2CacheHitCount.set(l2CacheStats.getHitCount());
+ this.metrics.l2CacheMissCount.set(l2CacheStats.getMissCount());
+ double ratio = l2CacheStats.getHitRatioPastNPeriods();
+ int percent = (int) (ratio * 100);
+ this.metrics.l2CacheHitRatioPastNPeriods.set(percent);
+ }
float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
getServerName().getHostname());
int percent = (int) (localityIndex * 100);
Index: src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (working copy)
@@ -130,6 +130,43 @@
/** Block hit caching ratio for past N periods */
public final MetricsIntValue blockCacheHitCachingRatioPastNPeriods = new MetricsIntValue("blockCacheHitCachingRatioPastNPeriods", registry);
+ /**
+ * L2 cache size
+ */
+ public final MetricsLongValue l2CacheSize = new MetricsLongValue("l2CacheSize", registry);
+
+ /**
+ * L2 cache free size
+ */
+ public final MetricsLongValue l2CacheFree = new MetricsLongValue("l2CacheFree", registry);
+
+ /**
+ * L2 cache item count
+ */
+ public final MetricsLongValue l2CacheCount = new MetricsLongValue("l2CacheCount", registry);
+
+ /**
+ * L2 cache hit count
+ */
+ public final MetricsLongValue l2CacheHitCount = new MetricsLongValue("l2CacheHitCount", registry);
+
+ /**
+ * L2 cache miss count
+ */
+ public final MetricsLongValue l2CacheMissCount = new MetricsLongValue("l2CacheMissCount",
+ registry);
+
+ /**
+ * L2 evicted item count
+ */
+ public final MetricsLongValue l2CacheEvictedCount = new MetricsLongValue("l2CacheEvictedCount",
+ registry);
+
+ /**
+ * L2 cache hit ratio
+ */
+ public final MetricsIntValue l2CacheHitRatioPastNPeriods = new MetricsIntValue("l2CacheHitRatioPastNPeriods", registry);
+
/*
* Count of requests to the regionservers since last call to metrics update
*/
@@ -387,6 +424,13 @@
this.hdfsBlocksLocalityIndex.pushMetric(this.metricsRecord);
this.blockCacheHitRatioPastNPeriods.pushMetric(this.metricsRecord);
this.blockCacheHitCachingRatioPastNPeriods.pushMetric(this.metricsRecord);
+ this.l2CacheSize.pushMetric(this.metricsRecord);
+ this.l2CacheFree.pushMetric(this.metricsRecord);
+ this.l2CacheCount.pushMetric(this.metricsRecord);
+ this.l2CacheHitCount.pushMetric(this.metricsRecord);
+ this.l2CacheMissCount.pushMetric(this.metricsRecord);
+ this.l2CacheEvictedCount.pushMetric(this.metricsRecord);
+ this.l2CacheHitRatioPastNPeriods.pushMetric(this.metricsRecord);
// Mix in HFile and HLog metrics
// Be careful. Here is code for MTVR from up in hadoop:
@@ -573,6 +617,20 @@
Long.valueOf(this.blockCacheHitRatio.get())+"%");
sb = Strings.appendKeyValue(sb, this.blockCacheHitCachingRatio.getName(),
Long.valueOf(this.blockCacheHitCachingRatio.get())+"%");
+ sb = Strings.appendKeyValue(sb, this.l2CacheSize.getName(),
+ this.l2CacheSize.get());
+ sb = Strings.appendKeyValue(sb, this.l2CacheFree.getName(),
+ this.l2CacheFree.get());
+ sb = Strings.appendKeyValue(sb, this.l2CacheCount.getName(),
+ this.l2CacheCount.get());
+ sb = Strings.appendKeyValue(sb, this.l2CacheHitCount.getName(),
+ this.l2CacheHitCount.get());
+ sb = Strings.appendKeyValue(sb, this.l2CacheMissCount.getName(),
+ this.l2CacheMissCount.get());
+ sb = Strings.appendKeyValue(sb, this.l2CacheEvictedCount.getName(),
+ this.l2CacheEvictedCount.get());
+ sb = Strings.appendKeyValue(sb, this.l2CacheHitRatioPastNPeriods.getName(),
+ this.l2CacheHitRatioPastNPeriods.get());
sb = Strings.appendKeyValue(sb, this.hdfsBlocksLocalityIndex.getName(),
Long.valueOf(this.hdfsBlocksLocalityIndex.get()));
sb = Strings.appendKeyValue(sb, "slowHLogAppendCount",
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -108,6 +108,7 @@
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.L2Cache;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
@@ -6032,7 +6033,12 @@
log.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
- if (bc != null) bc.shutdown();
+ L2Cache l2c = new CacheConfig(c).getL2Cache();
+ try {
+ if (bc != null) bc.shutdown();
+ } finally {
+ if (l2c != null) l2c.shutdown();
+ }
}
}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1531202)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy)
@@ -690,8 +690,13 @@
*/
public synchronized void closeReader(boolean evictOnClose)
throws IOException {
+ closeReader(evictOnClose, evictOnClose);
+ }
+
+ public synchronized void closeReader(boolean evictL1OnClose, boolean evictL2OnClose)
+ throws IOException {
if (this.reader != null) {
- this.reader.close(evictOnClose);
+ this.reader.close(evictL1OnClose, evictL2OnClose);
this.reader = null;
}
}
@@ -1464,6 +1469,10 @@
reader.close(evictOnClose);
}
+ public void close(boolean evictL1OnClose, boolean evictL2OnClose) throws IOException {
+ reader.close(evictL1OnClose, evictL2OnClose);
+ }
+
/**
* Check if this storeFile may contain keys within the TimeRange that
* have not expired (i.e. not older than oldestUnexpiredTS).