### Eclipse Workspace Patch 1.0 #P apache-trunk Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (revision 1427781) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (working copy) @@ -663,6 +663,11 @@ @Override public void serialize(ByteBuffer destination) { } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (revision 1427781) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (working copy) @@ -171,6 +171,22 @@ windowIndex = (windowIndex + 1) % numPeriodsInWindow; } + public long getSumHitCountsPastNPeriods() { + return sum(hitCounts); + } + + public long getSumRequestCountsPastNPeriods() { + return sum(requestCounts); + } + + public long getSumHitCachingCountsPastNPeriods() { + return sum(hitCachingCounts); + } + + public long getSumRequestCachingCountsPastNPeriods() { + return sum(requestCachingCounts); + } + public double getHitRatioPastNPeriods() { double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts)); return Double.isNaN(ratio) ? 0 : ratio; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java (revision 0) @@ -0,0 +1,213 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; + +/** + * CombinedBlockCache is an abstraction layer that combines + * {@link LruBlockCache} and {@link BucketCache}, the smaller lruCache is used + * to cache bloom blocks and index blocks , the larger bucketCache is used to + * cache data blocks.Getblock reads first from the smaller lruCache before + * looking for the block in the bucketCache. Metrics are the combined size and + * hits and misses of both caches. 
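For reference, a hedged sketch (not part of the patch) of how the pieces described above are wired together; it mirrors what CacheConfig does later in this patch, with arbitrary sizes, the "heap" IO engine name used by TestBucketCache, and IOException handling omitted:

    // Illustrative only; assumes the io.hfile classes added/modified by this patch.
    LruBlockCache lru = new LruBlockCache(32 * 1024 * 1024, StoreFile.DEFAULT_BLOCKSIZE_SMALL);
    BucketCache bucket = new BucketCache("heap", 128 * 1024 * 1024,
        BucketCache.DEFAULT_WRITER_THREADS, BucketCache.DEFAULT_WRITER_QUEUE_ITEMS, null); // may throw IOException
    lru.setVictimCache(bucket);                              // evicted LRU blocks spill into the bucket cache
    BlockCache cache = new CombinedBlockCache(lru, bucket);  // index/bloom blocks -> lru, data blocks -> bucket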
+ * + **/ +public class CombinedBlockCache implements BlockCache, HeapSize { + + private final LruBlockCache lruCache; + private final BucketCache bucketCache; + private final CombinedCacheStats combinedCacheStats; + + public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) { + this.lruCache = lruCache; + this.bucketCache = bucketCache; + this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(), + bucketCache.getStats()); + } + + @Override + public long heapSize() { + return lruCache.heapSize() + bucketCache.heapSize(); + } + + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { + boolean isMetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA; + if (isMetaBlock) { + lruCache.cacheBlock(cacheKey, buf, inMemory); + } else { + bucketCache.cacheBlock(cacheKey, buf, inMemory); + } + } + + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + cacheBlock(cacheKey, buf, false); + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, + boolean repeat) { + if (lruCache.containBlock(cacheKey)) { + return lruCache.getBlock(cacheKey, caching, repeat); + } else { + return bucketCache.getBlock(cacheKey, caching, repeat); + } + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey); + } + + @Override + public int evictBlocksByHfileName(String hfileName) { + return lruCache.evictBlocksByHfileName(hfileName) + + bucketCache.evictBlocksByHfileName(hfileName); + } + + @Override + public CacheStats getStats() { + return this.combinedCacheStats; + } + + @Override + public void shutdown() { + lruCache.shutdown(); + bucketCache.shutdown(); + + } + + @Override + public long size() { + return lruCache.size() + bucketCache.size(); + } + + @Override + public long getFreeSize() { + return lruCache.getFreeSize() + bucketCache.getFreeSize(); + } + + @Override + public long getCurrentSize() { + return lruCache.getCurrentSize() + bucketCache.getCurrentSize(); + } + + @Override + public long getEvictedCount() { + return lruCache.getEvictedCount() + bucketCache.getEvictedCount(); + } + + @Override + public long getBlockCount() { + return lruCache.getBlockCount() + bucketCache.getBlockCount(); + } + + @Override + public List getBlockCacheColumnFamilySummaries( + Configuration conf) throws IOException { + throw new UnsupportedOperationException(); + } + + private static class CombinedCacheStats extends CacheStats { + private final CacheStats lruCacheStats; + private final CacheStats bucketCacheStats; + + CombinedCacheStats(CacheStats lbcStats, CacheStats fcStats) { + this.lruCacheStats = lbcStats; + this.bucketCacheStats = fcStats; + } + + @Override + public long getRequestCount() { + return lruCacheStats.getRequestCount() + + bucketCacheStats.getRequestCount(); + } + + @Override + public long getRequestCachingCount() { + return lruCacheStats.getRequestCachingCount() + + bucketCacheStats.getRequestCachingCount(); + } + + @Override + public long getMissCount() { + return lruCacheStats.getMissCount() + bucketCacheStats.getMissCount(); + } + + @Override + public long getMissCachingCount() { + return lruCacheStats.getMissCachingCount() + + bucketCacheStats.getMissCachingCount(); + } + + @Override + public long getHitCount() { + return lruCacheStats.getHitCount() + bucketCacheStats.getHitCount(); + } + + @Override + public long getHitCachingCount() { + return lruCacheStats.getHitCachingCount() + + 
bucketCacheStats.getHitCachingCount(); + } + + @Override + public long getEvictionCount() { + return lruCacheStats.getEvictionCount() + + bucketCacheStats.getEvictionCount(); + } + + @Override + public long getEvictedCount() { + return lruCacheStats.getEvictedCount() + + bucketCacheStats.getEvictedCount(); + } + + @Override + public double getHitRatioPastNPeriods() { + double ratio = ((double) (lruCacheStats.getSumHitCountsPastNPeriods() + bucketCacheStats + .getSumHitCountsPastNPeriods()) / (double) (lruCacheStats + .getSumRequestCountsPastNPeriods() + bucketCacheStats + .getSumRequestCountsPastNPeriods())); + return Double.isNaN(ratio) ? 0 : ratio; + } + + @Override + public double getHitCachingRatioPastNPeriods() { + double ratio = ((double) (lruCacheStats + .getSumHitCachingCountsPastNPeriods() + bucketCacheStats + .getSumHitCachingCountsPastNPeriods()) / (double) (lruCacheStats + .getSumRequestCachingCountsPastNPeriods() + bucketCacheStats + .getSumRequestCachingCountsPastNPeriods())); + return Double.isNaN(ratio) ? 0 : ratio; + } + + } + +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java (revision 0) @@ -0,0 +1,56 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class is used to manage the identifier IDs for + * {@link CacheableDeserializer} + */ +public class CacheableDeserializerIdManager { + private static final Map> registeredDeserializers = + new HashMap>(); + private static final AtomicInteger identifier = new AtomicInteger(0); + + /** + * Register the given cacheable deserializer and generate an unique identifier + * id for it + * @param cd + * @return the identifier of given cacheable deserializer + */ + public static int registerDeserializer(CacheableDeserializer cd) { + int idx = identifier.incrementAndGet(); + synchronized (registeredDeserializers) { + registeredDeserializers.put(idx, cd); + } + return idx; + } + + /** + * Get the cacheable deserializer as the given identifier Id + * @param idx + * @return CacheableDeserializer + */ + public static CacheableDeserializer getDeserializer(int idx) { + return registeredDeserializers.get(idx); + } +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (revision 1427781) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (working copy) @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.ChecksumType; public class CacheTestUtils { @@ -149,7 +150,11 @@ try { if (toBeTested.getBlock(block.blockName, true, false) != null) { toBeTested.cacheBlock(block.blockName, block.block); - fail("Cache should not allow re-caching a block"); + if (!(toBeTested instanceof BucketCache)) { + // BucketCache won't throw exception when caching already cached + // block + fail("Cache should not allow re-caching a block"); + } } } catch (RuntimeException re) { // expected @@ -242,6 +247,30 @@ private static class ByteArrayCacheable implements Cacheable { + static final CacheableDeserializer blockDeserializer = + new CacheableDeserializer() { + + @Override + public Cacheable deserialize(ByteBuffer b) throws IOException { + int len = b.getInt(); + Thread.yield(); + byte buf[] = new byte[len]; + b.get(buf); + return new ByteArrayCacheable(buf); + } + + @Override + public int getDeserialiserIdentifier() { + return deserializerIdentifier; + } + + @Override + public Cacheable deserialize(ByteBuffer b, boolean reuse) + throws IOException { + return deserialize(b); + } + }; + final byte[] buf; public ByteArrayCacheable(byte[] buf) { @@ -268,20 +297,22 @@ @Override public CacheableDeserializer getDeserializer() { - return new CacheableDeserializer() { + return blockDeserializer; + } - @Override - public Cacheable deserialize(ByteBuffer b) throws IOException { - int len = b.getInt(); - Thread.yield(); - byte buf[] = new byte[len]; - b.get(buf); - return new ByteArrayCacheable(buf); - } - }; + private static final int deserializerIdentifier; + static { + deserializerIdentifier = CacheableDeserializerIdManager + .registerDeserializer(blockDeserializer); } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } } + private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { 
HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (revision 0) @@ -0,0 +1,57 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Class that implements cache metrics for bucket cache. + */ +public class BucketCacheStats extends CacheStats { + private final AtomicLong ioHitCount = new AtomicLong(0); + private final AtomicLong ioHitTime = new AtomicLong(0); + private final static int nanoTime = 1000000; + private long lastLogTime = EnvironmentEdgeManager.currentTimeMillis(); + + public void ioHit(long time) { + ioHitCount.incrementAndGet(); + ioHitTime.addAndGet(time); + } + + public long getIOHitsPerSecond() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + long took = (now - lastLogTime) / 1000; + lastLogTime = now; + return ioHitCount.get() / took; + } + + public double getIOTimePerHit() { + long time = ioHitTime.get() / nanoTime; + long count = ioHitCount.get(); + return ((float) time / (float) count); + } + + public void reset() { + ioHitCount.set(0); + ioHitTime.set(0); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (revision 0) @@ -0,0 +1,98 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.util.ByteBufferArray; + +/** + * IO engine that stores data on the memory using an array of ByteBuffers + * {@link ByteBufferArray} + */ +public class ByteBufferIOEngine implements IOEngine { + + private ByteBufferArray bufferArray; + + /** + * Construct the ByteBufferIOEngine with the given capacity + * @param capacity + * @param direct true if allocate direct buffer + * @throws IOException + */ + public ByteBufferIOEngine(long capacity, boolean direct) + throws IOException { + bufferArray = new ByteBufferArray(capacity, direct); + } + + /** + * Memory IO engine is always unable to support persistent storage for the + * cache + * @return false + */ + @Override + public boolean isPersistent() { + return false; + } + + /** + * Transfers data from the buffer array to the given byte buffer + * @param dstBuffer the given byte buffer into which bytes are to be written + * @param offset The offset in the ByteBufferArray of the first byte to be + * read + * @throws IOException + */ + @Override + public void read(ByteBuffer dstBuffer, long offset) throws IOException { + assert dstBuffer.hasArray(); + bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(), + dstBuffer.arrayOffset()); + } + + /** + * Transfers data from the given byte buffer to the buffer array + * @param srcBuffer the given byte buffer from which bytes are to be read + * @param offset The offset in the ByteBufferArray of the first byte to be + * written + * @throws IOException + */ + @Override + public void write(ByteBuffer srcBuffer, long offset) throws IOException { + assert srcBuffer.hasArray(); + bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(), + srcBuffer.arrayOffset()); + } + + /** + * No operation for the sync in the memory IO engine + */ + @Override + public void sync() { + + } + + /** + * No operation for the shutdown in the memory IO engine + */ + @Override + public void shutdown() { + + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; + +/** + * Thrown by {@link BucketAllocator} + */ +public class BucketAllocatorException extends IOException { + private static final long serialVersionUID = 2479119906660788096L; + + BucketAllocatorException(String reason) { + super(reason); + } +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (revision 0) @@ -0,0 +1,62 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A class implementing IOEngine interface could support data services for + * {@link BucketCache}. 
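A hedged aside (not part of the patch): constructing the two IOEngine implementations this patch supplies, using the constructors shown in ByteBufferIOEngine above and FileIOEngine further down; capacities and the file name are arbitrary, and IOException handling is omitted:

    IOEngine heapEngine   = new ByteBufferIOEngine(8L * 1024 * 1024, false);      // on-heap buffers, isPersistent() == false
    IOEngine directEngine = new ByteBufferIOEngine(8L * 1024 * 1024, true);       // direct (off-heap) buffers
    IOEngine fileEngine   = new FileIOEngine("bucket.cache", 64L * 1024 * 1024);  // file-backed, isPersistent() == true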
+ */ +public interface IOEngine { + + /** + * @return true if persistent storage is supported for the cache when shutdown + */ + boolean isPersistent(); + + /** + * Transfers data from IOEngine to the given byte buffer + * @param dstBuffer the given byte buffer into which bytes are to be written + * @param offset The offset in the IO engine where the first byte to be read + * @throws IOException + */ + void read(ByteBuffer dstBuffer, long offset) throws IOException; + + /** + * Transfers data from the given byte buffer to IOEngine + * @param srcBuffer the given byte buffer from which bytes are to be read + * @param offset The offset in the IO engine where the first byte to be + * written + * @throws IOException + */ + void write(ByteBuffer srcBuffer, long offset) throws IOException; + + /** + * Sync the data to IOEngine after writing + * @throws IOException + */ + void sync() throws IOException; + + /** + * Shutdown the IOEngine + */ + void shutdown(); +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (revision 1427781) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (working copy) @@ -135,6 +135,11 @@ return null; } + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } + }, accessTime, false); } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (revision 1427781) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (working copy) @@ -56,4 +56,9 @@ */ public CacheableDeserializer getDeserializer(); + /** + * @return the block type of this cached HFile block + */ + public BlockType getBlockType(); + } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (revision 0) @@ -0,0 +1,52 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; + +/** + * Thrown by {@link BucketAllocator#allocateBlock(int)} when cache is full for + * the requested size + */ +public class CacheFullException extends IOException { + private static final long serialVersionUID = 3265127301824638920L; + private int requestedSize, bucketIndex; + + CacheFullException(int requestedSize, int bucketIndex) { + super(); + this.requestedSize = requestedSize; + this.bucketIndex = bucketIndex; + } + + public int bucketIndex() { + return bucketIndex; + } + + public int requestedSize() { + return requestedSize; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(1024); + sb.append("Allocator requested size ").append(requestedSize); + sb.append(" for bucket ").append(bucketIndex); + return sb.toString(); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (revision 1427781) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (working copy) @@ -27,7 +27,7 @@ * Cache Key for use with implementations of {@link BlockCache} */ @InterfaceAudience.Private -public class BlockCacheKey implements HeapSize { +public class BlockCacheKey implements HeapSize, java.io.Serializable { private final String hfileName; private final long offset; private final DataBlockEncoding encoding; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (revision 0) @@ -0,0 +1,154 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Basic test of BucketCache.Puts and gets. + *
+ * Tests will ensure that blocks' data correctness under several threads + * concurrency + */ +@Category(SmallTests.class) +public class TestBucketCache { + static final Log LOG = LogFactory.getLog(TestBucketCache.class); + BucketCache cache; + final int CACHE_SIZE = 1000000; + final int NUM_BLOCKS = 100; + final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; + final int NUM_THREADS = 1000; + final int NUM_QUERIES = 10000; + + final long capacitySize = 32 * 1024 * 1024; + final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; + final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; + String ioEngineName = "heap"; + String persistencePath = null; + + private class MockedBucketCache extends BucketCache { + + public MockedBucketCache(String ioEngineName, long capacity, + int writerThreads, + int writerQLen, String persistencePath) throws FileNotFoundException, + IOException { + super(ioEngineName, capacity, writerThreads, writerQLen, persistencePath); + super.wait_when_cache = true; + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, + boolean inMemory) { + if (super.getBlock(cacheKey, true, false) != null) { + throw new RuntimeException("Cached an already cached block"); + } + super.cacheBlock(cacheKey, buf, inMemory); + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + if (super.getBlock(cacheKey, true, false) != null) { + throw new RuntimeException("Cached an already cached block"); + } + super.cacheBlock(cacheKey, buf); + } + } + + @Before + public void setup() throws FileNotFoundException, IOException { + cache = new MockedBucketCache(ioEngineName, capacitySize, writeThreads, + writerQLen, persistencePath); + } + + @After + public void tearDown() { + cache.shutdown(); + } + + @Test + public void testBucketAllocator() throws BucketAllocatorException { + BucketAllocator mAllocator = cache.getAllocator(); + /* + * Test the allocator first + */ + int[] blockSizes = new int[2]; + blockSizes[0] = 4 * 1024; + blockSizes[1] = 8 * 1024; + boolean full = false; + int i = 0; + ArrayList allocations = new ArrayList(); + // Fill the allocated extents + while (!full) { + try { + allocations.add(new Long(mAllocator.allocateBlock(blockSizes[i + % blockSizes.length]))); + ++i; + } catch (CacheFullException cfe) { + full = true; + } + } + + for (i = 0; i < blockSizes.length; i++) { + BucketSizeInfo bucketSizeInfo = mAllocator + .roundUpToBucketSizeInfo(blockSizes[0]); + IndexStatistics indexStatistics = bucketSizeInfo.statistics(); + assertTrue(indexStatistics.freeCount() == 0); + } + + for (long offset : allocations) { + assertTrue(mAllocator.sizeOfAllocation(offset) == mAllocator + .freeBlock(offset)); + } + assertTrue(mAllocator.getUsedSize() == 0); + } + + @Test + public void testCacheSimple() throws Exception { + CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); + } + + @Test + public void testCacheMultiThreadedSingleKey() throws Exception { + CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); + } + + @Test + public void testHeapSizeChanges() throws Exception { + cache.stopWriterThreads(); + CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); + } + +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java (revision 0) +++ 
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java (revision 0) @@ -0,0 +1,65 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Basic test for {@link FileIOEngine} + */ +@Category(SmallTests.class) +public class TestFileIOEngine { + @Test + public void testFileIOEngine() throws IOException { + int size = 2 * 1024 * 1024; // 2 MB + String filePath = "testFileIOEngine"; + try { + FileIOEngine fileIOEngine = new FileIOEngine(filePath, size); + for (int i = 0; i < 50; i++) { + int len = (int) Math.floor(Math.random() * 100); + long offset = (long) Math.floor(Math.random() * size % (size - len)); + byte[] data1 = new byte[len]; + for (int j = 0; j < data1.length; ++j) { + data1[j] = (byte) (Math.random() * 255); + } + byte[] data2 = new byte[len]; + fileIOEngine.write(ByteBuffer.wrap(data1), offset); + fileIOEngine.read(ByteBuffer.wrap(data2), offset); + for (int j = 0; j < data1.length; ++j) { + assertTrue(data1[j] == data2[j]); + } + } + } finally { + File file = new File(filePath); + if (file.exists()) { + file.delete(); + } + } + + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (revision 1427781) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (working copy) @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.DirectMemoryUtils; import org.apache.hadoop.util.StringUtils; @@ -72,6 +74,28 @@ public static final String EVICT_BLOCKS_ON_CLOSE_KEY = "hbase.rs.evictblocksonclose"; + /** + * Configuration keys for Bucket cache + */ + public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine"; + public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + public static final String 
BUCKET_CACHE_PERSISTENT_PATH_KEY = + "hbase.bucketcache.persistent.path"; + public static final String BUCKET_CACHE_COMBINED_KEY = + "hbase.bucketcache.combinedcache.enabled"; + public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = + "hbase.bucketcache.percentage.in.combinedcache"; + public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = + "hbase.bucketcache.writer.queuelength"; + /** + * Defaults for Bucket cache + */ + public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true; + public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3; + public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64; + public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; @@ -341,19 +365,59 @@ // Calculate the amount of heap to give the heap. MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - long cacheSize = (long)(mu.getMax() * cachePercentage); + long lruCacheSize = (long) (mu.getMax() * cachePercentage); int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HFile.DEFAULT_BLOCKSIZE); long offHeapCacheSize = (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) * DirectMemoryUtils.getDirectMemorySize()); - LOG.info("Allocating LruBlockCache with maximum size " + - StringUtils.humanReadableInt(cacheSize)); if (offHeapCacheSize <= 0) { - globalBlockCache = new LruBlockCache(cacheSize, - StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf); + String bucketCacheIOEngineName = conf + .get(BUCKET_CACHE_IOENGINE_KEY, null); + float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); + long bucketCacheSize = (long) (bucketCachePercentage < 1 ? 
mu.getMax() + * bucketCachePercentage : bucketCachePercentage * 1024 * 1024); + + boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, + DEFAULT_BUCKET_CACHE_COMBINED); + BucketCache bucketCache = null; + if (bucketCacheIOEngineName != null && bucketCacheSize > 0) { + int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY, + DEFAULT_BUCKET_CACHE_WRITER_THREADS); + int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY, + DEFAULT_BUCKET_CACHE_WRITER_QUEUE); + String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY); + float combinedPercentage = conf.getFloat( + BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, + DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE); + if (combinedWithLru) { + lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize); + bucketCacheSize = (long) (combinedPercentage * bucketCacheSize); + } + try { + int ioErrorsTolerationDuration = conf.getInt( + "hbase.bucketcache.ioengine.errors.tolerated.duration", + BucketCache.DEFAULT_ERROR_TOLERATION_DURATION); + bucketCache = new BucketCache(bucketCacheIOEngineName, + bucketCacheSize, writerThreads, writerQueueLen, persistentPath, + ioErrorsTolerationDuration); + } catch (IOException ioex) { + LOG.error("Can't instantiate bucket cache", ioex); + throw new RuntimeException(ioex); + } + } + LOG.info("Allocating LruBlockCache with maximum size " + + StringUtils.humanReadableInt(lruCacheSize)); + LruBlockCache lruCache = new LruBlockCache(lruCacheSize, + StoreFile.DEFAULT_BLOCKSIZE_SMALL); + lruCache.setVictimCache(bucketCache); + if (bucketCache != null && combinedWithLru) { + globalBlockCache = new CombinedBlockCache(lruCache, bucketCache); + } else { + globalBlockCache = lruCache; + } } else { - globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize, + globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf); } return globalBlockCache; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (revision 0) @@ -0,0 +1,120 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + + +import java.util.Comparator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; + +import com.google.common.collect.MinMaxPriorityQueue; + +/** + * A memory-bound queue that will grow until an element brings total size larger + * than maxSize. From then on, only entries that are sorted larger than the + * smallest current entry will be inserted/replaced. + * + *
+ * Use this when you want to find the largest elements (according to their + * ordering, not their heap size) that consume as close to the specified maxSize + * as possible. Default behavior is to grow just above rather than just below + * specified max. + */ +public class CachedEntryQueue { + + private MinMaxPriorityQueue> queue; + + private long cacheSize; + private long maxSize; + + /** + * @param maxSize the target size of elements in the queue + * @param blockSize expected average size of blocks + */ + public CachedEntryQueue(long maxSize, long blockSize) { + int initialSize = (int) (maxSize / blockSize); + if (initialSize == 0) + initialSize++; + queue = MinMaxPriorityQueue + .orderedBy(new Comparator>() { + public int compare(Entry entry1, + Entry entry2) { + return entry1.getValue().compareTo(entry2.getValue()); + } + + }).expectedSize(initialSize).create(); + cacheSize = 0; + this.maxSize = maxSize; + } + + /** + * Attempt to add the specified entry to this queue. + * + *
+ * If the queue is smaller than the max size, or if the specified element is + * ordered after the smallest element in the queue, the element will be added + * to the queue. Otherwise, there is no side effect of this call. + * @param entry a bucket entry with key to try to add to the queue + */ + public void add(Map.Entry entry) { + if (cacheSize < maxSize) { + queue.add(entry); + cacheSize += entry.getValue().getLength(); + } else { + BucketEntry head = queue.peek().getValue(); + if (entry.getValue().compareTo(head) > 0) { + cacheSize += entry.getValue().getLength(); + cacheSize -= head.getLength(); + if (cacheSize > maxSize) { + queue.poll(); + } else { + cacheSize += head.getLength(); + } + queue.add(entry); + } + } + } + + /** + * @return The next element in this queue, or {@code null} if the queue is + * empty. + */ + public Map.Entry poll() { + return queue.poll(); + } + + /** + * @return The last element in this queue, or {@code null} if the queue is + * empty. + */ + public Map.Entry pollLast() { + return queue.pollLast(); + } + + /** + * Total size of all elements in this queue. + * @return size of all elements currently in queue, in bytes + */ + public long cacheSize() { + return cacheSize; + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java (revision 0) @@ -0,0 +1,108 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * IO engine that stores data to a file on the local file system. 
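A hedged usage sketch (not part of the patch) of the positional contract the FileIOEngine class below implements: write() stores srcBuffer.remaining() bytes at an absolute offset, read() fills dstBuffer from the same offset, and the engine keeps no current position. The file name and offset are arbitrary, and IOException handling is omitted:

    FileIOEngine engine = new FileIOEngine("bucket.cache", 2 * 1024 * 1024);
    byte[] payload = "hello".getBytes();
    engine.write(ByteBuffer.wrap(payload), 4096L);               // bytes land at file offset 4096
    ByteBuffer back = ByteBuffer.wrap(new byte[payload.length]);
    engine.read(back, 4096L);                                    // reads the same bytes back from that offset
    engine.sync();                                               // force(true) on the underlying channel
    engine.shutdown();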
+ */ +public class FileIOEngine implements IOEngine { + static final Log LOG = LogFactory.getLog(FileIOEngine.class); + + private FileChannel fileChannel = null; + + public FileIOEngine(String filePath, long fileSize) throws IOException { + RandomAccessFile raf = null; + try { + raf = new RandomAccessFile(filePath, "rw"); + raf.setLength(fileSize); + fileChannel = raf.getChannel(); + LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + + ", on the path:" + filePath); + } catch (java.io.FileNotFoundException fex) { + LOG.error("Can't create bucket cache file " + filePath, fex); + throw fex; + } catch (IOException ioex) { + LOG.error("Can't extend bucket cache file; insufficient space for " + + StringUtils.byteDesc(fileSize), ioex); + if (raf != null) raf.close(); + throw ioex; + } + } + + /** + * File IO engine is always able to support persistent storage for the cache + * @return true + */ + @Override + public boolean isPersistent() { + return true; + } + + /** + * Transfers data from file to the given byte buffer + * @param dstBuffer the given byte buffer into which bytes are to be written + * @param offset The offset in the file where the first byte to be read + * @throws IOException + */ + @Override + public void read(ByteBuffer dstBuffer, long offset) throws IOException { + fileChannel.read(dstBuffer, offset); + } + + /** + * Transfers data from the given byte buffer to file + * @param srcBuffer the given byte buffer from which bytes are to be read + * @param offset The offset in the file where the first byte to be written + * @throws IOException + */ + @Override + public void write(ByteBuffer srcBuffer, long offset) throws IOException { + fileChannel.write(srcBuffer, offset); + } + + /** + * Sync the data to file after writing + * @throws IOException + */ + @Override + public void sync() throws IOException { + fileChannel.force(true); + } + + /** + * Close the file + */ + @Override + public void shutdown() { + try { + fileChannel.close(); + } catch (IOException ex) { + LOG.error("Can't shutdown cleanly", ex); + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1427781) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy) @@ -44,6 +44,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; @@ -173,6 +175,9 @@ /** Overhead of the structure itself */ private long overhead; + /** Where to send victims (blocks evicted from the cache) */ + private BucketCache victimHandler = null; + /** * Default constructor. Specify maximum size and expected average block * size (approximation is fine). 
@@ -342,6 +347,8 @@ CachedBlock cb = map.get(cacheKey); if(cb == null) { if (!repeat) stats.miss(caching); + if (victimHandler != null) + return victimHandler.getBlock(cacheKey, caching, repeat); return null; } stats.hit(caching); @@ -349,12 +356,20 @@ return cb.getBuffer(); } + /** + * Whether the cache contains block with specified cacheKey + * @param cacheKey + * @return true if contains the block + */ + public boolean containBlock(BlockCacheKey cacheKey) { + return map.containsKey(cacheKey); + } @Override public boolean evictBlock(BlockCacheKey cacheKey) { CachedBlock cb = map.get(cacheKey); if (cb == null) return false; - evictBlock(cb); + evictBlock(cb, false); return true; } @@ -377,14 +392,31 @@ ++numEvicted; } } + if (victimHandler != null) { + numEvicted += victimHandler.evictBlocksByHfileName(hfileName); + } return numEvicted; } - protected long evictBlock(CachedBlock block) { + /** + * Evict the block, and it will be cached by the victim handler if exists && + * block may be read again later + * @param block + * @param cachedByVictimHandler true if the given block is evicted by + * EvictionThread + * @return the heap of size of evicted block + */ + protected long evictBlock(CachedBlock block, boolean cachedByVictimHandler) { map.remove(block.getCacheKey()); updateSizeMetrics(block, true); elements.decrementAndGet(); stats.evicted(); + if (cachedByVictimHandler && victimHandler != null) { + boolean wait = getCurrentSize() < acceptableSize(); + boolean inMemory = block.getPriority() == BlockPriority.MEMORY; + victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(), + inMemory, wait); + } return block.heapSize(); } @@ -512,7 +544,7 @@ CachedBlock cb; long freedBytes = 0; while ((cb = queue.pollLast()) != null) { - freedBytes += evictBlock(cb); + freedBytes += evictBlock(cb, true); if (freedBytes >= toFree) { return freedBytes; } @@ -693,7 +725,7 @@ } public final static long CACHE_FIXED_OVERHEAD = ClassSize.align( - (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) + + (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN + ClassSize.OBJECT); @@ -762,6 +794,8 @@ } public void shutdown() { + if (victimHandler != null) + victimHandler.shutdown(); this.scheduleThreadPool.shutdown(); for (int i = 0; i < 10; i++) { if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10); @@ -812,4 +846,9 @@ return counts; } + public void setVictimCache(BucketCache handler) { + assert victimHandler == null; + victimHandler = handler; + } + } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (revision 1427781) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (working copy) @@ -34,4 +34,21 @@ * @return T the deserialized object. */ public T deserialize(ByteBuffer b) throws IOException; + + /** + * + * @param b + * @param reuse true if Cacheable object can use the given buffer as its + * content + * @return T the deserialized object. + * @throws IOException + */ + public T deserialize(ByteBuffer b, boolean reuse) throws IOException; + + /** + * Get the identifier of this deserialiser. 
Identifier is unique for each + * deserializer and generated by {@link CacheableDeserializerIdManager} + * @return identifier number of this cacheable deserializer + */ + public int getDeserialiserIdentifier(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (revision 0) @@ -0,0 +1,548 @@ +/** + * Copyright 2012 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; + +/** + * This class is used to generate offset when allocating a block, and free the + * block when evicting. It manages an array of {@link BucketAllocator#Bucket}, + * each bucket is specified a size and caches elements up to this size. For + * completely empty bucket, this size could be re-specified dynamically. + * + * This class is not thread safe. 
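A hedged sketch (not part of the patch) of the allocator API that the class below and TestBucketCache exercise; the existing BucketCache instance, the 8 KB request size, and the omitted CacheFullException/BucketAllocatorException handling are all illustrative:

    BucketAllocator allocator = cache.getAllocator();    // cache is an existing BucketCache
    long offset = allocator.allocateBlock(8 * 1024);     // offset into the IOEngine; throws CacheFullException when full
    long size   = allocator.sizeOfAllocation(offset);    // rounded up to the bucket size that served the request
    long freed  = allocator.freeBlock(offset);           // returns the same rounded-up size
    assert size == freed && allocator.getUsedSize() == 0;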
+ */ +public final class BucketAllocator { + static final Log LOG = LogFactory.getLog(BucketCache.class); + + final private static class Bucket { + private long mBaseOffset; + private int mSize, mSizeIndex; + private int mItemCount; + private int mFreeList[]; + private int mFreeCount, mUsedCount; + + public Bucket(long offset) { + mBaseOffset = offset; + mSizeIndex = -1; + } + + void reconfigure(int sizeIndex) { + mSizeIndex = sizeIndex; + mSize = BUCKET_SIZES[mSizeIndex]; + mItemCount = (int) (((long) BUCKET_CAPACITY) / (long) mSize); + mFreeCount = mItemCount; + mUsedCount = 0; + mFreeList = new int[mItemCount]; + for (int i = 0; i < mFreeCount; ++i) + mFreeList[i] = i; + } + + public boolean isUninstantiated() { + return mSizeIndex == -1; + } + + public int sizeIndex() { + return mSizeIndex; + } + + public int size() { + return mSize; + } + + public boolean hasFreeSpace() { + return mFreeCount > 0; + } + + public boolean isCompletelyFree() { + return mUsedCount == 0; + } + + public int freeCount() { + return mFreeCount; + } + + public int usedCount() { + return mUsedCount; + } + + public int freeBytes() { + return mFreeCount * mSize; + } + + public int usedBytes() { + return mUsedCount * mSize; + } + + public long baseOffset() { + return mBaseOffset; + } + + /** + * Allocate a block in this bucket, return the offset represents the + * position in physical space + * @return the offset in the IOEngine + */ + public long allocate() { + assert mFreeCount > 0; // Else should not have been called + assert mSizeIndex != -1; + ++mUsedCount; + long offset = mBaseOffset + (mFreeList[--mFreeCount] * mSize); + if (offset < 0) { + LOG.warn("Failed allocate,mBaseOffset=" + mBaseOffset + ",mFreeCount=" + + mFreeCount + ",mFreeList=" + mFreeList[mFreeCount] + ",mSize=" + + mSize); + } + return offset; + } + + public void addAllocation(long offset) throws BucketAllocatorException { + offset -= mBaseOffset; + if (offset < 0 || offset % mSize != 0) + throw new BucketAllocatorException( + "Attempt to add allocation for bad offset: " + offset + " base=" + + mBaseOffset); + int idx = (int) (offset / mSize); + boolean copyDown = false; + for (int i = 0; i < mFreeCount; ++i) { + if (copyDown) + mFreeList[i - 1] = mFreeList[i]; + else if (mFreeList[i] == idx) + copyDown = true; + } + if (!copyDown) + throw new BucketAllocatorException("Couldn't find match for index " + + idx + " in free list"); + ++mUsedCount; + --mFreeCount; + } + + private void free(long offset) { + offset -= mBaseOffset; + assert offset >= 0; + assert offset < mItemCount * mSize; + assert offset % mSize == 0; + assert mUsedCount > 0; + assert mFreeCount < mItemCount; // Else duplicate free + int item = (int) (offset / (long) mSize); + assert !freeListContains(item); + --mUsedCount; + mFreeList[mFreeCount++] = item; + } + + private boolean freeListContains(int blockNo) { + for (int i = 0; i < mFreeCount; ++i) { + if (mFreeList[i] == blockNo) return true; + } + return false; + } + } + + final class BucketSizeInfo { + private List mBuckets, mFreeBuckets, mCompletelyFreeBuckets; + private int mSizeIndex; + + BucketSizeInfo(int sizeIndex) { + mBuckets = new ArrayList(); + mFreeBuckets = new ArrayList(); + mCompletelyFreeBuckets = new ArrayList(); + mSizeIndex = sizeIndex; + } + + public void instantiateBucket(Bucket b) { + assert b.isUninstantiated() || b.isCompletelyFree(); + b.reconfigure(mSizeIndex); + mBuckets.add(b); + mFreeBuckets.add(b); + mCompletelyFreeBuckets.add(b); + } + + public int sizeIndex() { + return mSizeIndex; + } + + /** + * 
Find a bucket to allocate a block + * @return the offset in the IOEngine + */ + public long allocateBlock() { + Bucket b = null; + if (mFreeBuckets.size() > 0) // Use up an existing one first... + b = mFreeBuckets.get(mFreeBuckets.size() - 1); + if (b == null) { + b = grabGlobalCompletelyFreeBucket(); + if (b != null) + instantiateBucket(b); + } + if (b == null) + return -1; + long result = b.allocate(); + blockAllocated(b); + return result; + } + + void blockAllocated(Bucket b) { + if (!b.isCompletelyFree()) + mCompletelyFreeBuckets.remove(b); + if (!b.hasFreeSpace()) + mFreeBuckets.remove(b); + } + + public Bucket findAndRemoveCompletelyFreeBucket() { + Bucket b = null; + assert mBuckets.size() > 0; + if (mBuckets.size() == 1) { + // So we never get complete starvation of a bucket for a size + return null; + } + + if (mCompletelyFreeBuckets.size() > 0) { + b = mCompletelyFreeBuckets.get(0); + removeBucket(b); + } + return b; + } + + private void removeBucket(Bucket b) { + assert b.isCompletelyFree(); + mBuckets.remove(b); + mFreeBuckets.remove(b); + mCompletelyFreeBuckets.remove(b); + } + + public void freeBlock(Bucket b, long offset) { + assert mBuckets.contains(b); + assert (!mCompletelyFreeBuckets.contains(b)); // else we shouldn't have + // anything to free... + b.free(offset); + if (!mFreeBuckets.contains(b)) + mFreeBuckets.add(b); + if (b.isCompletelyFree()) + mCompletelyFreeBuckets.add(b); + } + + public IndexStatistics statistics() { + long free = 0, used = 0; + for (Bucket b : mBuckets) { + free += b.freeCount(); + used += b.usedCount(); + } + return new IndexStatistics(free, used, BUCKET_SIZES[mSizeIndex]); + } + } + + // Default block size is 64K, so we choose more sizes near 64K, you'd better + // reset it according to your cluster's block size distribution + // TODO Make these sizes configurable + // TODO Support the view of block size distribution statistics + private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, + 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, + 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, + 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024, + 512 * 1024 + 1024 }; + + public BucketSizeInfo roundUpToBucketSizeInfo(int osz) { + for (int i = 0; i < BUCKET_SIZES.length; ++i) + if (osz <= BUCKET_SIZES[i]) + return mBucketsBySize[i]; + return null; + } + + public int roundUpToBucketSize(int osz) { + for (int i = 0; i < BUCKET_SIZES.length; ++i) + if (osz <= BUCKET_SIZES[i]) + return BUCKET_SIZES[i]; + return osz; + } + + static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 513K plus overhead + static public final int LEAST_ITEMS_IN_BUCKET = 4; + // The capacity size for each bucket + static final long BUCKET_CAPACITY = LEAST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE; + + private Bucket mBuckets[]; + private BucketSizeInfo[] mBucketsBySize; + private final long totalSize; + private long usedSize = 0; + + BucketAllocator(long availableSpace) throws BucketAllocatorException { + mBuckets = new Bucket[(int) (availableSpace / (long) BUCKET_CAPACITY)]; + if (mBuckets.length < BUCKET_SIZES.length) + throw new BucketAllocatorException( + "Bucket allocator size too small - must have room for at least " + + BUCKET_SIZES.length + " buckets"); + mBucketsBySize = new BucketSizeInfo[BUCKET_SIZES.length]; + for (int i = 0; i < BUCKET_SIZES.length; ++i) { + BucketSizeInfo bsi = new BucketSizeInfo(i); + mBucketsBySize[i] = bsi; + } + for (int i = 0; i < mBuckets.length; ++i) { + mBuckets[i] = new 
Bucket(BUCKET_CAPACITY * i); + mBucketsBySize[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1] + .instantiateBucket(mBuckets[i]); + } + this.totalSize = ((long) mBuckets.length) * BUCKET_CAPACITY; + } + + /* + * Rebuild the allocator's data structures from a persisted map. + */ + BucketAllocator(long availableSpace, BucketCache fc, + Map map, AtomicLong realCacheSize) + throws BucketAllocatorException { + this(availableSpace); + + // each bucket has an offset, sizeindex. probably the buckets are too big + // in our default state. so what we do is reconfigure them according to what + // we've found. we can only reconfigure each bucket once; if more than once, + // we know there's a bug, so we just log the info, throw, and start again... + boolean[] reconfigured = new boolean[mBuckets.length]; + for (Map.Entry entry : map.entrySet()) { + long foundOff = entry.getValue().offset(); + int foundLen = entry.getValue().getLength(); + int needBucketSizeIndex = -1; + for (int i = 0; i < BUCKET_SIZES.length; ++i) { + if (foundLen <= BUCKET_SIZES[i]) { + needBucketSizeIndex = i; + break; + } + } + if (needBucketSizeIndex == -1) { + throw new BucketAllocatorException( + "Can't match bucket size for the block with size " + foundLen); + } + int foundBucketNo = (int) (foundOff / (long) BUCKET_CAPACITY); + if (foundBucketNo < 0 || foundBucketNo >= mBuckets.length) + throw new BucketAllocatorException("Can't find bucket " + foundBucketNo + + "; did you shrink the cache? Clearing allocator"); + Bucket b = mBuckets[foundBucketNo]; + if (reconfigured[foundBucketNo] == true) { + if (b.sizeIndex() != needBucketSizeIndex) + throw new BucketAllocatorException( + "Inconsistent allocation to bucket map; clearing allocator"); + } else { + if (!b.isCompletelyFree()) + throw new BucketAllocatorException("Reconfiguring bucket " + + foundBucketNo + " but it's already allocated; corrupt data"); + // Need to remove the bucket from whichever list it's currently in at + // the moment... + BucketSizeInfo bsi = mBucketsBySize[needBucketSizeIndex]; + BucketSizeInfo oldbsi = mBucketsBySize[b.sizeIndex()]; + oldbsi.removeBucket(b); + bsi.instantiateBucket(b); + reconfigured[foundBucketNo] = true; + } + realCacheSize.addAndGet(foundLen); + mBuckets[foundBucketNo].addAllocation(foundOff); + usedSize += mBuckets[foundBucketNo].size(); + mBucketsBySize[needBucketSizeIndex].blockAllocated(b); + } + } + + public String getInfo() { + StringBuilder sb = new StringBuilder(1024); + for (int i = 0; i < mBuckets.length; ++i) { + Bucket b = mBuckets[i]; + sb.append(" Bucket ").append(i).append(": ").append(b.size()); + sb.append(" freeCount=").append(b.freeCount()).append(" used=") + .append(b.usedCount()); + sb.append('\n'); + } + return sb.toString(); + } + + public long getUsedSize() { + return this.usedSize; + } + + public long getFreeSize() { + long freeSize = this.totalSize - getUsedSize(); + return freeSize; + } + + public long getTotalSize() { + return this.totalSize; + } + + /** + * Allocate a block with specified size. Return the offset + * @param osz size of given block + * @throws CacheFullException, BucketAllocatorException + * @return the offset in the IOEngine + */ + public synchronized long allocateBlock(int osz) throws CacheFullException, + BucketAllocatorException { + assert osz > 0; + BucketSizeInfo bsi = roundUpToBucketSizeInfo(osz); + if (bsi == null) { + throw new BucketAllocatorException("Allocation too big size=" + osz); + } + long offset = bsi.allocateBlock(); + + // Ask caller to free up space and try again! 
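+    // A negative offset means this size class had no free slot and no
+    // completely free bucket could be borrowed from another size class,
+    // so surface CacheFullException and let the caller run freeSpace().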
+ if (offset < 0) + throw new CacheFullException(osz, bsi.sizeIndex()); + usedSize += BUCKET_SIZES[bsi.sizeIndex()]; + return offset; + } + + private Bucket grabGlobalCompletelyFreeBucket() { + for (BucketSizeInfo bsi : mBucketsBySize) { + Bucket b = bsi.findAndRemoveCompletelyFreeBucket(); + if (b != null) + return b; + } + return null; + } + + public synchronized int freeBlock(long freeBlock) { + int bucketNo = (int) (freeBlock / (long) BUCKET_CAPACITY); + assert bucketNo >= 0 && bucketNo < mBuckets.length; + Bucket targetBucket = mBuckets[bucketNo]; + mBucketsBySize[targetBucket.sizeIndex()].freeBlock(targetBucket, freeBlock); + usedSize -= targetBucket.size(); + return targetBucket.size(); + } + + public int sizeIndexOfAllocation(long offset) { + int bucketNo = (int) (offset / (long) BUCKET_CAPACITY); + assert bucketNo >= 0 && bucketNo < mBuckets.length; + Bucket targetBucket = mBuckets[bucketNo]; + return targetBucket.sizeIndex(); + } + + public int sizeOfAllocation(long offset) { + int bucketNo = (int) (offset / (long) BUCKET_CAPACITY); + assert bucketNo >= 0 && bucketNo < mBuckets.length; + Bucket targetBucket = mBuckets[bucketNo]; + return targetBucket.size(); + } + + static public int getMaximumAllocationIndex() { + return BUCKET_SIZES.length; + } + + public static class IndexStatistics { + private long mFree, mUsed, mItemSize, mTotal; + + public long freeCount() { + return mFree; + } + + public long usedCount() { + return mUsed; + } + + public long totalCount() { + return mTotal; + } + + public long freeBytes() { + return mFree * mItemSize; + } + + public long usedBytes() { + return mUsed * mItemSize; + } + + public long totalBytes() { + return mTotal * mItemSize; + } + + public long itemSize() { + return mItemSize; + } + + public IndexStatistics(long free, long used, long itemSize) { + setTo(free, used, itemSize); + } + + public IndexStatistics() { + setTo(-1, -1, 0); + } + + public void setTo(long free, long used, long itemSize) { + mItemSize = itemSize; + mFree = free; + mUsed = used; + mTotal = free + used; + } + } + + public void dumpToLog() { + logStatistics(); + StringBuilder sb = new StringBuilder(); + for (Bucket b : mBuckets) { + sb.append("Bucket:").append(b.mBaseOffset).append('\n'); + sb.append(" Size index: " + b.sizeIndex() + "; Free:" + b.mFreeCount + + "; used:" + b.mUsedCount + "; freelist\n"); + for (int i = 0; i < b.freeCount(); ++i) + sb.append(b.mFreeList[i]).append(','); + } + LOG.info(sb); + } + + public void logStatistics() { + IndexStatistics total = new IndexStatistics(); + IndexStatistics[] stats = getIndexStatistics(total); + LOG.info("Bucket allocator statistics follow:\n"); + LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes=" + + total.usedBytes() + "; total bytes=" + total.totalBytes()); + for (IndexStatistics s : stats) { + LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount() + + "; free=" + s.freeCount() + "; total=" + s.totalCount()); + } + } + + public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) { + IndexStatistics[] stats = getIndexStatistics(); + long totalfree = 0, totalused = 0; + for (IndexStatistics stat : stats) { + totalfree += stat.freeBytes(); + totalused += stat.usedBytes(); + } + grandTotal.setTo(totalfree, totalused, 1); + return stats; + } + + public IndexStatistics[] getIndexStatistics() { + IndexStatistics[] stats = new IndexStatistics[BUCKET_SIZES.length]; + for (int i = 0; i < stats.length; ++i) + stats[i] = mBucketsBySize[i].statistics(); + return stats; + } + + public long 
freeBlock(long freeList[]) { + long sz = 0; + for (int i = 0; i < freeList.length; ++i) + sz += freeBlock(freeList[i]); + return sz; + } + +} Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (revision 0) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (revision 0) @@ -0,0 +1,193 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.nio.ByteBuffer; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * This class manages an array of ByteBuffers with a default size 4MB. These + * buffers are sequential and could be considered as a large buffer.It supports + * reading/writing data from this large buffer with a position and offset + */ +public final class ByteBufferArray { + static final Log LOG = LogFactory.getLog(ByteBufferArray.class); + + static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; + private ByteBuffer buffers[]; + private Lock locks[]; + private int bufferSize; + private int bufferCount; + + /** + * We allocate a number of byte buffers as the capacity. In order not to out + * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}), + * we will allocate one additional buffer with capacity 0; + * @param capacity total size of the byte buffer array + * @param directByteBuffer true if we allocate direct buffer + */ + public ByteBufferArray(long capacity, boolean directByteBuffer) { + this.bufferSize = DEFAULT_BUFFER_SIZE; + if (this.bufferSize > (capacity / 16)) + this.bufferSize = (int) roundUp(capacity / 16, 32768); + this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize); + LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity) + + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + + bufferCount); + buffers = new ByteBuffer[bufferCount + 1]; + locks = new Lock[bufferCount + 1]; + for (int i = 0; i <= bufferCount; i++) { + locks[i] = new ReentrantLock(); + if (i < bufferCount) { + buffers[i] = directByteBuffer ? 
ByteBuffer.allocateDirect(bufferSize) + : ByteBuffer.allocate(bufferSize); + } else { + buffers[i] = ByteBuffer.allocate(0); + } + + } + } + + private long roundUp(long n, long to) { + return ((n + to - 1) / to) * to; + } + + /** + * Transfers bytes from this buffer array into the given destination array + * @param start start position in the ByteBufferArray + * @param len The maximum number of bytes to be written to the given array + * @param dstArray The array into which bytes are to be written + */ + public void getMultiple(long start, int len, byte[] dstArray) { + getMultiple(start, len, dstArray, 0); + } + + /** + * Transfers bytes from this buffer array into the given destination array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be written to the given array + * @param dstArray The array into which bytes are to be written + * @param dstOffset The offset within the given array of the first byte to be + * written + */ + public void getMultiple(long start, int len, byte[] dstArray, int dstOffset) { + multiple(start, len, dstArray, dstOffset, new Visitor() { + public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { + bb.get(array, arrayIdx, len); + } + }); + } + + /** + * Transfers bytes from the given source array into this buffer array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be read from the given array + * @param srcArray The array from which bytes are to be read + */ + public void putMultiple(long start, int len, byte[] srcArray) { + putMultiple(start, len, srcArray, 0); + } + + /** + * Transfers bytes from the given source array into this buffer array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be read from the given array + * @param srcArray The array from which bytes are to be read + * @param srcOffset The offset within the given array of the first byte to be + * read + */ + public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) { + multiple(start, len, srcArray, srcOffset, new Visitor() { + public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { + bb.put(array, arrayIdx, len); + } + }); + } + + private interface Visitor { + /** + * Visit the given byte buffer, if it is a read action, we will transfer the + * bytes from the buffer to the destination array, else if it is a write + * action, we will transfer the bytes from the source array to the buffer + * @param bb byte buffer + * @param array a source or destination byte array + * @param arrayOffset offset of the byte array + * @param len read/write length + */ + void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len); + } + + /** + * Access(read or write) this buffer array with a position and length as the + * given array. Here we will only lock one buffer even if it may be need visit + * several buffers. The consistency is guaranteed by the caller. 
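+ * For example (illustrative numbers): with the default 4MB buffer size, an
+ * access with start=3MB and len=6MB touches buffers 0, 1 and 2 - the last
+ * 1MB of buffer 0, all of buffer 1 and the first 1MB of buffer 2 - taking
+ * and releasing each buffer's lock in turn.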
+ * @param start start offset of this buffer array + * @param len The maximum number of bytes to be accessed + * @param array The array from/to which bytes are to be read/written + * @param arrayOffset The offset within the given array of the first byte to + * be read or written + * @param visitor implement of how to visit the byte buffer + */ + void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) { + assert len >= 0; + long end = start + len; + int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize); + int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize); + assert array.length >= len + arrayOffset; + assert startBuffer >= 0 && startBuffer < bufferCount; + assert endBuffer >= 0 && endBuffer < bufferCount + || (endBuffer == bufferCount && endOffset == 0); + if (startBuffer >= locks.length || startBuffer < 0) { + String msg = "Failed multiple, start=" + start + ",startBuffer=" + + startBuffer + ",bufferSize=" + bufferSize; + LOG.error(msg); + throw new RuntimeException(msg); + } + int srcIndex = 0, cnt = -1; + for (int i = startBuffer; i <= endBuffer; ++i) { + Lock lock = locks[i]; + lock.lock(); + try { + ByteBuffer bb = buffers[i]; + if (i == startBuffer) { + cnt = bufferSize - startOffset; + if (cnt > len) cnt = len; + bb.limit(startOffset + cnt).position( + startOffset ); + } else if (i == endBuffer) { + cnt = endOffset; + bb.limit(cnt).position(0); + } else { + cnt = bufferSize ; + bb.limit(cnt).position(0); + } + visitor.visit(bb, array, srcIndex + arrayOffset, cnt); + srcIndex += cnt; + } finally { + lock.unlock(); + } + } + assert srcIndex == len; + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (revision 1427781) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (working copy) @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.regionserver.MemStore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; @@ -129,8 +130,8 @@ public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase( ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); - static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG + - Bytes.SIZEOF_INT; + public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_INT + + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; /** * Each checksum value is an integer that can be stored in 4 bytes. 
@@ -139,22 +140,39 @@ private static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { - public HFileBlock deserialize(ByteBuffer buf) throws IOException{ - ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit() - - HFileBlock.EXTRA_SERIALIZATION_SPACE); - buf.limit(buf.limit() - - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); - newByteBuffer.put(buf); - HFileBlock ourBuffer = new HFileBlock(newByteBuffer, - MINOR_VERSION_NO_CHECKSUM); - + public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{ + buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); + ByteBuffer newByteBuffer; + if(reuse){ + newByteBuffer = buf.slice(); + }else{ + newByteBuffer = ByteBuffer.allocate(buf.limit()); + newByteBuffer.put(buf); + } buf.position(buf.limit()); buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); + int minorVersion=buf.getInt(); + HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion); ourBuffer.offset = buf.getLong(); ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); return ourBuffer; } + + @Override + public int getDeserialiserIdentifier() { + return deserializerIdentifier; + } + + @Override + public HFileBlock deserialize(ByteBuffer b) throws IOException { + return deserialize(b, false); + } }; + private static final int deserializerIdentifier; + static { + deserializerIdentifier = CacheableDeserializerIdManager + .registerDeserializer(blockDeserializer); + } private BlockType blockType; @@ -359,6 +377,17 @@ } /** + * Returns the buffer of this block, including header data. The clients must + * not modify the buffer object. This method has to be public because it is + * used in {@link BucketCache} to avoid buffer copy. + * + * @return the byte buffer with header included for read-only operations + */ + public ByteBuffer getBufferReadOnlyWithHeader() { + return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); + } + + /** * Returns a byte buffer of this block, including header data, positioned at * the beginning of header. The underlying data array is not copied. * @@ -1780,12 +1809,22 @@ @Override public void serialize(ByteBuffer destination) { - destination.put(this.buf.duplicate()); + ByteBuffer dupBuf = this.buf.duplicate(); + dupBuf.rewind(); + destination.put(dupBuf); + destination.putInt(this.minorVersion); destination.putLong(this.offset); destination.putInt(this.nextBlockOnDiskSizeWithHeader); destination.rewind(); } + public void serializeExtraInfo(ByteBuffer destination) { + destination.putInt(this.minorVersion); + destination.putLong(this.offset); + destination.putInt(this.nextBlockOnDiskSizeWithHeader); + destination.rewind(); + } + @Override public CacheableDeserializer getDeserializer() { return HFileBlock.blockDeserializer; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Map from type T to int and vice-versa. Used for reducing bit field item + * counts. + */ +public final class UniqueIndexMap implements Serializable { + private static final long serialVersionUID = -1145635738654002342L; + + ConcurrentHashMap mForwardMap = new ConcurrentHashMap(); + ConcurrentHashMap mReverseMap = new ConcurrentHashMap(); + AtomicInteger mIndex = new AtomicInteger(0); + + // Map a length to an index. If we can't, allocate a new mapping. We might + // race here and get two entries with the same deserialiser. This is fine. + int map(T parameter) { + Integer ret = mForwardMap.get(parameter); + if (ret != null) return ret.intValue(); + int nexti = mIndex.incrementAndGet(); + assert (nexti < Short.MAX_VALUE); + mForwardMap.put(parameter, nexti); + mReverseMap.put(nexti, parameter); + return nexti; + } + + T unmap(int leni) { + Integer len = Integer.valueOf(leni); + assert mReverseMap.containsKey(len); + return mReverseMap.get(len); + } +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java (revision 0) @@ -0,0 +1,72 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Basic test for {@link ByteBufferIOEngine} + */ +@Category(SmallTests.class) +public class TestByteBufferIOEngine { + + @Test + public void testByteBufferIOEngine() throws Exception { + int capacity = 32 * 1024 * 1024; // 32 MB + int testNum = 100; + int maxBlockSize = 64 * 1024; + ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity, false); + int testOffsetAtStartNum = testNum / 10; + int testOffsetAtEndNum = testNum / 10; + for (int i = 0; i < testNum; i++) { + byte val = (byte) (Math.random() * 255); + int blockSize = (int) (Math.random() * maxBlockSize); + byte[] byteArray = new byte[blockSize]; + for (int j = 0; j < byteArray.length; ++j) { + byteArray[j] = val; + } + ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray); + int offset = 0; + if (testOffsetAtStartNum > 0) { + testOffsetAtStartNum--; + offset = 0; + } else if (testOffsetAtEndNum > 0) { + testOffsetAtEndNum--; + offset = capacity - blockSize; + } else { + offset = (int) (Math.random() * (capacity - maxBlockSize)); + } + ioEngine.write(srcBuffer, offset); + ByteBuffer dstBuffer = ByteBuffer.allocate(blockSize); + ioEngine.read(dstBuffer, offset); + byte[] byteArray2 = dstBuffer.array(); + for (int j = 0; j < byteArray.length; ++j) { + assertTrue(byteArray[j] == byteArray2[j]); + } + } + assert testOffsetAtStartNum == 0; + assert testOffsetAtEndNum == 0; + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (revision 0) @@ -0,0 +1,1178 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; +import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * BucketCache uses {@link BucketAllocator} to allocate/free block, and use + * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to + * determine whether a given element hit. It could uses memory + * {@link ByteBufferIOEngine} or file {@link FileIOEngine}to store/read the + * block data. + * + * Eviction is using similar algorithm as + * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} + * + * BucketCache could be used as mainly a block cache(see + * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS and + * fragment by GC. + * + * Also could be used as a secondary cache(e.g. 
using Fusionio to store block) + * to enlarge cache space by + * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} + */ +public class BucketCache implements BlockCache, HeapSize { + static final Log LOG = LogFactory.getLog(BucketCache.class); + + /** Priority buckets */ + private static final float DEFAULT_SINGLE_FACTOR = 0.25f; + private static final float DEFAULT_MULTI_FACTOR = 0.50f; + private static final float DEFAULT_MEMORY_FACTOR = 0.25f; + private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; + + private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; + private static final float DEFAULT_MIN_FACTOR = 0.85f; + + /** Statistics thread */ + private static final int statThreadPeriod = 3 * 60; + + final static int DEFAULT_WRITER_THREADS = 3; + final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; + + // Store/read block data + IOEngine ioEngine; + + // Store the block in this map before writing it to cache + private ConcurrentHashMap ramCache; + // In this map, store the block's meta data like offset, length + private ConcurrentHashMap backingMap; + + /** + * Flag if the cache is enabled or not... We shut it off if there are IO + * errors for some time, so that Bucket IO exceptions/errors don't bring down + * the HBase server. + */ + private volatile boolean cacheEnabled; + + private ArrayList> writerQueues = + new ArrayList>(); + WriterThread writerThreads[]; + + + + /** Volatile boolean to track if free space is in process or not */ + private volatile boolean freeInProgress = false; + private Lock freeSpaceLock = new ReentrantLock(); + + private UniqueIndexMap deserialiserMap = new UniqueIndexMap(); + + private final AtomicLong realCacheSize = new AtomicLong(0); + private final AtomicLong heapSize = new AtomicLong(0); + /** Current number of cached elements */ + private final AtomicLong blockNumber = new AtomicLong(0); + private final AtomicLong failedBlockAdditions = new AtomicLong(0); + + /** Cache access count (sequential ID) */ + private final AtomicLong accessCount = new AtomicLong(0); + + private final Object[] cacheWaitSignals; + private static final int DEFAULT_CACHE_WAIT_TIME = 50; + // Used in test now. If the flag is false and the cache speed is very fast, + // bucket cache will skip some blocks when caching. If the flag is true, we + // will wait blocks flushed to IOEngine for some time when caching + boolean wait_when_cache = false; + + private BucketCacheStats cacheStats = new BucketCacheStats(); + + private String persistencePath; + private long cacheCapacity; + /** Approximate block size */ + private final long blockSize; + + /** Duration of IO errors tolerated before we disable cache, 1 min as default */ + private final int ioErrorsTolerationDuration; + // 1 min + public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; + // Start time of first IO error when reading or writing IO Engine, it will be + // reset after a successful read/write. + private volatile long ioErrorStartTime = -1; + + /** + * A "sparse lock" implementation allowing to lock on a particular block + * identified by offset. The purpose of this is to avoid freeing the block + * which is being read. + * + * TODO:We could extend the IdLock to IdReadWriteLock for better. 
+ */ + private IdLock offsetLock = new IdLock(); + + + + /** Statistics thread schedule pool (for heavy debugging, could remove) */ + private final ScheduledExecutorService scheduleThreadPool = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat("BucketCache Statistics #%d") + .setDaemon(true) + .build()); + + // Allocate or free space for the block + private BucketAllocator bucketAllocator; + + public BucketCache(String ioEngineName, long capacity, int writerThreadNum, + int writerQLen, String persistencePath) throws FileNotFoundException, + IOException { + this(ioEngineName, capacity, writerThreadNum, writerQLen, persistencePath, + DEFAULT_ERROR_TOLERATION_DURATION); + } + + public BucketCache(String ioEngineName, long capacity, int writerThreadNum, + int writerQLen, String persistencePath, int ioErrorsTolerationDuration) + throws FileNotFoundException, IOException { + this.ioEngine = getIOEngineFromName(ioEngineName, capacity); + this.writerThreads = new WriterThread[writerThreadNum]; + this.cacheWaitSignals = new Object[writerThreadNum]; + long blockNumCapacity = capacity / 16384; + if (blockNumCapacity >= Integer.MAX_VALUE) { + // Enough for about 32TB of cache! + throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); + } + + this.cacheCapacity = capacity; + this.persistencePath = persistencePath; + this.blockSize = StoreFile.DEFAULT_BLOCKSIZE_SMALL; + this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; + + bucketAllocator = new BucketAllocator(capacity); + for (int i = 0; i < writerThreads.length; ++i) { + writerQueues.add(new ArrayBlockingQueue(writerQLen)); + this.cacheWaitSignals[i] = new Object(); + } + + assert writerQueues.size() == writerThreads.length; + this.ramCache = new ConcurrentHashMap(); + + this.backingMap = new ConcurrentHashMap((int) blockNumCapacity); + + if (ioEngine.isPersistent() && persistencePath != null) { + try { + retrieveFromFile(); + } catch (IOException ioex) { + LOG.error("Can't restore from file because of", ioex); + } catch (ClassNotFoundException cnfe) { + LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); + throw new RuntimeException(cnfe); + } + } + final String threadName = Thread.currentThread().getName(); + for (int i = 0; i < writerThreads.length; ++i) { + writerThreads[i] = new WriterThread(writerQueues.get(i), i); + writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); + writerThreads[i].start(); + } + this.cacheEnabled = true; + // Run the statistics thread periodically to print the cache statistics log + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), + statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); + LOG.info("Started bucket cache"); + } + + /** + * Get the IOEngine from the IO engine name + * @param ioEngineName + * @param capacity + * @return + * @throws IOException + */ + private IOEngine getIOEngineFromName(String ioEngineName, long capacity) + throws IOException { + if (ioEngineName.startsWith("file:")) + return new FileIOEngine(ioEngineName.substring(5), capacity); + else if (ioEngineName.startsWith("offheap")) + return new ByteBufferIOEngine(capacity, true); + else if (ioEngineName.startsWith("heap")) + return new ByteBufferIOEngine(capacity, false); + else + throw new IllegalArgumentException( + "Don't understand io engine name for cache - prefix with file:, heap or offheap"); + } + + /** + * Cache the block with the specified name and buffer. 
+ * @param cacheKey block's cache key + * @param buf block buffer + */ + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + cacheBlock(cacheKey, buf, false); + } + + /** + * Cache the block with the specified name and buffer. + * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + */ + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { + cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); + } + + /** + * Cache the block to ramCache + * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + * @param wait if true, blocking wait when queue is full + */ + public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, + boolean inMemory, boolean wait) { + if (!cacheEnabled) + return; + + if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) + return; + + /* + * Stuff the entry into the RAM cache so it can get drained to the + * persistent store + */ + RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, + accessCount.incrementAndGet(), inMemory); + ramCache.put(cacheKey, re); + int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); + BlockingQueue bq = writerQueues.get(queueNum); + boolean successfulAddition = bq.offer(re); + if (!successfulAddition && wait) { + synchronized (cacheWaitSignals[queueNum]) { + try { + cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + successfulAddition = bq.offer(re); + } + if (!successfulAddition) { + ramCache.remove(cacheKey); + failedBlockAdditions.incrementAndGet(); + } else { + this.blockNumber.incrementAndGet(); + this.heapSize.addAndGet(cachedItem.heapSize()); + } + } + + /** + * Get the buffer of the block with the specified key. 
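+ * The RAM staging cache is consulted first; otherwise the block is read
+ * back from the IOEngine and deserialized while holding the per-offset
+ * lock, so it cannot be freed concurrently by eviction.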
+ * @param key block's cache key + * @param caching true if the caller caches blocks on cache misses + * @param repeat Whether this is a repeat lookup for the same block + * @return buffer of specified cache key, or null if not in cache + */ + @Override + public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) { + if (!cacheEnabled) + return null; + RAMQueueEntry re = ramCache.get(key); + if (re != null) { + cacheStats.hit(caching); + re.access(accessCount.incrementAndGet()); + return re.getData(); + } + BucketEntry bucketEntry = backingMap.get(key); + if(bucketEntry!=null) { + long start = System.nanoTime(); + IdLock.Entry lockEntry = null; + try { + lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + if (bucketEntry.equals(backingMap.get(key))) { + int len = bucketEntry.getLength(); + ByteBuffer bb = ByteBuffer.allocate(len); + ioEngine.read(bb, bucketEntry.offset()); + Cacheable cachedBlock = bucketEntry.deserializerReference( + deserialiserMap).deserialize(bb, true); + long timeTaken = System.nanoTime() - start; + cacheStats.hit(caching); + cacheStats.ioHit(timeTaken); + bucketEntry.access(accessCount.incrementAndGet()); + if (this.ioErrorStartTime > 0) { + ioErrorStartTime = -1; + } + return cachedBlock; + } + } catch (IOException ioex) { + LOG.error("Failed reading block " + key + " from bucket cache", ioex); + checkIOErrorIsTolerated(); + } finally { + if (lockEntry != null) { + offsetLock.releaseLockEntry(lockEntry); + } + } + } + if(!repeat)cacheStats.miss(caching); + return null; + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + if (!cacheEnabled) return false; + RAMQueueEntry removedBlock = ramCache.remove(cacheKey); + if (removedBlock != null) { + this.blockNumber.decrementAndGet(); + this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); + } + BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry != null) { + IdLock.Entry lockEntry = null; + try { + lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + if (bucketEntry.equals(backingMap.remove(cacheKey))) { + bucketAllocator.freeBlock(bucketEntry.offset()); + realCacheSize.addAndGet(-1 * bucketEntry.getLength()); + if (removedBlock == null) { + this.blockNumber.decrementAndGet(); + } + } else { + return false; + } + } catch (IOException ie) { + LOG.warn("Failed evicting block " + cacheKey); + return false; + } finally { + if (lockEntry != null) { + offsetLock.releaseLockEntry(lockEntry); + } + } + } + cacheStats.evicted(); + return true; + } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. 
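+ * It is scheduled at a fixed rate of statThreadPeriod (180) seconds and
+ * logStats() only emits output when DEBUG logging is enabled.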
+ */ + private static class StatisticsThread extends Thread { + BucketCache bucketCache; + + public StatisticsThread(BucketCache bucketCache) { + super("BucketCache.StatisticsThread"); + setDaemon(true); + this.bucketCache = bucketCache; + } + @Override + public void run() { + bucketCache.logStats(); + } + } + + public void logStats() { + if (!LOG.isDebugEnabled()) return; + // Log size + long totalSize = bucketAllocator.getTotalSize(); + long usedSize = bucketAllocator.getUsedSize(); + long freeSize = totalSize - usedSize; + long cacheSize = this.realCacheSize.get(); + LOG.debug("BucketCache Stats: " + + "failedBlockAdditions=" + this.failedBlockAdditions.get() + ", " + + "total=" + StringUtils.byteDesc(totalSize) + ", " + + "free=" + StringUtils.byteDesc(freeSize) + ", " + + "usedSize=" + StringUtils.byteDesc(usedSize) +", " + + "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + + "accesses=" + cacheStats.getRequestCount() + ", " + + "hits=" + cacheStats.getHitCount() + ", " + + "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + + "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + + "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : + (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + + "cachingHits=" + cacheStats.getHitCachingCount() + ", " + + "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : + (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + + "evictions=" + cacheStats.getEvictionCount() + ", " + + "evicted=" + cacheStats.getEvictedCount() + ", " + + "evictedPerRun=" + cacheStats.evictedPerEviction()); + cacheStats.reset(); + } + + private long acceptableSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR); + } + + private long minSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MIN_FACTOR); + } + + private long singleSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() + * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR); + } + + private long multiSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR + * DEFAULT_MIN_FACTOR); + } + + private long memorySize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR + * DEFAULT_MIN_FACTOR); + } + + /** + * Free the space if the used size reaches acceptableSize() or one size block + * couldn't be allocated. 
When freeing the space, we use the LRU algorithm and + * ensure there must be some blocks evicted + */ + private void freeSpace() { + // Ensure only one freeSpace progress at a time + if (!freeSpaceLock.tryLock()) return; + try { + freeInProgress = true; + long bytesToFreeWithoutExtra = 0; + /* + * Calculate free byte for each bucketSizeinfo + */ + StringBuffer msgBuffer = new StringBuffer(); + BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); + long[] bytesToFreeForBucket = new long[stats.length]; + for (int i = 0; i < stats.length; i++) { + bytesToFreeForBucket[i] = 0; + long freeGoal = (long) Math.floor(stats[i].totalCount() + * (1 - DEFAULT_MIN_FACTOR)); + freeGoal = Math.max(freeGoal, 1); + if (stats[i].freeCount() < freeGoal) { + bytesToFreeForBucket[i] = stats[i].itemSize() + * (freeGoal - stats[i].freeCount()); + bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; + msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" + + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); + } + } + msgBuffer.append("Free for total=" + + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); + + if (bytesToFreeWithoutExtra <= 0) { + return; + } + long currentSize = bucketAllocator.getUsedSize(); + long totalSize=bucketAllocator.getTotalSize(); + LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString() + + " of current used=" + StringUtils.byteDesc(currentSize) + + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get()) + + ",total=" + StringUtils.byteDesc(totalSize)); + + long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra + * (1 + DEFAULT_EXTRA_FREE_FACTOR)); + + // Instantiate priority buckets + BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, + blockSize, singleSize()); + BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, + blockSize, multiSize()); + BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, + blockSize, memorySize()); + + // Scan entire map putting bucket entry into appropriate bucket entry + // group + for (Map.Entry bucketEntryWithKey : backingMap.entrySet()) { + switch (bucketEntryWithKey.getValue().getPriority()) { + case SINGLE: { + bucketSingle.add(bucketEntryWithKey); + break; + } + case MULTI: { + bucketMulti.add(bucketEntryWithKey); + break; + } + case MEMORY: { + bucketMemory.add(bucketEntryWithKey); + break; + } + } + } + + PriorityQueue bucketQueue = new PriorityQueue(3); + + bucketQueue.add(bucketSingle); + bucketQueue.add(bucketMulti); + bucketQueue.add(bucketMemory); + + int remainingBuckets = 3; + long bytesFreed = 0; + + BucketEntryGroup bucketGroup; + while ((bucketGroup = bucketQueue.poll()) != null) { + long overflow = bucketGroup.overflow(); + if (overflow > 0) { + long bucketBytesToFree = Math.min(overflow, + (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); + bytesFreed += bucketGroup.free(bucketBytesToFree); + } + remainingBuckets--; + } + + /** + * Check whether need extra free because some bucketSizeinfo still needs + * free space + */ + stats = bucketAllocator.getIndexStatistics(); + boolean needFreeForExtra = false; + for (int i = 0; i < stats.length; i++) { + long freeGoal = (long) Math.floor(stats[i].totalCount() + * (1 - DEFAULT_MIN_FACTOR)); + freeGoal = Math.max(freeGoal, 1); + if (stats[i].freeCount() < freeGoal) { + needFreeForExtra = true; + break; + } + } + + if (needFreeForExtra) { + bucketQueue.clear(); + remainingBuckets = 2; + + bucketQueue.add(bucketSingle); + 
bucketQueue.add(bucketMulti); + + while ((bucketGroup = bucketQueue.poll()) != null) { + long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) + / remainingBuckets; + bytesFreed += bucketGroup.free(bucketBytesToFree); + remainingBuckets--; + } + } + + if (LOG.isDebugEnabled()) { + long single = bucketSingle.totalSize(); + long multi = bucketMulti.totalSize(); + long memory = bucketMemory.totalSize(); + LOG.debug("Bucket cache free space completed; " + "freed=" + + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + + StringUtils.byteDesc(totalSize) + ", " + "single=" + + StringUtils.byteDesc(single) + ", " + "multi=" + + StringUtils.byteDesc(multi) + ", " + "memory=" + + StringUtils.byteDesc(memory)); + } + + } finally { + cacheStats.evict(); + freeInProgress = false; + freeSpaceLock.unlock(); + } + } + + // This handles flushing the RAM cache to IOEngine. + private class WriterThread extends HasThread { + BlockingQueue inputQueue; + final int threadNO; + boolean writerEnabled = true; + + WriterThread(BlockingQueue queue, int threadNO) { + super(); + this.inputQueue = queue; + this.threadNO = threadNO; + setDaemon(true); + } + + // Used for test + void disableWriter() { + this.writerEnabled = false; + } + + public void run() { + List entries = new ArrayList(); + try { + while (cacheEnabled && writerEnabled) { + try { + // Blocks + entries.add(inputQueue.take()); + inputQueue.drainTo(entries); + synchronized (cacheWaitSignals[threadNO]) { + cacheWaitSignals[threadNO].notifyAll(); + } + } catch (InterruptedException ie) { + if (!cacheEnabled) break; + } + doDrain(entries); + } + } catch (Throwable t) { + LOG.warn("Failed doing drain", t); + } + LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); + } + + /** + * Flush the entries in ramCache to IOEngine and add bucket entry to + * backingMap + * @param entries + * @throws InterruptedException + */ + private void doDrain(List entries) + throws InterruptedException { + BucketEntry[] bucketEntries = new BucketEntry[entries.size()]; + RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()]; + int done = 0; + while (entries.size() > 0 && cacheEnabled) { + // Keep going in case we throw... + RAMQueueEntry ramEntry = null; + try { + ramEntry = entries.remove(entries.size() - 1); + if (ramEntry == null) { + LOG.warn("Couldn't get the entry from RAM queue, who steals it?"); + continue; + } + BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine, + bucketAllocator, deserialiserMap, realCacheSize); + ramEntries[done] = ramEntry; + bucketEntries[done++] = bucketEntry; + if (ioErrorStartTime > 0) { + ioErrorStartTime = -1; + } + } catch (BucketAllocatorException fle) { + LOG.warn("Failed allocating for block " + + (ramEntry == null ? "" : ramEntry.getKey()), fle); + } catch (CacheFullException cfe) { + if (!freeInProgress) { + freeSpace(); + } else { + Thread.sleep(50); + } + } catch (IOException ioex) { + LOG.error("Failed writing to bucket cache", ioex); + checkIOErrorIsTolerated(); + } + } + + // Make sure that the data pages we have written are on the media before + // we update the map. 
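+      // Until backingMap is updated below, reads for these keys are still
+      // served from ramCache, so on a sync failure we can free the newly
+      // allocated buckets and leave both maps untouched.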
+ try { + ioEngine.sync(); + } catch (IOException ioex) { + LOG.error("Faild syncing IO engine", ioex); + checkIOErrorIsTolerated(); + // Since we failed sync, free the blocks in bucket allocator + for (int i = 0; i < done; ++i) { + if (bucketEntries[i] != null) { + bucketAllocator.freeBlock(bucketEntries[i].offset()); + } + } + done = 0; + } + + for (int i = 0; i < done; ++i) { + if (bucketEntries[i] != null) { + backingMap.put(ramEntries[i].getKey(), bucketEntries[i]); + } + RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey()); + if (ramCacheEntry != null) { + heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize()); + } + } + + if (bucketAllocator.getUsedSize() > acceptableSize()) { + freeSpace(); + } + } + } + + + + private void persistToFile() throws IOException { + assert !cacheEnabled; + FileOutputStream fos = null; + ObjectOutputStream oos = null; + try { + if (!ioEngine.isPersistent()) + throw new IOException( + "Attempt to persist non-persistent cache mappings!"); + fos = new FileOutputStream(persistencePath, false); + oos = new ObjectOutputStream(fos); + oos.writeLong(cacheCapacity); + oos.writeUTF(ioEngine.getClass().getName()); + oos.writeUTF(backingMap.getClass().getName()); + oos.writeObject(deserialiserMap); + oos.writeObject(backingMap); + } finally { + if (oos != null) oos.close(); + if (fos != null) fos.close(); + } + } + + @SuppressWarnings("unchecked") + private void retrieveFromFile() throws IOException, BucketAllocatorException, + ClassNotFoundException { + File persistenceFile = new File(persistencePath); + if (!persistenceFile.exists()) { + return; + } + assert !cacheEnabled; + FileInputStream fis = null; + ObjectInputStream ois = null; + try { + if (!ioEngine.isPersistent()) + throw new IOException( + "Attempt to restore non-persistent cache mappings!"); + fis = new FileInputStream(persistencePath); + ois = new ObjectInputStream(fis); + long capacitySize = ois.readLong(); + if (capacitySize != cacheCapacity) + throw new IOException("Mismatched cache capacity:" + + StringUtils.byteDesc(capacitySize) + ", expected: " + + StringUtils.byteDesc(cacheCapacity)); + String ioclass = ois.readUTF(); + String mapclass = ois.readUTF(); + if (!ioEngine.getClass().getName().equals(ioclass)) + throw new IOException("Class name for IO engine mismatch: " + ioclass + + ", expected:" + ioEngine.getClass().getName()); + if (!backingMap.getClass().getName().equals(mapclass)) + throw new IOException("Class name for cache map mismatch: " + mapclass + + ", expected:" + backingMap.getClass().getName()); + UniqueIndexMap deserMap = (UniqueIndexMap) ois + .readObject(); + BucketAllocator allocator = new BucketAllocator(cacheCapacity, this, + backingMap, this.realCacheSize); + backingMap = (ConcurrentHashMap) ois + .readObject(); + bucketAllocator = allocator; + deserialiserMap = deserMap; + } finally { + if (ois != null) ois.close(); + if (fis != null) fis.close(); + if (!persistenceFile.delete()) { + throw new IOException("Failed deleting persistence file " + + persistenceFile.getAbsolutePath()); + } + } + } + + /** + * Check whether we tolerate IO error this time. 
If the duration of IOEngine + * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the + * cache + */ + private void checkIOErrorIsTolerated() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + if (this.ioErrorStartTime > 0) { + if (cacheEnabled + && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) { + LOG.error("IO errors duration time has exceeded " + + ioErrorsTolerationDuration + + "ms, disabing cache, please check your IOEngine"); + disableCache(); + } + } else { + this.ioErrorStartTime = now; + } + } + + /** + * Used to shut down the cache -or- turn it off in the case of something + * broken. + */ + private void disableCache() { + if (!cacheEnabled) + return; + cacheEnabled = false; + ioEngine.shutdown(); + this.scheduleThreadPool.shutdown(); + for (int i = 0; i < writerThreads.length; ++i) + writerThreads[i].interrupt(); + this.ramCache.clear(); + if (!ioEngine.isPersistent() || persistencePath == null) { + this.backingMap.clear(); + } + } + + private void join() throws InterruptedException { + for (int i = 0; i < writerThreads.length; ++i) + writerThreads[i].join(); + } + + @Override + public void shutdown() { + disableCache(); + LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + + "; path to write=" + persistencePath); + if (ioEngine.isPersistent() && persistencePath != null) { + try { + join(); + persistToFile(); + } catch (IOException ex) { + LOG.error("Unable to persist data on exit: " + ex.toString(), ex); + } catch (InterruptedException e) { + LOG.warn("Failed to persist data on exit", e); + } + } + } + + @Override + public CacheStats getStats() { + return cacheStats; + } + + BucketAllocator getAllocator() { + return this.bucketAllocator; + } + + public long heapSize() { + return this.heapSize.get(); + } + + /** + * Returns the total size of the block cache, in bytes. + * @return size of cache, in bytes + */ + @Override + public long size() { + return this.realCacheSize.get(); + } + + @Override + public long getFreeSize() { + return this.bucketAllocator.getFreeSize(); + } + + @Override + public long getBlockCount() { + return this.blockNumber.get(); + } + + /** + * Returns the occupied size of the block cache, in bytes. + * @return occupied space in cache, in bytes + */ + @Override + public long getCurrentSize() { + return this.bucketAllocator.getUsedSize(); + } + + @Override + public long getEvictedCount() { + return cacheStats.getEvictedCount(); + } + + /** + * Evicts all blocks for a specific HFile. This is an expensive operation + * implemented as a linear-time search through all blocks in the cache. + * Ideally this should be a search in a log-access-time map. + * + *
+
+  /**
+   * Used to shut down the cache -or- turn it off in the case of something
+   * broken.
+   */
+  private void disableCache() {
+    if (!cacheEnabled)
+      return;
+    cacheEnabled = false;
+    ioEngine.shutdown();
+    this.scheduleThreadPool.shutdown();
+    for (int i = 0; i < writerThreads.length; ++i)
+      writerThreads[i].interrupt();
+    this.ramCache.clear();
+    if (!ioEngine.isPersistent() || persistencePath == null) {
+      this.backingMap.clear();
+    }
+  }
+
+  private void join() throws InterruptedException {
+    for (int i = 0; i < writerThreads.length; ++i)
+      writerThreads[i].join();
+  }
+
+  @Override
+  public void shutdown() {
+    disableCache();
+    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
+        + "; path to write=" + persistencePath);
+    if (ioEngine.isPersistent() && persistencePath != null) {
+      try {
+        join();
+        persistToFile();
+      } catch (IOException ex) {
+        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
+      } catch (InterruptedException e) {
+        LOG.warn("Failed to persist data on exit", e);
+      }
+    }
+  }
+
+  @Override
+  public CacheStats getStats() {
+    return cacheStats;
+  }
+
+  BucketAllocator getAllocator() {
+    return this.bucketAllocator;
+  }
+
+  public long heapSize() {
+    return this.heapSize.get();
+  }
+
+  /**
+   * Returns the total size of the block cache, in bytes.
+   * @return size of cache, in bytes
+   */
+  @Override
+  public long size() {
+    return this.realCacheSize.get();
+  }
+
+  @Override
+  public long getFreeSize() {
+    return this.bucketAllocator.getFreeSize();
+  }
+
+  @Override
+  public long getBlockCount() {
+    return this.blockNumber.get();
+  }
+
+  /**
+   * Returns the occupied size of the block cache, in bytes.
+   * @return occupied space in cache, in bytes
+   */
+  @Override
+  public long getCurrentSize() {
+    return this.bucketAllocator.getUsedSize();
+  }
+
+  @Override
+  public long getEvictedCount() {
+    return cacheStats.getEvictedCount();
+  }
+
+  /**
+   * Evicts all blocks for a specific HFile. This is an expensive operation
+   * implemented as a linear-time search through all blocks in the cache.
+   * Ideally this should be a search in a log-access-time map.
+   *
+   * <p>
+   * This is used for evict-on-close to remove all blocks of a specific HFile.
+   *
+   * @return the number of blocks evicted
+   */
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    int numEvicted = 0;
+    for (BlockCacheKey key : this.backingMap.keySet()) {
+      if (key.getHfileName().equals(hfileName)) {
+        if (evictBlock(key))
+          ++numEvicted;
+      }
+    }
+    return numEvicted;
+  }
+
+  @Override
+  public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
+      Configuration conf) {
+    throw new UnsupportedOperationException();
+  }
+
+  static enum BlockPriority {
+    /**
+     * Accessed a single time (used for scan-resistance)
+     */
+    SINGLE,
+    /**
+     * Accessed multiple times
+     */
+    MULTI,
+    /**
+     * Block from in-memory store
+     */
+    MEMORY
+  };
+
+  /**
+   * Item in cache. We expect this to be where most memory goes. Java uses 8
+   * bytes just for object headers; after this, we want to use as little as
+   * possible - so we only use 8 bytes, but in order to do so we end up messing
+   * around with all this Java casting stuff. Offset is stored as 5 bytes that
+   * make up the long. Doubt we'll see devices this big for ages. Offsets are
+   * divided by 256, so 5 bytes gives us 256TB or so.
+   */
+  static class BucketEntry implements Serializable, Comparable<BucketEntry> {
+    private static final long serialVersionUID = -6741504807982257534L;
+    private int offsetBase;
+    private int length;
+    private byte offset1;
+    byte deserialiserIndex;
+    private volatile long accessTime;
+    private BlockPriority priority;
+
+    BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
+      setOffset(offset);
+      this.length = length;
+      this.accessTime = accessTime;
+      if (inMemory) {
+        this.priority = BlockPriority.MEMORY;
+      } else {
+        this.priority = BlockPriority.SINGLE;
+      }
+    }
+
+    long offset() { // Java has no unsigned numbers
+      long o = ((long) offsetBase) & 0xFFFFFFFFL;
+      o += (((long) (offset1)) & 0xFF) << 32;
+      return o << 8;
+    }
+
+    private void setOffset(long value) {
+      assert (value & 0xFF) == 0;
+      value >>= 8;
+      offsetBase = (int) value;
+      offset1 = (byte) (value >> 32);
+    }
+
+    public int getLength() {
+      return length;
+    }
+
+    protected CacheableDeserializer<Cacheable> deserializerReference(
+        UniqueIndexMap<Integer> deserialiserMap) {
+      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
+          .unmap(deserialiserIndex));
+    }
+
+    protected void setDeserialiserReference(
+        CacheableDeserializer<Cacheable> deserializer,
+        UniqueIndexMap<Integer> deserialiserMap) {
+      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
+          .getDeserialiserIdentifier()));
+    }
+
+    /**
+     * Block has been accessed. Update its local access time.
+     */
+    public void access(long accessTime) {
+      this.accessTime = accessTime;
+      if (this.priority == BlockPriority.SINGLE) {
+        this.priority = BlockPriority.MULTI;
+      }
+    }
+
+    public BlockPriority getPriority() {
+      return this.priority;
+    }
+
+    @Override
+    public int compareTo(BucketEntry that) {
+      if (this.accessTime == that.accessTime) return 0;
+      return this.accessTime < that.accessTime ? 1 : -1;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      return this == that;
+    }
+  }
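+  /*
+   * Worked example of the BucketEntry offset packing above (editorial note):
+   * offsets handed to setOffset() are 256-byte aligned (see the assert), so
+   * the low 8 bits are dropped and the remaining 40 bits are kept in
+   * offsetBase (low 32 bits) plus offset1 (high 8 bits). For offset
+   * 0x12345678900L:
+   *   value >> 8        = 0x123456789L
+   *   offsetBase (int)  = 0x23456789
+   *   offset1   (byte)  = 0x01
+   * offset() rebuilds ((0x01L << 32) | 0x23456789L) << 8 = 0x12345678900L.
+   * 40 bits of 256-byte units addresses 2^48 bytes, the "256TB or so"
+   * mentioned in the javadoc.
+   */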
+
+  /**
+   * Used to group bucket entries into priority buckets. There will be one
+   * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
+   * the eviction algorithm takes the appropriate number of elements out of
+   * each according to configuration parameters and their relative sizes.
+   */
+  private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
+
+    private CachedEntryQueue queue;
+    private long totalSize = 0;
+    private long bucketSize;
+
+    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
+      this.bucketSize = bucketSize;
+      queue = new CachedEntryQueue(bytesToFree, blockSize);
+      totalSize = 0;
+    }
+
+    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
+      totalSize += block.getValue().getLength();
+      queue.add(block);
+    }
+
+    public long free(long toFree) {
+      Map.Entry<BlockCacheKey, BucketEntry> entry;
+      long freedBytes = 0;
+      while ((entry = queue.pollLast()) != null) {
+        evictBlock(entry.getKey());
+        freedBytes += entry.getValue().getLength();
+        if (freedBytes >= toFree) {
+          return freedBytes;
+        }
+      }
+      return freedBytes;
+    }
+
+    public long overflow() {
+      return totalSize - bucketSize;
+    }
+
+    public long totalSize() {
+      return totalSize;
+    }
+
+    @Override
+    public int compareTo(BucketEntryGroup that) {
+      if (this.overflow() == that.overflow())
+        return 0;
+      return this.overflow() > that.overflow() ? 1 : -1;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      return this == that;
+    }
+
+  }
+
+  /**
+   * Block entry stored in memory, holding the key, the data, and so on.
+   */
+  private static class RAMQueueEntry {
+    private BlockCacheKey key;
+    private Cacheable data;
+    private long accessTime;
+    private boolean inMemory;
+
+    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime,
+        boolean inMemory) {
+      this.key = bck;
+      this.data = data;
+      this.accessTime = accessTime;
+      this.inMemory = inMemory;
+    }
+
+    public Cacheable getData() {
+      return data;
+    }
+
+    public BlockCacheKey getKey() {
+      return key;
+    }
+
+    public void access(long accessTime) {
+      this.accessTime = accessTime;
+    }
+
+    public BucketEntry writeToCache(final IOEngine ioEngine,
+        final BucketAllocator bucketAllocator,
+        final UniqueIndexMap<Integer> deserialiserMap,
+        final AtomicLong realCacheSize) throws CacheFullException, IOException,
+        BucketAllocatorException {
+      int len = data.getSerializedLength();
+      // This cacheable thing can't be serialized...
+      if (len == 0) return null;
+      long offset = bucketAllocator.allocateBlock(len);
+      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
+          inMemory);
+      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
+      try {
+        if (data instanceof HFileBlock) {
+          ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
+          sliceBuf.rewind();
+          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
+          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
+          ioEngine.write(sliceBuf, offset);
+          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
+        } else {
+          ByteBuffer bb = ByteBuffer.allocate(len);
+          data.serialize(bb);
+          ioEngine.write(bb, offset);
+        }
+      } catch (IOException ioe) {
+        // free it in bucket allocator
+        bucketAllocator.freeBlock(offset);
+        throw ioe;
+      }
+
+      realCacheSize.addAndGet(len);
+      return bucketEntry;
+    }
+  }
+
+  /**
+   * Only used in tests.
+   * @throws InterruptedException
+   */
+  void stopWriterThreads() throws InterruptedException {
+    for (WriterThread writerThread : writerThreads) {
+      writerThread.disableWriter();
+      writerThread.interrupt();
+      writerThread.join();
+    }
+  }
+
+}
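+/*
+ * Editorial sketch of how the BucketEntryGroup machinery above is meant to be
+ * driven. This is hedged: the actual freeSpace() implementation lives elsewhere
+ * in this class and is not part of this excerpt, so its exact accounting may
+ * differ. The idea implied by the javadoc and by free()/overflow() is roughly:
+ *
+ *   // one group per priority, each sized to its configured share of the cache
+ *   BucketEntryGroup single = ..., multi = ..., memory = ...;
+ *   for (Map.Entry<BlockCacheKey, BucketEntry> e : backingMap.entrySet()) {
+ *     switch (e.getValue().getPriority()) {
+ *       case SINGLE: single.add(e); break;
+ *       case MULTI:  multi.add(e);  break;
+ *       case MEMORY: memory.add(e); break;
+ *     }
+ *   }
+ *   // then free from the groups that exceed their share the most, using
+ *   // overflow() to decide how many bytes each group should give back
+ */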