### Eclipse Workspace Patch 1.0 #P apache-trunk Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (revision 1424296) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (working copy) @@ -663,6 +663,11 @@ @Override public void serialize(ByteBuffer destination) { } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (working copy) @@ -171,6 +171,22 @@ windowIndex = (windowIndex + 1) % numPeriodsInWindow; } + public long getSumHitCountsPastNPeriods() { + return sum(hitCounts); + } + + public long getSumRequestCountsPastNPeriods() { + return sum(requestCounts); + } + + public long getSumHitCachingCountsPastNPeriods() { + return sum(hitCachingCounts); + } + + public long getSumRequestCachingCountsPastNPeriods() { + return sum(requestCachingCounts); + } + public double getHitRatioPastNPeriods() { double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts)); return Double.isNaN(ratio) ? 0 : ratio; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (revision 1424296) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (working copy) @@ -242,6 +242,29 @@ private static class ByteArrayCacheable implements Cacheable { + static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { + + @Override + public Cacheable deserialize(ByteBuffer b) throws IOException { + int len = b.getInt(); + Thread.yield(); + byte buf[] = new byte[len]; + b.get(buf); + return new ByteArrayCacheable(buf); + } + + @Override + public int getDeserialiserIdentifier() { + return 0; + } + + @Override + public Cacheable deserialize(ByteBuffer b, boolean reuse) + throws IOException { + return deserialize(b); + } + }; + final byte[] buf; public ByteArrayCacheable(byte[] buf) { @@ -268,20 +291,20 @@ @Override public CacheableDeserializer getDeserializer() { - return new CacheableDeserializer() { + return blockDeserializer; + } - @Override - public Cacheable deserialize(ByteBuffer b) throws IOException { - int len = b.getInt(); - Thread.yield(); - byte buf[] = new byte[len]; - b.get(buf); - return new ByteArrayCacheable(buf); - } - }; + static { /* So we can get the deserializer reference back later on */ + CacheableDeserializerFactory.registerDeserializer(blockDeserializer, 0); } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } } + private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (revision 0) +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java (revision 0) @@ -0,0 +1,56 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.io.hfile.CacheStats; + +/** + * Class that implements cache metrics for bucket cache. + */ +public class BucketCacheStats extends CacheStats { + private final AtomicLong ioHitCount = new AtomicLong(0); + private final AtomicLong ioHitTime = new AtomicLong(0); + private final static int nanoTime = 1000000; + private long lastLogTime = System.currentTimeMillis(); + + public void ioHit(long time) { + ioHitCount.incrementAndGet(); + ioHitTime.addAndGet(time); + } + + public long getIOHitPerSecond() { + long now = System.currentTimeMillis(); + long took = (now - lastLogTime) / 1000; + lastLogTime = now; + return ioHitCount.get() / took; + } + + public double getAvgIOTime() { + long time = ioHitTime.get() / nanoTime; + long count = ioHitCount.get(); + return ((float) time / (float) count); + } + + public void reset() { + ioHitCount.set(0); + ioHitTime.set(0); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java (revision 0) @@ -0,0 +1,83 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * IO engine that stores data on the memory using an array of byte buffer + * {@link ByteBufferArray} + */ +public class ByteBufferIOEngine implements IOEngine { + + private ByteBufferArray bufferArray; + + public ByteBufferIOEngine(long capacitySize, boolean direct) + throws IOException { + bufferArray = new ByteBufferArray(capacitySize, direct); + } + + /** + * Memory IO engine is always not able to support persistent storage for the + * cache + * @return false + */ + public boolean isPersistent() { + return false; + } + + /** + * Transfers data from the buffer array to the given byte buffer + * @param dstBuffer the given byte buffer into which bytes are to be written + * @param offset The offset in the buffer array of the first byte to be read + * @throws IOException + */ + public void read(ByteBuffer dstBuffer, long offset) throws IOException { + assert dstBuffer.hasArray(); + bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(), + dstBuffer.arrayOffset()); + } + + /** + * Transfers data from the given byte buffer to the buffer array + * @param srcBuffer the given byte buffer from which bytes are to be read + * @param offset The offset in the buffer array of the first byte to be written + * @throws IOException + */ + public void write(ByteBuffer srcBuffer, long offset) throws IOException { + assert srcBuffer.hasArray(); + bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(), + srcBuffer.arrayOffset()); + } + + /** + * No operation for the sync in the memory IO engine + */ + public void sync() { + + } + + /** + * No operation for the shutdown in the memory IO engine + */ + public void shutdown() { + + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (revision 0) @@ -0,0 +1,30 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +/** + * Thrown by {@link BucketAllocator} + */ +public class BucketAllocatorException extends Exception { + private static final long serialVersionUID = 2479119906660788096L; + + BucketAllocatorException(String reason) { + super(reason); + } +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java (revision 0) @@ -0,0 +1,61 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A class that implements the IOEngine interface provides the underlying data storage for + * {@link BucketCache}.
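+ * <p>
+ * Implementations in this patch include {@link ByteBufferIOEngine}, which keeps data in
+ * on-heap or direct byte buffers, and {@link FileIOEngine}, which stores data in a file
+ * (for example on SSD or other flash storage).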
+ */ +public interface IOEngine { + + /** + * @return true if support persistent storage for the cache when shutdown + */ + boolean isPersistent(); + + /** + * Transfers data from IOEngine to the given byte buffer + * @param dstBuffer the given byte buffer into which bytes are to be written + * @param offset The offset in the IO engine of the first byte to be read + * @throws IOException + */ + void read(ByteBuffer dstBuffer, long offset) throws IOException; + + /** + * Transfers data from the given byte buffer to IOEngine + * @param srcBuffer the given byte buffer from which bytes are to be read + * @param offset The offset in the IO engine of the first byte to be written + * @throws IOException + */ + void write(ByteBuffer srcBuffer, long offset) throws IOException; + + /** + * Sync the data to IOEngine after writing + * @throws IOException + */ + void sync() throws IOException; + + /** + * Shutdown the IOEngine + */ + void shutdown(); +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (revision 1424296) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (working copy) @@ -134,6 +134,11 @@ // TODO Auto-generated method stub return null; } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } }, accessTime, false); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferArray.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferArray.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferArray.java (revision 0) @@ -0,0 +1,105 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.bucket.ByteBufferArray; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +@Category(SmallTests.class) +public class TestByteBufferArray { + + @Test + public void testDirectArray() { + int size = 32 * 1024 * 1024; // 32 MB + ByteBufferArray bba = new ByteBufferArray(size, false); + for (int i = 0; i < 10; ++i) { + if (i % 5 == 0) { // single get at start + int len = 1; + byte val = (byte) (Math.random() * 255); + byte[] arr = new byte[len]; + for (int j = 0; j < arr.length; ++j) { + arr[j] = val; + } + byte[] arr2 = new byte[len]; + bba.putMultiple(0, len, arr); + bba.getMultiple(0, len, arr2); + for (int j = 0; j < arr.length; ++j) { + assertTrue(arr[j] == arr2[j]); + } + } else if (i % 5 == 1) { // single get at end + int len = 1; + byte val = (byte) (Math.random() * 255); + byte[] arr = new byte[len]; + for (int j = 0; j < arr.length; ++j) { + arr[j] = val; + } + byte[] arr2 = new byte[len]; + bba.putMultiple(size - len, len, arr); + bba.getMultiple(size - len, len, arr2); + for (int j = 0; j < arr.length; ++j) { + assertTrue(arr[j] == arr2[j]); + } + } else if (i % 5 == 2) { // multi get at start + int len = (int) (Math.random() * size); + byte val = (byte) (Math.random() * 255); + byte[] arr = new byte[len]; + for (int j = 0; j < arr.length; ++j) { + arr[j] = val; + } + byte[] arr2 = new byte[len]; + bba.putMultiple(0, len, arr); + bba.getMultiple(0, len, arr2); + for (int j = 0; j < arr.length; ++j) { + assertTrue(arr[j] == arr2[j]); + } + } else if (i % 5 == 3) { // multi get at end + int len = (int) (Math.random() * size); + byte val = (byte) (Math.random() * 255); + byte[] arr = new byte[len]; + for (int j = 0; j < arr.length; ++j) { + arr[j] = val; + } + byte[] arr2 = new byte[len]; + bba.putMultiple(size - len, len, arr); + bba.getMultiple(size - len, len, arr2); + for (int j = 0; j < arr.length; ++j) { + assertTrue(arr[j] == arr2[j]); + } + } else if (i % 5 == 4) { // random multi get - this one breaks stuff + int len = (int) (Math.random() * size); + int start = (int) (Math.random() * (size - len)); + byte val = (byte) (Math.random() * 255); + byte[] arr = new byte[len]; + for (int j = 0; j < arr.length; ++j) { + arr[j] = val; + } + byte[] arr2 = new byte[len]; + bba.putMultiple(start, len, arr); + bba.getMultiple(start, len, arr2); + for (int j = 0; j < arr.length; ++j) { + assertTrue(arr[j] == arr2[j]); + } + } + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (working copy) @@ -56,4 +56,9 @@ */ public CacheableDeserializer getDeserializer(); + /** + * @return the block type of this cached HFile block + */ + public BlockType getBlockType(); + } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java (revision 0) @@ -0,0 +1,49 @@ +/** + * Copyright The Apache Software Foundation + * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +/** + * Thrown by {@link BucketAllocator#allocateBlock(int)} when cache is full for + * the requested size + */ +public class CacheFullException extends Exception { + private static final long serialVersionUID = 3265127301824638920L; + private int requestedSize, bucketIndex; + + CacheFullException(int requestedSize, int bucketIndex) { + super(); + this.requestedSize = requestedSize; + this.bucketIndex = bucketIndex; + } + + public int bucketIndex() { + return bucketIndex; + } + + public int requestedSize() { + return requestedSize; + } + + public String info() { + StringBuilder sb = new StringBuilder(1024); + sb.append("Allocator requested size ").append(requestedSize); + sb.append(" for bucket ").append(bucketIndex); + return sb.toString(); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (working copy) @@ -17,6 +17,12 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; @@ -27,7 +33,7 @@ * Cache Key for use with implementations of {@link BlockCache} */ @InterfaceAudience.Private -public class BlockCacheKey implements HeapSize { +public class BlockCacheKey implements HeapSize, java.io.Serializable { private final String hfileName; private final long offset; private final DataBlockEncoding encoding; @@ -83,7 +89,8 @@ @Override public long heapSize() { return ClassSize.align(ClassSize.OBJECT + 2 * hfileName.length() + - Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE); + Bytes.SIZEOF_LONG + 3 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + + ClassSize.ARRAY); } // can't avoid this unfortunately @@ -97,4 +104,80 @@ public DataBlockEncoding getDataBlockEncoding() { return encoding; } + +//////////////////////////////////////////////////////////////////////////////// + // Stuff for high performance repeated getting of hashes and digests + // This can all go away if we mod this thing to store the underlying binary data + // behind the filename instead... 
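+  // A bounded pool of reusable SHA-256 MessageDigest instances (DIGEST_CACHE_SIZE)
+  // backs getDigest()/releaseDigest() below, so repeated hashing of keys avoids the
+  // cost of MessageDigest.getInstance() on every call.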
+ //////////////////////////////////////////////////////////////////////////////// + static final Log LOG = LogFactory.getLog(BlockCacheKey.class); + static final int DIGEST_CACHE_SIZE=512; + static private ArrayBlockingQueueMDigests=new + ArrayBlockingQueue(DIGEST_CACHE_SIZE); + private byte[]mBytes; + private int mHashCode=-1; + private void reduce() { + java.security.MessageDigest md=null; + byte[]buffer=new byte[hfileName.length()+10]; + int i=0; + for(;i>8); + buffer[i++]=(byte)(offset>>16); + buffer[i++]=(byte)(offset>>24); + buffer[i++]=(byte)(offset>>32); + buffer[i++]=(byte)(offset>>40); + buffer[i++]=(byte)(offset>>48); + buffer[i++]=(byte)(offset>>56); + buffer[i++]=(byte)encoding.getId(); + buffer[i++]=(byte)((encoding.getId()>>8)); + try { + md=getDigest(); + mBytes=md.digest(buffer); // also resets the digest per javadoc + mHashCode=mBytes[31]; + int mul=1; + for(i=0;i<32;++i,mul*=31) + mHashCode+=(mBytes[i]+30)*mul; + } finally { + if(md!=null) + releaseDigest(md); + } + } + + public int fcHashCode() { + if(mHashCode==-1) + reduce(); + return mHashCode&0x7FFFFFFF; + } + + public int fcHashCode(int modulo) { + return fcHashCode()%modulo; + } + + public static final int fcKeyBytesLength() { + MessageDigest dg=null; + try { + dg=getDigest(); + return dg.getDigestLength(); + } finally { + releaseDigest(dg); + } + } + + static private MessageDigest getDigest() { + MessageDigest dg = MDigests.poll(); + if (dg == null) { + try { + dg = java.security.MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException ex) { + LOG.fatal("Can't find necessary message digest... Should never happen"); + } + } + return dg; + } + + static private boolean releaseDigest(MessageDigest dg) { + return MDigests.offer(dg); + } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (revision 0) @@ -0,0 +1,152 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestBucketCache { + static final Log LOG = LogFactory.getLog(TestBucketCache.class); + BucketCache cache; + final int CACHE_SIZE = 1000000; + final int NUM_BLOCKS = 100; + final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; + final int NUM_THREADS = 100; + final int NUM_QUERIES = 10000; + + final long capacitySize = 32 * 1024 * 1024; + final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; + final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; + String ioEngineName = "heap:/"; + String persistencePath = null; + + public class MockedBucketCache extends BucketCache { + + public MockedBucketCache(String ioEngineName, long capacity, + int writerThreads, + int writerQLen, String persistencePath) throws FileNotFoundException, + IOException { + super(ioEngineName, capacity, writerThreads, writerQLen, persistencePath); + super.WAIT_WHEN_CACHE = true; + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, + boolean inMemory) { + if (super.getBlock(cacheKey, true, false) != null) { + throw new RuntimeException("Cached an already cached block"); + } + super.cacheBlock(cacheKey, buf, inMemory); + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + if (super.getBlock(cacheKey, true, false) != null) { + throw new RuntimeException("Cached an already cached block"); + } + super.cacheBlock(cacheKey, buf); + } + } + + @Before + public void setup() throws FileNotFoundException, IOException { + cache = new MockedBucketCache(ioEngineName, capacitySize, writeThreads, + writerQLen, persistencePath); + } + + @After + public void tearDown() { + cache.shutdown(); + } + + @Test + public void testBucketAllocator() throws BucketAllocatorException { + BucketAllocator mAllocator = cache.getAllocator(); + /* + * Test the allocator first + */ + int[] blockSizes = new int[2]; + blockSizes[0] = 4 * 1024; + blockSizes[1] = 8 * 1024; + boolean full = false; + int i = 0; + ArrayList allocations = new ArrayList(); + // Fill the allocated extents + while (!full) { + try { + allocations.add(new Long(mAllocator.allocateBlock(blockSizes[i + % blockSizes.length]))); + ++i; + } catch (CacheFullException cfe) { + full = true; + } + } + + for (i = 0; i < blockSizes.length; i++) { + BucketSizeInfo bucketSizeInfo = mAllocator + .roundUpToBucketSizeInfo(blockSizes[0]); + IndexStatistics indexStatistics = bucketSizeInfo.statistics(); + assertTrue(indexStatistics.freeCount() == 0); + } + + for (long offset : allocations) { + assertTrue(mAllocator.sizeOfAllocation(offset) == mAllocator + .freeBlock(offset)); + } + assertTrue(mAllocator.getUsedSize() == 0); + 
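+    // For debugging, per-size-class usage can also be dumped at this point, e.g.
+    // mAllocator.logStatistics() logs free/used counts for every bucket size.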
} + + @Test + public void testCacheSimple() throws Exception { + CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); + } + + @Test + public void testCacheMultiThreadedSingleKey() throws Exception { + CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); + } + + @Test + public void testHeapSizeChanges() { + for (WriterThread writerThread : cache.writerThreads) { + writerThread.disableWriter(); + } + CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); + } + +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java (revision 0) @@ -0,0 +1,62 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestFileIOEngine { + @Test + public void testFlashIOEngine() throws IOException { + int size = 2 * 1024 * 1024; // 2 MB + String filePath = "testFlashIOEngine"; + try { + FileIOEngine fileIOEngine = new FileIOEngine(filePath, size); + for (int i = 0; i < 50; i++) { + int len = (int) Math.floor(Math.random() * 100); + long offset = (long) Math.floor(Math.random() * size % (size - len)); + byte[] data1 = new byte[len]; + for (int j = 0; j < data1.length; ++j) { + data1[j] = (byte) (Math.random() * 255); + } + byte[] data2 = new byte[len]; + fileIOEngine.write(ByteBuffer.wrap(data1), offset); + fileIOEngine.read(ByteBuffer.wrap(data2), offset); + for (int j = 0; j < data1.length; ++j) { + assertTrue(data1[j] == data2[j]); + } + } + } finally { + File file = new File(filePath); + if (file.exists()) { + file.delete(); + } + } + + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerFactory.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerFactory.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerFactory.java (revision 0) @@ -0,0 +1,37 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.util.HashMap; +import java.util.Map; + +public class CacheableDeserializerFactory { + static private Map<Integer, CacheableDeserializer<Cacheable>> factories = new HashMap<Integer, CacheableDeserializer<Cacheable>>(); + + public static void registerDeserializer(CacheableDeserializer<Cacheable> cd, + int idx) { + synchronized (factories) { + factories.put(idx, cd); + } + } + + public static CacheableDeserializer<Cacheable> getDeserializer(int idx) { + return factories.get(idx); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (working copy) @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; @@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.io.hfile.bucket.CombinedBlockCache; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.DirectMemoryUtils; import org.apache.hadoop.util.StringUtils; @@ -72,6 +75,24 @@ public static final String EVICT_BLOCKS_ON_CLOSE_KEY = "hbase.rs.evictblocksonclose"; + /** + * Configuration keys for Bucket cache + */ + public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine"; + public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; + public static final String BUCKET_CACHE_COMBINED_KEY = "hbase.bucketcache.combinedcache"; + public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = "hbase.bucketcache.combinedcache.percentage"; + public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength"; + /** + * Defaults for Bucket cache + */ + public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true; + public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3; + public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64; + public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; @@ -350,8 +371,46 @@ LOG.info("Allocating LruBlockCache with maximum size " + StringUtils.humanReadableInt(cacheSize)); if (offHeapCacheSize <= 0) { - globalBlockCache = new
LruBlockCache(cacheSize, - StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf); + String bucketCacheIOEngineName = conf + .get(BUCKET_CACHE_IOENGINE_KEY, null); + float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); + long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax() + * bucketCachePercentage : bucketCachePercentage * 1024 * 1024); + + boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, + DEFAULT_BUCKET_CACHE_COMBINED); + BucketCache bucketCache = null; + if (bucketCacheIOEngineName != null && bucketCacheSize > 0) { + int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY, + DEFAULT_BUCKET_CACHE_WRITER_THREADS); + int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY, + DEFAULT_BUCKET_CACHE_WRITER_QUEUE); + String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY); + float combinedPercentage = conf.getFloat( + BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, + DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE); + if (combinedWithLru) { + cacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize); + bucketCacheSize = (long) (combinedPercentage * bucketCacheSize); + } + try { + bucketCache = new BucketCache(bucketCacheIOEngineName, + bucketCacheSize, writerThreads, writerQueueLen, persistentPath); + } catch (IOException ioex) { + LOG.error("Can't instantiate bucket cache", ioex); + throw new IllegalArgumentException(ioex); + } + } + LOG.info("Allocating LruBlockCache with maximum size " + + StringUtils.humanReadableInt(cacheSize)); + LruBlockCache lbu = new LruBlockCache(cacheSize, + StoreFile.DEFAULT_BLOCKSIZE_SMALL); + lbu.setVictimCache(bucketCache); + if (bucketCache != null && combinedWithLru) { + globalBlockCache = new CombinedBlockCache(lbu, bucketCache); + } else { + globalBlockCache = lbu; + } } else { globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java (revision 0) @@ -0,0 +1,120 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + + +import java.util.Comparator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; + +import com.google.common.collect.MinMaxPriorityQueue; + +/** + * A memory-bound queue that will grow until an element brings total size >= + * maxSize. From then on, only entries that are sorted larger than the smallest + * current entry will be inserted/replaced. + * + *
+ * <p>
+ * Use this when you want to find the largest elements (according to their + * ordering, not their heap size) that consume as close to the specified maxSize + * as possible. Default behavior is to grow just above rather than just below + * specified max. + */ +public class CachedEntryQueue { + + private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue; + + private long cacheSize; + private long maxSize; + + /** + * @param maxSize the target size of elements in the queue + * @param blockSize expected average size of blocks + */ + public CachedEntryQueue(long maxSize, long blockSize) { + int initialSize = (int) (maxSize / blockSize); + if (initialSize == 0) + initialSize++; + queue = MinMaxPriorityQueue + .orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() { + public int compare(Entry<BlockCacheKey, BucketEntry> entry1, + Entry<BlockCacheKey, BucketEntry> entry2) { + return entry1.getValue().compareTo(entry2.getValue()); + } + + }).expectedSize(initialSize).create(); + cacheSize = 0; + this.maxSize = maxSize; + } + + /** + * Attempt to add the specified flash entry to this queue. + * + *
+ * <p>
+ * If the queue is smaller than the max size, or if the specified element is + * ordered before the smallest element in the queue, the element will be added + * to the queue. Otherwise, there is no side effect of this call. + * @param cb block to try to add to the queue + */ + public void add(Map.Entry entry) { + if (cacheSize < maxSize) { + queue.add(entry); + cacheSize += entry.getValue().lengthBytes(); + } else { + BucketEntry head = queue.peek().getValue(); + if (entry.getValue().compareTo(head) > 0) { + cacheSize += entry.getValue().lengthBytes(); + cacheSize -= head.lengthBytes(); + if (cacheSize > maxSize) { + queue.poll(); + } else { + cacheSize += head.lengthBytes(); + } + queue.add(entry); + } + } + } + + /** + * @return The next element in this queue, or {@code null} if the queue is + * empty. + */ + public Map.Entry poll() { + return queue.poll(); + } + + /** + * @return The last element in this queue, or {@code null} if the queue is + * empty. + */ + public Map.Entry pollLast() { + return queue.pollLast(); + } + + /** + * Total size of all elements in this queue. + * @return size of all elements currently in queue, in bytes + */ + public long cacheSize() { + return cacheSize; + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java (revision 0) @@ -0,0 +1,105 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * IO engine that stores data to a file on the file system + */ +public class FileIOEngine implements IOEngine { + static final Log LOG = LogFactory.getLog(FileIOEngine.class); + + private FileChannel fileChannel = null; + + public FileIOEngine(String filePath, long fileSize) throws IOException { + RandomAccessFile raf = null; + try { + raf = new RandomAccessFile(filePath, "rw"); + raf.setLength(fileSize); + fileChannel = raf.getChannel(); + LOG.info("FlashIO: Allocating " + StringUtils.byteDesc(fileSize) + + ", on the path:" + filePath); + } catch (java.io.FileNotFoundException fex) { + LOG.error("Can't create Flash cache file; flash cache disabled: " + + filePath, fex); + throw fex; + } catch (IOException ioex) { + LOG.error("Can't extend Flash cache file; insufficient space for " + + fileSize + " bytes?", ioex); + if (raf != null) + raf.close(); + throw ioex; + } + } + + /** + * File IO engine is always able to support persistent storage for the cache + * @return true + */ + public boolean isPersistent() { + return true; + } + + /** + * Transfers data from file to the given byte buffer + * @param dstBuffer the given byte buffer into which bytes are to be written + * @param offset The offset in the file of the first byte to be read + * @throws IOException + */ + public void read(ByteBuffer dstBuffer, long offset) throws IOException { + fileChannel.read(dstBuffer, offset); + } + + /** + * Transfers data from the given byte buffer to file + * @param srcBuffer the given byte buffer from which bytes are to be read + * @param offset The offset in the file of the first byte to be written + * @throws IOException + */ + public void write(ByteBuffer bb, long offset) throws IOException { + fileChannel.write(bb, offset); + } + + /** + * Sync the data to file after writing + * @throws IOException + */ + public void sync() throws IOException { + fileChannel.force(true); + } + + /** + * Close the file + */ + public void shutdown() { + try { + fileChannel.close(); + } catch (IOException ex) { + LOG.error("Can't shutdown cleanly", ex); + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy) @@ -44,6 +44,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; @@ -173,6 +175,9 @@ /** Overhead of the structure itself */ private long overhead; + /** Where to send victims (blocks evicted from the cache) */ + private BucketCache victimHandler = null; + /** * Default constructor. Specify maximum size and expected average block * size (approximation is fine). 
@@ -342,6 +347,8 @@ CachedBlock cb = map.get(cacheKey); if(cb == null) { if (!repeat) stats.miss(caching); + if (victimHandler != null) + return victimHandler.getBlock(cacheKey, caching, repeat); return null; } stats.hit(caching); @@ -349,12 +356,21 @@ return cb.getBuffer(); } + /** + * Whether the cache contains block with specified cacheKey + * @param cacheKey + * @return + */ + public boolean containBlock(BlockCacheKey cacheKey) { + return map.containsKey(cacheKey); + } + @Override public boolean evictBlock(BlockCacheKey cacheKey) { CachedBlock cb = map.get(cacheKey); if (cb == null) return false; - evictBlock(cb); + evictBlock(cb, false); return true; } @@ -377,14 +393,23 @@ ++numEvicted; } } + if (victimHandler != null) { + numEvicted += victimHandler.evictBlocksByHfileName(hfileName); + } return numEvicted; } - protected long evictBlock(CachedBlock block) { + protected long evictBlock(CachedBlock block, boolean cachedByVictimHandler) { map.remove(block.getCacheKey()); updateSizeMetrics(block, true); elements.decrementAndGet(); stats.evicted(); + if (cachedByVictimHandler && victimHandler != null) { + boolean wait = getCurrentSize() < acceptableSize(); + boolean inMemory = block.getPriority() == BlockPriority.MEMORY; + victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(), + inMemory, wait); + } return block.heapSize(); } @@ -512,7 +537,7 @@ CachedBlock cb; long freedBytes = 0; while ((cb = queue.pollLast()) != null) { - freedBytes += evictBlock(cb); + freedBytes += evictBlock(cb, true); if (freedBytes >= toFree) { return freedBytes; } @@ -762,6 +787,8 @@ } public void shutdown() { + if (victimHandler != null) + victimHandler.shutdown(); this.scheduleThreadPool.shutdown(); for (int i = 0; i < 10; i++) { if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10); @@ -812,4 +839,9 @@ return counts; } + public void setVictimCache(BucketCache handler) { + assert victimHandler == null; + victimHandler = handler; + } + } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (working copy) @@ -34,4 +34,20 @@ * @return T the deserialized object. */ public T deserialize(ByteBuffer b) throws IOException; + + /** + * + * @param b + * @param reuse true if Cacheable object can use the given buffer as its + * content + * @return + * @throws IOException + */ + public T deserialize(ByteBuffer b, boolean reuse) throws IOException; + + /** + * Get the identifier of this deserialiser + * @return + */ + public int getDeserialiserIdentifier(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (revision 0) @@ -0,0 +1,527 @@ +/** + * Copyright 2012 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; + +/** + * This class is used to generate offset when allocating a block, and free the + * block when evicting. It manages an array of {@link Bucket}, each bucket is + * specified a size and caches elements up to this size. For completely empty + * bucket, this size could be re-specified dynamically. + */ +public final class BucketAllocator { + static final Log LOG = LogFactory.getLog(BucketCache.class); + + final private static class Bucket { + private long mBaseOffset; + private int mSize, mSizeIndex; + private int mItemCount; + private int mFreeList[]; + private int mFreeCount, mUsedCount; + + public Bucket(long offset) { + mBaseOffset = offset; + mSizeIndex = -1; + } + + void reconfigure(int sizeIndex) { + mSizeIndex = sizeIndex; + mSize = BUCKET_SIZES[mSizeIndex]; + mItemCount = (int) (((long) BUCKET_SIZE) / (long) mSize); + mFreeCount = mItemCount; + mUsedCount = 0; + mFreeList = new int[mItemCount]; + for (int i = 0; i < mFreeCount; ++i) + mFreeList[i] = i; + } + + public boolean isUninstantiated() { + return mSizeIndex == -1; + } + + public int sizeIndex() { + return mSizeIndex; + } + + public int size() { + return mSize; + } + + public boolean isFree() { + return mFreeCount > 0; + } + + public boolean isCompletelyFree() { + return mUsedCount == 0; + } + + public int freeCount() { + return mFreeCount; + } + + public int usedCount() { + return mUsedCount; + } + + public int freeBytes() { + return mFreeCount * mSize; + } + + public int usedBytes() { + return mUsedCount * mSize; + } + + public long baseOffset() { + return mBaseOffset; + } + + public long allocate() { + assert mFreeCount > 0; // Else should not have been called + assert mSizeIndex != -1; + ++mUsedCount; + long offset = mBaseOffset + (mFreeList[--mFreeCount] * mSize); + if (offset < 0) { + LOG.info("Failed allocate,mBaseOffset=" + mBaseOffset + ",mFreeCount=" + + mFreeCount + ",mFreeList=" + mFreeList[mFreeCount] + ",mSize=" + + mSize); + } + return offset; + } + + public void addAllocation(long offset) throws BucketAllocatorException { + offset -= mBaseOffset; + if (offset < 0 || offset % mSize != 0) + throw new BucketAllocatorException( + "Attempt to add allocation for bad offset: " + offset + " base=" + + mBaseOffset); + int idx = (int) (offset / mSize); + boolean copyDown = false; + for (int i = 0; i < mFreeList.length; ++i) { + if (copyDown) + mFreeList[i - 1] = mFreeList[i]; + else if (mFreeList[i] == idx) + copyDown = true; + } + if (!copyDown) + throw new BucketAllocatorException("Couldn't find match for index " + + idx + " in free list"); + 
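+      // The slot was found in the free list; the copy-down above shifted the
+      // remaining entries over it, so just account for the allocation here.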
++mUsedCount; + --mFreeCount; + } + + private void free(long offset) { + offset -= mBaseOffset; + assert offset >= 0; + assert offset < mItemCount * mSize; + assert offset % mSize == 0; + assert mUsedCount > 0; + assert mFreeCount < mItemCount; // Else duplicate free + int item = (int) (offset / (long) mSize); + assert !freeListContains(item); + --mUsedCount; + mFreeList[mFreeCount++] = item; + } + + private boolean freeListContains(int blockNo) { + for (int i = 0; i < mFreeCount; ++i) + assert mFreeList[i] != blockNo; + return false; + } + } + + final class BucketSizeInfo { + private List mBuckets, mFreeBuckets, mCompletelyFreeBuckets; + private int mSizeIndex; + + BucketSizeInfo(int sizeIndex) { + mBuckets = new ArrayList(); + mFreeBuckets = new ArrayList(); + mCompletelyFreeBuckets = new ArrayList(); + mSizeIndex = sizeIndex; + } + + public void instantiateBucket(Bucket b) { + assert b.isUninstantiated() || b.isCompletelyFree(); + b.reconfigure(mSizeIndex); + mBuckets.add(b); + mFreeBuckets.add(b); + mCompletelyFreeBuckets.add(b); + } + + public int sizeIndex() { + return mSizeIndex; + } + + public long allocateBlock() { + Bucket b = null; + if (mFreeBuckets.size() > 0) // Use up an existing one first... + b = mFreeBuckets.get(mFreeBuckets.size() - 1); + if (b == null) { + b = grabGlobalCompletelyFreeBucket(); + if (b != null) + instantiateBucket(b); + } + if (b == null) + return -1; + long result = b.allocate(); + blockAllocated(b); + return result; + } + + void blockAllocated(Bucket b) { + if (!b.isCompletelyFree()) + mCompletelyFreeBuckets.remove(b); + if (!b.isFree()) + mFreeBuckets.remove(b); + } + + public Bucket findAndRemoveCompletelyFreeBucket() { + Bucket b = null; + assert mBuckets.size() > 0; + if (mBuckets.size() == 1) // So we never get complete starvation of a + // bucket for a size + return null; + if (mCompletelyFreeBuckets.size() > 0) { + b = mCompletelyFreeBuckets.get(0); + removeBucket(b); + } + return b; + } + + private void removeBucket(Bucket b) { + assert b.isCompletelyFree(); + mBuckets.remove(b); + mFreeBuckets.remove(b); + mCompletelyFreeBuckets.remove(b); + } + + public void freeBlock(Bucket b, long offset) { + assert mBuckets.contains(b); + assert (!mCompletelyFreeBuckets.contains(b)); // else we shouldn't have + // anything to free... 
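+      // Free the slot, then relink the bucket onto the free list (and onto the
+      // completely-free list if it has emptied out entirely).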
+ b.free(offset); + if (!mFreeBuckets.contains(b)) + mFreeBuckets.add(b); + if (b.isCompletelyFree()) + mCompletelyFreeBuckets.add(b); + } + + public IndexStatistics statistics() { + long free = 0, used = 0; + for (Bucket b : mBuckets) { + free += b.freeCount(); + used += b.usedCount(); + } + return new IndexStatistics(free, used, BUCKET_SIZES[mSizeIndex]); + } + } + + private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, + 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, + 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, + 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024, + 512 * 1024 + 1024 }; + private Integer BUCKET_SIZE_INTEGERS[]; + + public BucketSizeInfo roundUpToBucketSizeInfo(int osz) { + for (int i = 0; i < BUCKET_SIZES.length; ++i) + if (osz <= BUCKET_SIZES[i]) + return mBucketsBySize[i]; + return null; + } + + public int roundUpToBucketSize(int osz) { + for (int i = 0; i < BUCKET_SIZES.length; ++i) + if (osz <= BUCKET_SIZES[i]) + return BUCKET_SIZES[i]; + return osz; + } + + static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 1MB plus overhead + static public final int LEAST_ITEMS_IN_BUCKET = 4; + static final long BUCKET_SIZE = LEAST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE; + + private Bucket mBuckets[]; + private BucketSizeInfo[] mBucketsBySize; + private final long totalSize; + private long usedSize = 0; + + BucketAllocator(long availableSpace) throws BucketAllocatorException { + mBuckets = new Bucket[(int) (availableSpace / (long) BUCKET_SIZE)]; + if (mBuckets.length < BUCKET_SIZES.length) + throw new BucketAllocatorException( + "Bucket allocator size too small - must have room for at least " + + BUCKET_SIZES.length + " buckets"); + mBucketsBySize = new BucketSizeInfo[BUCKET_SIZES.length]; + BUCKET_SIZE_INTEGERS = new Integer[BUCKET_SIZES.length]; + for (int i = 0; i < BUCKET_SIZES.length; ++i) { + BUCKET_SIZE_INTEGERS[i] = Integer.valueOf(BUCKET_SIZES[i]); + BucketSizeInfo bsi = new BucketSizeInfo(i); + mBucketsBySize[i] = bsi; + } + for (int i = 0; i < mBuckets.length; ++i) { + mBuckets[i] = new Bucket(BUCKET_SIZE * i); + mBucketsBySize[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1] + .instantiateBucket(mBuckets[i]); + } + this.totalSize = ((long) mBuckets.length) * BUCKET_SIZE; + } + + /* + * Rebuild the allocator's data structures from a persisted map. + */ + BucketAllocator(long availableSpace, BucketCache fc, + Map map, AtomicLong realCacheSize) + throws BucketAllocatorException { + this(availableSpace); + + // each bucket has an offset, sizeindex. probably the buckets are too big + // in our default state. so what we do is reconfigure them according to what + // we've found. we can only reconfigure each bucket once; if more than once, + // we know there's a bug, so we just log the info, throw, and start again... 
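+    // For each persisted entry: find the size class its length needs, locate its
+    // bucket from the offset, reconfigure that bucket on first use, then replay the
+    // allocation so free lists and counters match the persisted state.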
+ boolean[] reconfigured = new boolean[mBuckets.length]; + for (Map.Entry entry : map.entrySet()) { + long foundOff = entry.getValue().offset(); + int foundLen = entry.getValue().lengthBytes(); + int needBucketSizeIndex = -1; + for (int i = 0; i < BUCKET_SIZES.length; ++i) { + if (foundLen <= BUCKET_SIZES[i]) { + needBucketSizeIndex = i; + break; + } + } + if (needBucketSizeIndex == -1) + throw new BucketAllocatorException("Can't match bucket size " + foundLen + + "; clearing allocator"); + int foundBucketNo = (int) (foundOff / (long) BUCKET_SIZE); + if (foundBucketNo < 0 || foundBucketNo >= mBuckets.length) + throw new BucketAllocatorException("Can't find bucket " + foundBucketNo + + "; did you shrink the cache? Clearing allocator"); + Bucket b = mBuckets[foundBucketNo]; + if (reconfigured[foundBucketNo] == true) { + if (b.sizeIndex() != needBucketSizeIndex) + throw new BucketAllocatorException( + "Inconsistent allocation to bucket map; clearing allocator"); + } else { + if (!b.isCompletelyFree()) + throw new BucketAllocatorException("Reconfiguring bucket " + + foundBucketNo + " but it's already allocated; corrupt data"); + // Need to remove the bucket from whichever list it's currently in at + // the moment... + BucketSizeInfo bsi = mBucketsBySize[needBucketSizeIndex]; + BucketSizeInfo oldbsi = mBucketsBySize[b.sizeIndex()]; + oldbsi.removeBucket(b); + bsi.instantiateBucket(b); + reconfigured[foundBucketNo] = true; + } + realCacheSize.addAndGet(foundLen); + mBuckets[foundBucketNo].addAllocation(foundOff); + usedSize += mBuckets[foundBucketNo].size(); + mBucketsBySize[needBucketSizeIndex].blockAllocated(b); + } + } + + public String getInfo() { + StringBuilder sb = new StringBuilder(1024); + for (int i = 0; i < mBuckets.length; ++i) { + Bucket b = mBuckets[i]; + sb.append(" Bucket ").append(i).append(": ").append(b.size()); + sb.append(" freeCount=").append(b.freeCount()).append(" used=") + .append(b.usedCount()); + sb.append('\n'); + } + return sb.toString(); + } + + public long getUsedSize() { + return this.usedSize; + } + + public long getFreeSize() { + long freeSize = this.totalSize - getUsedSize(); + return freeSize; + } + + public long getTotalSize() { + return this.totalSize; + } + + /** + * Allocate a block with specified size. Return the offset + */ + public synchronized long allocateBlock(int osz) throws CacheFullException, BucketAllocatorException { + assert osz > 0; + BucketSizeInfo bsi = roundUpToBucketSizeInfo(osz); + if (bsi == null) { + throw new BucketAllocatorException("Allocation too big size=" + osz); + } + long offset = bsi.allocateBlock(); + + // Ask caller to free up space and try again! 
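// [Editor's note -- illustrative annotation, not part of the original patch]
// allocateBlock() above returns -1 only when no partially-free bucket of the
// required size class exists and no completely-free bucket can be grabbed from
// another class. The CacheFullException thrown below is caught in
// WriterThread.doDrain(), which responds by kicking off freeSpace() (eviction)
// so the cache keeps running and space is reclaimed for later writes.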
+ if (offset == -1) + throw new CacheFullException(osz, bsi.sizeIndex()); + usedSize += BUCKET_SIZES[bsi.sizeIndex()]; + return offset; + } + + private Bucket grabGlobalCompletelyFreeBucket() { + for (BucketSizeInfo bsi : mBucketsBySize) { + Bucket b = bsi.findAndRemoveCompletelyFreeBucket(); + if (b != null) + return b; + } + return null; + } + + public synchronized int freeBlock(long freeBlock) { + int bucketNo = (int) (freeBlock / (long) BUCKET_SIZE); + assert bucketNo >= 0 && bucketNo < mBuckets.length; + Bucket targetBucket = mBuckets[bucketNo]; + mBucketsBySize[targetBucket.sizeIndex()].freeBlock(targetBucket, freeBlock); + usedSize -= targetBucket.size(); + return targetBucket.size(); + } + + public int sizeIndexOfAllocation(long offset) { + int bucketNo = (int) (offset / (long) BUCKET_SIZE); + assert bucketNo >= 0 && bucketNo < mBuckets.length; + Bucket targetBucket = mBuckets[bucketNo]; + return targetBucket.sizeIndex(); + } + + public int sizeOfAllocation(long offset) { + int bucketNo = (int) (offset / (long) BUCKET_SIZE); + assert bucketNo >= 0 && bucketNo < mBuckets.length; + Bucket targetBucket = mBuckets[bucketNo]; + return targetBucket.size(); + } + + static public int getMaximumAllocationIndex() { + return BUCKET_SIZES.length; + } + + public static class IndexStatistics { + private long mFree, mUsed, mItemSize, mTotal; + + public long freeCount() { + return mFree; + } + + public long usedCount() { + return mUsed; + } + + public long totalCount() { + return mTotal; + } + + public long freeBytes() { + return mFree * mItemSize; + } + + public long usedBytes() { + return mUsed * mItemSize; + } + + public long totalBytes() { + return mTotal * mItemSize; + } + + public long itemSize() { + return mItemSize; + } + + public IndexStatistics(long free, long used, long itemSize) { + setTo(free, used, itemSize); + } + + public IndexStatistics() { + setTo(-1, -1, 0); + } + + public void setTo(long free, long used, long itemSize) { + mItemSize = itemSize; + mFree = free; + mUsed = used; + mTotal = free + used; + } + } + + public void dumpToLog() { + logStatistics(); + StringBuilder sb = new StringBuilder(); + for (Bucket b : mBuckets) { + sb.append("Bucket:").append(b.mBaseOffset).append('\n'); + sb.append(" Size index: " + b.sizeIndex() + "; Free:" + b.mFreeCount + + "; used:" + b.mUsedCount + "; freelist\n"); + for (int i = 0; i < b.freeCount(); ++i) + sb.append(b.mFreeList[i]).append(','); + } + LOG.info(sb); + } + + public void logStatistics() { + IndexStatistics total = new IndexStatistics(); + IndexStatistics[] stats = getIndexStatistics(total); + LOG.info("Bucket allocator statistics follow:\n"); + LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes=" + + total.usedBytes() + "; total bytes=" + total.totalBytes()); + for (IndexStatistics s : stats) { + LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount() + + "; free=" + s.freeCount() + "; total=" + s.totalCount()); + } + } + + public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) { + IndexStatistics[] stats = getIndexStatistics(); + long totalfree = 0, totalused = 0; + for (IndexStatistics stat : stats) { + totalfree += stat.freeBytes(); + totalused += stat.usedBytes(); + } + grandTotal.setTo(totalfree, totalused, 1); + return stats; + } + + public IndexStatistics[] getIndexStatistics() { + IndexStatistics[] stats = new IndexStatistics[BUCKET_SIZES.length]; + for (int i = 0; i < stats.length; ++i) + stats[i] = mBucketsBySize[i].statistics(); + return stats; + } + + public long freeBlock(long 
freeList[]) { + long sz = 0; + for (int i = 0; i < freeList.length; ++i) + sz += freeBlock(freeList[i]); + return sz; + } + +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (working copy) @@ -129,8 +129,8 @@ public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase( ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); - static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG + - Bytes.SIZEOF_INT; + public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_INT + + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; /** * Each checksum value is an integer that can be stored in 4 bytes. @@ -139,23 +139,39 @@ private static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { - public HFileBlock deserialize(ByteBuffer buf) throws IOException{ - ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit() - - HFileBlock.EXTRA_SERIALIZATION_SPACE); - buf.limit(buf.limit() - - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); - newByteBuffer.put(buf); - HFileBlock ourBuffer = new HFileBlock(newByteBuffer, - MINOR_VERSION_NO_CHECKSUM); - + public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{ + buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); + ByteBuffer newByteBuffer; + if(reuse){ + newByteBuffer = buf.slice(); + }else{ + newByteBuffer = ByteBuffer.allocate(buf.limit()); + newByteBuffer.put(buf); + } buf.position(buf.limit()); buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); + int minorVersion=buf.getInt(); + HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion); ourBuffer.offset = buf.getLong(); ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); return ourBuffer; } + + @Override + public int getDeserialiserIdentifier() { + return 1; + } + + @Override + public HFileBlock deserialize(ByteBuffer b) throws IOException { + return deserialize(b, false); + } }; + static { /* So we can get the deserializer reference back later on */ + CacheableDeserializerFactory.registerDeserializer(blockDeserializer, 1); + } + private BlockType blockType; /** Size on disk without the header. It includes checksum data too. */ @@ -359,6 +375,17 @@ } /** + * Returns the buffer of this block, including header data. The clients must + * not modify the buffer object. This method has to be public because it is + * used in {@link BucketCache} to avoid buffer copy. + * + * @return the byte buffer with header included for read-only operations + */ + public ByteBuffer getBufferReadOnlyWithHeader() { + return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); + } + + /** * Returns a byte buffer of this block, including header data, positioned at * the beginning of header. The underlying data array is not copied. 
* @@ -1780,12 +1807,22 @@ @Override public void serialize(ByteBuffer destination) { - destination.put(this.buf.duplicate()); + ByteBuffer dupBuf = this.buf.duplicate(); + dupBuf.rewind(); + destination.put(dupBuf); + destination.putInt(this.minorVersion); destination.putLong(this.offset); destination.putInt(this.nextBlockOnDiskSizeWithHeader); destination.rewind(); } + public void serializeExtraInfo(ByteBuffer destination) { + destination.putInt(this.minorVersion); + destination.putLong(this.offset); + destination.putInt(this.nextBlockOnDiskSizeWithHeader); + destination.rewind(); + } + @Override public CacheableDeserializer getDeserializer() { return HFileBlock.blockDeserializer; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferArray.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferArray.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferArray.java (revision 0) @@ -0,0 +1,169 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.nio.ByteBuffer; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * This class manages an array of ByteBuffers with a default size 4MB. These + * buffers are sequential and could be considered as a large buffer.It support + * reading/writing data for {@link ByteBufferIOEngine} + */ +public final class ByteBufferArray { + static final Log LOG = LogFactory.getLog(ByteBufferArray.class); + + static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; + private ByteBuffer buffers[]; + private Lock locks[]; + private int bufferSize; + private int bufferCount; + + public ByteBufferArray(long capacitySize, boolean direct) { + this.bufferSize = DEFAULT_BUFFER_SIZE; + if (this.bufferSize > (capacitySize / 16)) + this.bufferSize = (int) roundUp(capacitySize / 16, 32768); + this.bufferCount = (int) (roundUp(capacitySize, bufferSize) / bufferSize); + LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacitySize) + + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + + bufferCount); + buffers = new ByteBuffer[bufferCount + 1]; + locks = new Lock[bufferCount + 1]; + for (int i = 0; i <= bufferCount; ++i) { + locks[i] = new ReentrantLock(); + if (i < bufferCount) + buffers[i] = direct ? 
ByteBuffer.allocateDirect(bufferSize) + : ByteBuffer.allocate(bufferSize); + else + buffers[i] = ByteBuffer.allocate(0); + } + } + + private long roundUp(long n, long to) { + return ((n + to - 1) / to) * to; + } + + /** + * Transfers bytes from this buffer array into the given destination array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be written to the given array + * @param dstArray The array into which bytes are to be written + */ + void getMultiple(long start, int len, byte[] dstArray) { + getMultiple(start, len, dstArray, 0); + } + + /** + * Transfers bytes from this buffer array into the given destination array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be written to the given array + * @param dstArray The array into which bytes are to be written + * @param dstOffset The offset within the given array of the first byte to be + * written + */ + void getMultiple(long start, int len, byte[] dstArray, int dstOffset) { + multiple(start, len, dstArray, dstOffset, new Accessor() { + public void access(ByteBuffer bb, byte[] array, int arrayIdx, int len) { + bb.get(array, arrayIdx, len); + } + }); + } + + /** + * Transfers bytes from the given destination array into this buffer array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be read from the given array + * @param srcArray The array from which bytes are to be read + */ + void putMultiple(long start, int len, byte[] srcArray) { + putMultiple(start, len, srcArray, 0); + } + + /** + * Transfers bytes from the given destination array into this buffer array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be read from the given array + * @param srcArray The array from which bytes are to be read + * @param srcOffset The offset within the given array of the first byte to be + * read + */ + void putMultiple(long start, int len, byte[] srcArray, int srcOffset) { + multiple(start, len, srcArray, srcOffset, new Accessor() { + public void access(ByteBuffer bb, byte[] array, int arrayIdx, int len) { + bb.put(array, arrayIdx, len); + } + }); + } + + interface Accessor { + void access(ByteBuffer bb, byte[] array, int arrayIdx, int len); + } + + /** + * Access(read or write) this buffer array with a position and length as the + * given array + * @param start start offset of this buffer array + * @param len The maximum number of bytes to be access + * @param array The array from/to which bytes are to be read/written + * @param arrayOffset The offset within the given array of the first byte to + * be read or written + * @param accessor implement of how to access the bytebuffer + */ + void multiple(long start, int len, byte[] array, int arrayOffset, Accessor accessor) { + long end = start + len; + int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize); + int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize); + assert array.length >= len + arrayOffset; + if (startBuffer >= locks.length || startBuffer < 0) { + LOG.info("Failed multiple, start=" + start + ",startBuffer=" + + startBuffer + ",bufferSize=" + bufferSize); + } + int srcIndex = 0, cnt = -1; + for (int i = startBuffer; i <= endBuffer; ++i) { + Lock lock = locks[i]; + lock.lock(); + try { + ByteBuffer bb = buffers[i]; + if (i == startBuffer) { + cnt = bufferSize - startOffset; + if (cnt > len) + cnt = len; + bb.limit(startOffset + 
cnt).position( + startOffset ); + } else if (i == endBuffer) { + cnt = endOffset; + bb.limit(cnt).position(0); + } else { + cnt = bufferSize ; + bb.limit(cnt).position(0); + } + accessor.access(bb, array, srcIndex + arrayOffset, cnt); + srcIndex += cnt; + } finally { + lock.unlock(); + } + } + assert srcIndex == len; + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java (revision 0) @@ -0,0 +1,55 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Map from a type to int and vice-versa. Used for reducing bit field item + * counts. + */ +public final class UniqueIndexMap implements Serializable { + private static final long serialVersionUID = -1145635738654002342L; + + ConcurrentHashMap mForwardMap = new ConcurrentHashMap(); + ConcurrentHashMap mReverseMap = new ConcurrentHashMap(); + AtomicInteger mIndex = new AtomicInteger(0); + + // Map a length to an index. If we can't, allocate a new mapping. We might + // race here and + // get two entries with the same deserialiser. This is fine. + int map(T parameter) { + Integer ret = mForwardMap.get(parameter); + if (ret != null) + return ret.intValue(); + int nexti = mIndex.incrementAndGet(); + assert (nexti < Short.MAX_VALUE); + mForwardMap.put(parameter, nexti); + mReverseMap.putIfAbsent(nexti, parameter); + return nexti; + } + + T unmap(int leni) { + Integer len = Integer.valueOf(leni); + assert mReverseMap.containsKey(len); + return mReverseMap.get(len); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (revision 0) @@ -0,0 +1,1029 @@ +/** + * Copyright 2012 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; +import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerFactory; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * BucketCache uses {@link BucketAllocator} to allocate and free blocks, and uses a + * HashMap to determine whether a given element is a cache hit. It can use either an + * in-memory {@link ByteBufferIOEngine} or a file-backed {@link FileIOEngine} to store/read + * the block data. + * + * Eviction uses an algorithm similar to + * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}. + * + * BucketCache can be used as the main block cache (see + * {@link CombinedBlockCache}), combined with LruBlockCache, to reduce CMS pauses and + * heap fragmentation caused by GC. + * + * It can also be used as a secondary cache (e.g.
using Fusionio to store block) + * to enlarge cache space by + * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} + */ +public class BucketCache implements BlockCache, HeapSize { + static final Log LOG = LogFactory.getLog(BucketCache.class); + + /** Priority buckets */ + static final float DEFAULT_SINGLE_FACTOR = 0.25f; + static final float DEFAULT_MULTI_FACTOR = 0.50f; + static final float DEFAULT_MEMORY_FACTOR = 0.25f; + static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; + + static final float DEFAULT_ACCEPT_FACTOR = 0.95f; + static final float DEFAULT_MIN_FACTOR = 0.85f; + + /** Statistics thread */ + static final int statThreadPeriod = 3 * 60; + + final static int DEFAULT_WRITER_THREADS = 3; + final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; + + // Store/read block data + IOEngine ioEngine; + + // Store the block in this map before write it to cache + private ConcurrentHashMap ramCache; + private ConcurrentHashMap backingMap; + + /** + * Flag that states if the cache is enabled or not... We shut it off if + * there's an IO error, so that Bucket IO exceptions/errors don't bring down + * the HBase server. + */ + private volatile boolean cacheEnabled; + + private ArrayList> writerQueues = new ArrayList>(); + WriterThread writerThreads[]; + + + + /** Volatile boolean to track if free space is in process or not */ + private volatile boolean freeInProgress = false; + private Lock freeSpaceLock = new ReentrantLock(); + + private UniqueIndexMap deserialiserMap = new UniqueIndexMap(); + + private final AtomicLong realCacheSize = new AtomicLong(0); + private final AtomicLong heapSize = new AtomicLong(0); + private final AtomicLong blockCount = new AtomicLong(0); + private final AtomicLong failedAddBlockNum = new AtomicLong(0); + + /** Cache access count (sequential ID) */ + private final AtomicLong accessCount = new AtomicLong(0); + + private final Object[] cacheWaitSignals; + private static final int DEFAULT_CACHE_WAIT_TIME = 50; + boolean WAIT_WHEN_CACHE = false; + + private BucketCacheStats cacheStats = new BucketCacheStats(); + + private String persistencePath; + private long cacheCapacitySize; + /** Approximate block size */ + private final long blockSize; + + + + /** Statistics thread schedule pool (for heavy debugging, could remove) */ + private final ScheduledExecutorService scheduleThreadPool = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat("BucketCache Statistics #%d") + .setDaemon(true) + .build()); + + // Allocate or free space for the block + private BucketAllocator bucketAllocator; + + + public BucketCache(String ioEngineName, long capacity, int writerThreadNum, + int writerQLen, String persistencePath) throws FileNotFoundException, + IOException { + if (ioEngineName.startsWith("file:")) + ioEngine = new FileIOEngine(ioEngineName.substring(5), capacity); + else if (ioEngineName.startsWith("offheap")) + ioEngine = new ByteBufferIOEngine(capacity, true); + else if (ioEngineName.startsWith("heap")) + ioEngine = new ByteBufferIOEngine(capacity, false); + else + throw new IOException( + "Don't understand io engine name for cache - prefix with file:, heap: or offHeap:"); + + this.writerThreads = new WriterThread[writerThreadNum]; + this.cacheWaitSignals = new Object[writerThreadNum]; + long hashSize = recommendedHashSize(capacity / 16384); + assert hashSize < Integer.MAX_VALUE; // Enough for about 32TB of cache! 
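// [Editor's note -- illustrative annotation, not part of the original patch]
// A hypothetical construction that reaches this point in the constructor:
//   BucketCache cache = new BucketCache("offheap", 1024L * 1024 * 1024, 3, 64, null);
// "file:<path>" selects a FileIOEngine, "offheap" a direct (off-heap)
// ByteBufferIOEngine, and "heap" an on-heap ByteBufferIOEngine; here the
// capacity is 1 GB, with 3 writer threads, 64-entry writer queues, and no
// persistence path.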
+ this.cacheCapacitySize = capacity; + this.persistencePath = persistencePath; + this.blockSize = StoreFile.DEFAULT_BLOCKSIZE_SMALL; + + try { + bucketAllocator = new BucketAllocator(capacity); + } catch (BucketAllocatorException fex) { + throw new IOException("File size too small"); + } + for (int i = 0; i < writerThreads.length; ++i) { + writerQueues.add(new ArrayBlockingQueue(writerQLen)); + this.cacheWaitSignals[i] = new Object(); + } + + assert writerQueues.size() == writerThreads.length; + this.ramCache = new ConcurrentHashMap(); + + this.backingMap = new ConcurrentHashMap((int) (capacity / 16384)); + + if (ioEngine.isPersistent() && persistencePath != null) { + try { + retrieveFromFile(); + } catch (IOException ioex) { + LOG.error("Bucket cache: Can't restore from file because of", ioex); + } catch (BucketAllocatorException faex) { + LOG.error("Bucket cache: Can't restore from file in rebuild because of", + faex); + } catch (ClassNotFoundException cnfe) { + LOG.error( + "Bucket cache: Can't restore from file in rebuild because can't deserialise", + cnfe); + } + } + for (int i = 0; i < writerThreads.length; ++i) { + writerThreads[i] = new WriterThread(writerQueues.get(i), i); + writerThreads[i].setName("Bucket cache writer " + i); + } + this.cacheEnabled = true; + for (int i = 0; i < writerThreads.length; ++i) { + writerThreads[i].start(); + } + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), + statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); + LOG.info("Started bucket cache"); + } + + /** + * Cache the block with the specified name and buffer. + * @param cacheKey block's cache key + * @param buf block buffer + */ + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + cacheBlock(cacheKey, buf, false); + } + + /** + * Cache the block with the specified name and buffer. + * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + */ + public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { + cacheBlockWithWait(cacheKey, cachedItem, inMemory, WAIT_WHEN_CACHE); + } + + /** + * Cache the block to ramCache + * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + * @param wait if true, blocking wait when queue is full + */ + public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, + boolean inMemory, boolean wait) { + if (!cacheEnabled) + return; + + if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) + return; + + /* + * Stuff the entry into the RAM cache so it can get drained to the + * persistent store + */ + RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, + accessCount.incrementAndGet(), inMemory); + ramCache.put(cacheKey, re); + int threadNO = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); + BlockingQueue bq = writerQueues.get(threadNO); + boolean successfulAdd = bq.offer(re); + if (!successfulAdd && wait) { + synchronized (cacheWaitSignals[threadNO]) { + try { + cacheWaitSignals[threadNO].wait(DEFAULT_CACHE_WAIT_TIME); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + successfulAdd = bq.offer(re); + } + if (!successfulAdd) { + ramCache.remove(cacheKey); + failedAddBlockNum.incrementAndGet(); + } else { + this.blockCount.incrementAndGet(); + this.heapSize.addAndGet(cachedItem.heapSize()); + } + } + + /** + * Get the buffer of the block with the specified name. 
+ * @param key block's cache key + * @param caching true if the caller caches blocks on cache misses + * @param repeat Whether this is a repeat lookup for the same block + * @return buffer of specified cache key, or null if not in cache + */ + public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) { + if (!cacheEnabled) + return null; + RAMQueueEntry re = ramCache.get(key); + if (re != null) { + cacheStats.hit(caching); + re.access(accessCount.incrementAndGet()); + return re.getData(); + } + BucketEntry bucketEntry = backingMap.get(key); + if(bucketEntry!=null) { + long start = System.nanoTime(); + try { + int len = bucketEntry.lengthBytes(); + ByteBuffer bb = ByteBuffer.allocate(len); + ioEngine.read(bb, bucketEntry.offset()); + Cacheable cachedBlock = bucketEntry.deserializerReference( + deserialiserMap).deserialize(bb, true); + long took = System.nanoTime() - start; + cacheStats.hit(caching); + cacheStats.ioHit(took); + bucketEntry.access(accessCount.incrementAndGet()); + return cachedBlock; + } catch (IOException ioex) { + LOG.error( + "IO exception reading from bucket cache; disabling... check your fs is in good health..." + + ioex.toString(), ioex); + disableCache(); + } + } + if(!repeat)cacheStats.miss(caching); + return null; + } + + public boolean evictBlock(BlockCacheKey cacheKey) { + if (!cacheEnabled) return false; + RAMQueueEntry removedBlock = ramCache.remove(cacheKey); + if (removedBlock != null) { + this.blockCount.decrementAndGet(); + this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); + } + BucketEntry bucketEntry = backingMap.remove(cacheKey); + if (bucketEntry != null) { + bucketAllocator.freeBlock(bucketEntry.offset()); + realCacheSize.addAndGet(-1 * bucketEntry.lengthBytes()); + if (removedBlock == null) { + this.blockCount.decrementAndGet(); + } + } + cacheStats.evicted(); + return true; + } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. + */ + static class StatisticsThread extends Thread { + BucketCache bucketCache; + + public StatisticsThread(BucketCache bucketCache) { + super("BucketCache.StatisticsThread"); + setDaemon(true); + this.bucketCache = bucketCache; + } + @Override + public void run() { + bucketCache.logStats(); + } + } + + public void logStats() { + if (!LOG.isDebugEnabled()) return; + // Log size + long totalSize = bucketAllocator.getTotalSize(); + long usedSize = bucketAllocator.getUsedSize(); + long freeSize = totalSize - usedSize; + long cacheSize = this.realCacheSize.get(); + LOG.debug("BucketCache Stats: " + + "failedAddBlockNum=" + this.failedAddBlockNum.get() + ", " + + "total=" + StringUtils.byteDesc(totalSize) + ", " + + "free=" + StringUtils.byteDesc(freeSize) + ", " + + "usedSize=" + StringUtils.byteDesc(usedSize) +", " + + "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + + "accesses=" + cacheStats.getRequestCount() + ", " + + "hits=" + cacheStats.getHitCount() + ", " + + "IOhitsPerSecond=" + cacheStats.getIOHitPerSecond() + ", " + + "IOavgTime=" + String.format("%.2f", cacheStats.getAvgIOTime())+ ", " + + "hitRatio=" + + (cacheStats.getHitCount() == 0 ? "0," : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + + "cachingHits=" + cacheStats.getHitCachingCount() + ", " + + "cachingHitsRatio=" + + (cacheStats.getHitCachingCount() == 0 ? 
"0," : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + + "evictions=" + cacheStats.getEvictionCount() + ", " + + "evicted=" + cacheStats.getEvictedCount() + ", " + + "evictedPerRun=" + cacheStats.evictedPerEviction()); + cacheStats.reset(); + } + + private long acceptableSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR); + } + + private long minSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MIN_FACTOR); + } + + private long singleSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR); + } + + private long multiSize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR + * DEFAULT_MIN_FACTOR); + } + + private long memorySize() { + return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR + * DEFAULT_MIN_FACTOR); + } + + private void freeSpace() { + // Ensure only one freeSpace progress at a time + if (!freeSpaceLock.tryLock()) return; + try { + freeInProgress = true; + long bytesToFreeForTotal = 0; + /* + * Calculate free byte for each bucketSize + */ + StringBuffer msgBuffer = new StringBuffer(); + BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); + long[] bytesToFreeForBucket = new long[stats.length]; + for (int i = 0; i < stats.length; i++) { + bytesToFreeForBucket[i] = 0; + long freeGoal = (long) Math.floor(stats[i].totalCount() + * (1 - DEFAULT_MIN_FACTOR)); + freeGoal = Math.max(freeGoal, 1); + if (stats[i].freeCount() < freeGoal) { + bytesToFreeForBucket[i] = stats[i].itemSize() + * (freeGoal - stats[i].freeCount()); + bytesToFreeForTotal += bytesToFreeForBucket[i]; + msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" + + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); + } + } + msgBuffer.append("Free for total=" + + StringUtils.byteDesc(bytesToFreeForTotal) + ", "); + + if (bytesToFreeForTotal <= 0) { + return; + } + long currentSize = bucketAllocator.getUsedSize(); + long totalSize=bucketAllocator.getTotalSize(); + LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString() + + " of current used=" + StringUtils.byteDesc(currentSize) + + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get()) + + ",total=" + StringUtils.byteDesc(totalSize)); + + long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeForTotal + * (1 + DEFAULT_EXTRA_FREE_FACTOR)); + + // Instantiate priority buckets + EntryBucket bucketSingle = new EntryBucket(bytesToFreeWithExtra, + blockSize, singleSize()); + EntryBucket bucketMulti = new EntryBucket(bytesToFreeWithExtra, + blockSize, multiSize()); + EntryBucket bucketMemory = new EntryBucket(bytesToFreeWithExtra, + blockSize, memorySize()); + + // Scan entire map putting into appropriate buckets + for (Map.Entry bucketEntry : backingMap.entrySet()) { + switch (bucketEntry.getValue().getPriority()) { + case SINGLE: { + bucketSingle.add(bucketEntry); + break; + } + case MULTI: { + bucketMulti.add(bucketEntry); + break; + } + case MEMORY: { + bucketMemory.add(bucketEntry); + break; + } + } + } + + PriorityQueue bucketQueue = new PriorityQueue(3); + + bucketQueue.add(bucketSingle); + bucketQueue.add(bucketMulti); + bucketQueue.add(bucketMemory); + + int remainingBuckets = 3; + long bytesFreed = 0; + + EntryBucket bucket; + while ((bucket = bucketQueue.poll()) != null) { + long overflow = bucket.overflow(); + if (overflow > 0) { + long bucketBytesToFree = Math.min(overflow, + 
(bytesToFreeForTotal - bytesFreed) / remainingBuckets); + bytesFreed += bucket.free(bucketBytesToFree); + } + remainingBuckets--; + } + + stats = bucketAllocator.getIndexStatistics(); + boolean needFreeForExtra = false; + for (int i = 0; i < stats.length; i++) { + long freeGoal = (long) Math.floor(stats[i].totalCount() + * (1 - DEFAULT_MIN_FACTOR)); + freeGoal = Math.max(freeGoal, 1); + if (stats[i].freeCount() < freeGoal) { + needFreeForExtra = true; + break; + } + } + + if (needFreeForExtra) { + bucketQueue.clear(); + remainingBuckets = 2; + + bucketQueue.add(bucketSingle); + bucketQueue.add(bucketMulti); + + while ((bucket = bucketQueue.poll()) != null) { + long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) + / remainingBuckets; + bytesFreed += bucket.free(bucketBytesToFree); + remainingBuckets--; + } + } + + if (LOG.isDebugEnabled()) { + long single = bucketSingle.totalSize(); + long multi = bucketMulti.totalSize(); + long memory = bucketMemory.totalSize(); + LOG.debug("Bucket cache free space completed; " + "freed=" + + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + + StringUtils.byteDesc(totalSize) + ", " + "single=" + + StringUtils.byteDesc(single) + ", " + "multi=" + + StringUtils.byteDesc(multi) + ", " + "memory=" + + StringUtils.byteDesc(memory)); + } + + } finally { + cacheStats.evict(); + freeInProgress = false; + freeSpaceLock.unlock(); + } + } + + // This handles flushing the RAM cache to IOEngine. + public class WriterThread extends HasThread { + BlockingQueue inputQueue; + final int threadNO; + boolean writerEnable = true; + + WriterThread(BlockingQueue queue, int threadNO) { + super(); + this.inputQueue = queue; + this.threadNO = threadNO; + setDaemon(true); + } + + // Used for test + void disableWriter() { + this.writerEnable = false; + } + + public void run() { + List entries = new ArrayList(); + try { + while (cacheEnabled && writerEnable) { + try { + // Blocks + entries.add(inputQueue.take()); + // Add any others remaining. Does not block. + inputQueue.drainTo(entries); + synchronized (cacheWaitSignals[threadNO]) { + cacheWaitSignals[threadNO].notifyAll(); + } + } catch (InterruptedException ie) { + if (!cacheEnabled) break; + } + doDrain(entries); + } + } catch (Throwable t) { + LOG.info("Failed doing drain", t); + } + LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); + } + + private void doDrain(List entries) + throws InterruptedException { + BucketEntry[] bucketEntries = new BucketEntry[entries.size()]; + RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()]; + int done = 0; + while (entries.size() > 0 && cacheEnabled) { + // Keep going in case we throw... + try { + RAMQueueEntry e = entries.remove(entries.size() - 1); + BucketEntry bucketEntry = e.writeToCache(ioEngine, bucketAllocator, + deserialiserMap, realCacheSize); + ramEntries[done] = e; + bucketEntries[done++] = bucketEntry; + } catch (BucketAllocatorException fle) { + LOG.warn("Failed allocating for block ", fle); + } catch (CacheFullException cfe) { + if (!freeInProgress) { + freeSpace(); + } else { + Thread.sleep(50); + } + } catch (IOException ioex) { + LOG.error("IO exception writing to bucket cache; disabling... check your fs is in good health...", + ioex); + disableCache(); + return; + } + } + + // Make sure that the data pages we written are on the media before we + // update the map. + try { + ioEngine.sync(); + } catch (IOException ioex) { + LOG.error("IO exception writing to bucket cache; disabling... 
check your fs is in good health...",ioex); + disableCache(); + return; + } + + for (int i = 0; i < done; ++i) { + if (bucketEntries[i] != null) { + backingMap.put(ramEntries[i].getKey(), bucketEntries[i]); + } + RAMQueueEntry ramcache = ramCache.remove(ramEntries[i].getKey()); + if (ramcache != null) { + heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize()); + } + } + + if (bucketAllocator.getUsedSize() > acceptableSize()) { + freeSpace(); + } + } + } + + + + private void persistToFile() throws IOException { + assert !cacheEnabled; + FileOutputStream fos = null; + ObjectOutputStream oos = null; + try { + if (!ioEngine.isPersistent()) + throw new IOException( + "Attempt to persist non-persistent cache mappings!"); + fos = new FileOutputStream(persistencePath, false); + oos = new ObjectOutputStream(fos); + oos.writeLong(cacheCapacitySize); + oos.writeUTF(ioEngine.getClass().getName()); + oos.writeUTF(backingMap.getClass().getName()); + oos.writeObject(deserialiserMap); + oos.writeObject(backingMap); + } finally { + if (oos != null) oos.close(); + if (fos != null) fos.close(); + } + } + + @SuppressWarnings("unchecked") + private void retrieveFromFile() throws IOException, BucketAllocatorException, + ClassNotFoundException { + File persistenceFile = new File(persistencePath); + if (!persistenceFile.exists()) { + return; + } + assert !cacheEnabled; + FileInputStream fis = null; + ObjectInputStream ois = null; + try { + if (!ioEngine.isPersistent()) + throw new IOException( + "Attempt to restore non-persistent cache mappings!"); + fis = new FileInputStream(persistencePath); + ois = new ObjectInputStream(fis); + long capacitySize = ois.readLong(); + if (capacitySize != cacheCapacitySize) + throw new IOException( + "Can't restore Bucket cache because of mismatched cache size"); + String ioclass = ois.readUTF(); + String mapclass = ois.readUTF(); + if (!ioEngine.getClass().getName().equals(ioclass)) + throw new IOException("Class name for IO engine mismatch: " + ioclass); + if (!backingMap.getClass().getName().equals(mapclass)) + throw new IOException("Class name for cache map mismatch: " + mapclass); + UniqueIndexMap deserMap = (UniqueIndexMap) ois + .readObject(); + try { + backingMap = (ConcurrentHashMap) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + BucketAllocator allocator = new BucketAllocator(cacheCapacitySize, this, + backingMap, this.realCacheSize); + bucketAllocator = allocator; + deserialiserMap = deserMap; + } finally { + if (ois != null) ois.close(); + if (fis != null) fis.close(); + if (!persistenceFile.delete()) { + throw new IOException("Meta file can't be deleted"); + } + } + } + + /** + * Used to shut down the cache -or- turn it off in the case of something + * broken. 
+ */ + private void disableCache() { + if (!cacheEnabled) + return; + cacheEnabled = false; + if (ioEngine != null) + ioEngine.shutdown(); + this.scheduleThreadPool.shutdown(); + for (int i = 0; i < writerThreads.length; ++i) + writerThreads[i].interrupt(); + } + + private void join() throws InterruptedException { + for (int i = 0; i < writerThreads.length; ++i) + writerThreads[i].join(); + } + + public void shutdown() { + disableCache(); + LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + + "; path to write=" + persistencePath); + if (ioEngine.isPersistent() && persistencePath != null) { + try { + join(); + persistToFile(); + } catch (IOException ex) { + LOG.error("Unable to persist data on exit: " + ex.toString(), ex); + } catch (InterruptedException e) { + LOG.warn("Failed to persist data on exit", e); + } + } + } + + + + public CacheStats getStats() { + return cacheStats; + } + + BucketAllocator getAllocator() { + return this.bucketAllocator; + } + + public long heapSize() { + return this.heapSize.get(); + } + + /** + * Returns the total size of the block cache, in bytes. + * @return size of cache, in bytes + */ + public long size() { + return this.realCacheSize.get(); + } + + public long getFreeSize() { + return this.bucketAllocator.getFreeSize(); + } + + public long getBlockCount() { + return this.blockCount.get(); + } + + /** + * Returns the occupied size of the block cache, in bytes. + * @return occupied space in cache, in bytes + */ + public long getCurrentSize() { + return this.bucketAllocator.getUsedSize(); + } + + public long getEvictedCount() { + return cacheStats.getEvictedCount(); + } + + /** + * Evicts all blocks for a specific HFile. This is an expensive operation + * implemented as a linear-time search through all blocks in the cache. + * Ideally this should be a search in a log-access-time map. + * + *

+ * This is used for evict-on-close to remove all blocks of a specific HFile. + * + * @return the number of blocks evicted + */ + public int evictBlocksByHfileName(String hfileName) { + int numEvicted = 0; + for (BlockCacheKey key : this.backingMap.keySet()) { + if (key.getHfileName().equals(hfileName)) { + if (evictBlock(key)) + ++numEvicted; + } + } + return numEvicted; + } + + + /* Does not need to be necessary. */ + public List getBlockCacheColumnFamilySummaries( + Configuration conf) { + throw new UnsupportedOperationException(); + } + + static enum BlockPriority { + /** + * Accessed a single time (used for scan-resistance) + */ + SINGLE, + /** + * Accessed multiple times + */ + MULTI, + /** + * Block from in-memory store + */ + MEMORY + }; + + /** + * cache item in cache. We expect this to be where most memory goes. Java uses + * 8 bytes just for object headers; after this, we want to use as little as + * possible - so we only use 8 bytes, but in order to do so end up messing + * around with all this Java casting stuff. Offset stored as 5 bytes that make + * up the long. Doubt we'll see devices this big for ages. Offsets are divided + * by 256. So 5 bytes gives us 256TB or so. + */ + static class BucketEntry implements Serializable, Comparable { + private static final long serialVersionUID = -6741504807982257534L; + private int offsetBase; + private int length; + private byte offset1; + byte deserialiserIndex; + private volatile long accessTime; + private BlockPriority priority; + + BucketEntry(long offset, int length, long accessTime, boolean inMemory) { + setOffset(offset); + this.length = length; + this.accessTime = accessTime; + if (inMemory) { + this.priority = BlockPriority.MEMORY; + } else { + this.priority = BlockPriority.SINGLE; + } + } + + long offset() { // Java has no unsigned numbers + long o = ((long) offsetBase) & 0xFFFFFFFF; + o += (((long) (offset1)) & 0xFF) << 32; + return o << 8; + } + + private void setOffset(long value) { + assert (value & 0xFF) == 0; + value >>= 8; + offsetBase = (int) value; + offset1 = (byte) (value >> 32); + } + + public int lengthBytes() { + return length; + } + + protected CacheableDeserializer deserializerReference( + UniqueIndexMap deserialiserMap) { + return CacheableDeserializerFactory.getDeserializer(deserialiserMap + .unmap(deserialiserIndex)); + } + + protected void setDeserialiserReference( + CacheableDeserializer deserializer, + UniqueIndexMap deserialiserMap) { + this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer + .getDeserialiserIdentifier())); + } + + /** + * Block has been accessed. Update its local access time. + */ + public void access(long accessTime) { + this.accessTime = accessTime; + if (this.priority == BlockPriority.SINGLE) { + this.priority = BlockPriority.MULTI; + } + } + + public BlockPriority getPriority() { + return this.priority; + } + + @Override + public int compareTo(BucketEntry that) { + if(this.accessTime == that.accessTime) return 0; + return this.accessTime < that.accessTime ? 1 : -1; + } + + @Override + public boolean equals(Object that) { + return this == that; + } + } + + /** + * Used to group bucket entries into priority buckets. There will be a + * EntryBucket for each priority (single, multi, memory). Once bucketed, the + * eviction algorithm takes the appropriate number of elements out of each + * according to configuration parameters and their relatives sizes. 
+ */ + class EntryBucket implements Comparable { + + private CachedEntryQueue queue; + private long totalSize = 0; + private long bucketSize; + + public EntryBucket(long bytesToFree, long blockSize, long bucketSize) { + this.bucketSize = bucketSize; + queue = new CachedEntryQueue(bytesToFree, blockSize); + totalSize = 0; + } + + public void add(Map.Entry block) { + totalSize += block.getValue().lengthBytes(); + queue.add(block); + } + + public long free(long toFree) { + Map.Entry entry; + long freedBytes = 0; + while ((entry = queue.pollLast()) != null) { + evictBlock(entry.getKey()); + freedBytes += entry.getValue().lengthBytes(); + if (freedBytes >= toFree) { + return freedBytes; + } + } + return freedBytes; + } + + public long overflow() { + return totalSize - bucketSize; + } + + public long totalSize() { + return totalSize; + } + + @Override + public int compareTo(EntryBucket that) { + if (this.overflow() == that.overflow()) + return 0; + return this.overflow() > that.overflow() ? 1 : -1; + } + + @Override + public boolean equals(Object that) { + return this == that; + } + + } + + static class RAMQueueEntry { + private BlockCacheKey key; + private Cacheable data; + private long accessTime; + private boolean inMemory; + + public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime, + boolean inMemory) { + this.key = bck; + this.data = data; + this.accessTime = accessTime; + this.inMemory = inMemory; + } + + public Cacheable getData() { + return data; + } + + public BlockCacheKey getKey() { + return key; + } + + public void access(long accessTime) { + this.accessTime = accessTime; + } + + public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator, + final UniqueIndexMap deserialiserMap, final AtomicLong realCacheSize) + throws CacheFullException,IOException, BucketAllocatorException { + int len = data.getSerializedLength(); + if (len == 0) // This cacheable thing can't be seralised... + return null; + long offset = bucketAllocator.allocateBlock(len); + BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, + inMemory); + bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); + if (data instanceof HFileBlock) { + ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader(); + sliceBuf.rewind(); + assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; + ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE); + ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer); + ioEngine.write(sliceBuf, offset); + ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE); + } else { + ByteBuffer bb = ByteBuffer.allocate(len); + data.serialize(bb); + ioEngine.write(bb, offset); + } + realCacheSize.addAndGet(len); + return bucketEntry; + } + } + + static public long recommendedHashSize(long dataPoints) { + return dataPoints * 2 + 1; + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CombinedBlockCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CombinedBlockCache.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CombinedBlockCache.java (revision 0) @@ -0,0 +1,218 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; +import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.LruBlockCache; + +/** + * CombinedBlockCache is an abstraction layer that combines + * {@link LruBlockCache} and {@link BucketCache}, the smaller lruCache is used + * to cache bloom block and index block , the larger bucketCache is used to + * cache data block.Getblock reads first from the smaller lruCache before + * looking for the block in the bucketCache. Metrics are the combined size and + * hits and misses of both caches. + * + **/ +public class CombinedBlockCache implements BlockCache, HeapSize { + + private final LruBlockCache lruCache; + private final BucketCache bucketCache; + private final CombinedCacheStats combinedCacheStats; + + public CombinedBlockCache(LruBlockCache lbu, BucketCache bucketCache) { + this.lruCache = lbu; + this.bucketCache = bucketCache; + this.combinedCacheStats = new CombinedCacheStats(lbu.getStats(), + bucketCache.getStats()); + } + + @Override + public long heapSize() { + return lruCache.heapSize() + bucketCache.heapSize(); + } + + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { + boolean isMetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA; + if (isMetaBlock) { + lruCache.cacheBlock(cacheKey, buf, inMemory); + } else { + bucketCache.cacheBlock(cacheKey, buf, inMemory); + } + } + + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + cacheBlock(cacheKey, buf, false); + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, + boolean meta) { + if (lruCache.containBlock(cacheKey)) { + return lruCache.getBlock(cacheKey, caching, meta); + } else { + return bucketCache.getBlock(cacheKey, caching, meta); + } + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey); + } + + @Override + public int evictBlocksByHfileName(String hfileName) { + return lruCache.evictBlocksByHfileName(hfileName) + + bucketCache.evictBlocksByHfileName(hfileName); + } + + @Override + public CacheStats getStats() { + return this.combinedCacheStats; + } + + @Override + public void shutdown() { + lruCache.shutdown(); + bucketCache.shutdown(); + + } + + @Override + public long size() { + return lruCache.size() + 
bucketCache.size(); + } + + @Override + public long getFreeSize() { + return lruCache.getFreeSize() + bucketCache.getFreeSize(); + } + + @Override + public long getCurrentSize() { + return lruCache.getCurrentSize() + bucketCache.getCurrentSize(); + } + + @Override + public long getEvictedCount() { + return lruCache.getEvictedCount() + bucketCache.getEvictedCount(); + } + + @Override + public long getBlockCount() { + return lruCache.getBlockCount() + bucketCache.getBlockCount(); + } + + @Override + public List getBlockCacheColumnFamilySummaries( + Configuration conf) throws IOException { + throw new UnsupportedOperationException(); + } + + public static class CombinedCacheStats extends CacheStats { + private final CacheStats lruCacheStats; + private final CacheStats bucketCacheStats; + + CombinedCacheStats(CacheStats lbcStats, CacheStats fcStats) { + this.lruCacheStats = lbcStats; + this.bucketCacheStats = fcStats; + } + + @Override + public long getRequestCount() { + return lruCacheStats.getRequestCount() + + bucketCacheStats.getRequestCount(); + } + + @Override + public long getRequestCachingCount() { + return lruCacheStats.getRequestCachingCount() + + bucketCacheStats.getRequestCachingCount(); + } + + @Override + public long getMissCount() { + return lruCacheStats.getMissCount() + bucketCacheStats.getMissCount(); + } + + @Override + public long getMissCachingCount() { + return lruCacheStats.getMissCachingCount() + + bucketCacheStats.getMissCachingCount(); + } + + @Override + public long getHitCount() { + return lruCacheStats.getHitCount() + bucketCacheStats.getHitCount(); + } + + @Override + public long getHitCachingCount() { + return lruCacheStats.getHitCachingCount() + + bucketCacheStats.getHitCachingCount(); + } + + @Override + public long getEvictionCount() { + return lruCacheStats.getEvictionCount() + + bucketCacheStats.getEvictionCount(); + } + + @Override + public long getEvictedCount() { + return lruCacheStats.getEvictedCount() + + bucketCacheStats.getEvictedCount(); + } + + @Override + public double getHitRatioPastNPeriods() { + double ratio = ((double) (lruCacheStats.getSumHitCountsPastNPeriods() + bucketCacheStats + .getSumHitCountsPastNPeriods()) / (double) (lruCacheStats + .getSumRequestCountsPastNPeriods() + bucketCacheStats + .getSumRequestCountsPastNPeriods())); + return Double.isNaN(ratio) ? 0 : ratio; + } + + @Override + public double getHitCachingRatioPastNPeriods() { + double ratio = ((double) (lruCacheStats + .getSumHitCachingCountsPastNPeriods() + bucketCacheStats + .getSumHitCachingCountsPastNPeriods()) / (double) (lruCacheStats + .getSumRequestCachingCountsPastNPeriods() + bucketCacheStats + .getSumRequestCachingCountsPastNPeriods())); + return Double.isNaN(ratio) ? 0 : ratio; + } + + } + +}
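[Editor's note] To make the BucketEntry offset packing introduced earlier in this patch easier to verify, here is a minimal, self-contained Java sketch of the same 5-byte encoding (256-byte-aligned offsets stored as an int plus one extra byte). The class and field names below are illustrative only and are not part of the patch:

public class OffsetCodecDemo {
  private int offsetBase;  // low 32 bits of (offset >> 8)
  private byte offset1;    // next 8 bits of (offset >> 8)

  void setOffset(long value) {
    assert (value & 0xFF) == 0;       // offsets must be 256-byte aligned
    value >>= 8;                      // store offset / 256
    offsetBase = (int) value;
    offset1 = (byte) (value >> 32);
  }

  long offset() {
    long o = ((long) offsetBase) & 0xFFFFFFFFL; // treat the int as unsigned
    o += (((long) offset1) & 0xFF) << 32;       // splice the fifth byte back in
    return o << 8;                              // multiply back by 256
  }

  public static void main(String[] args) {
    OffsetCodecDemo codec = new OffsetCodecDemo();
    long original = 1649267441664L;   // 1.5 TB, 256-byte aligned
    codec.setOffset(original);
    System.out.println(codec.offset() == original); // prints true
  }
}

With one int and one byte per entry, this scheme addresses up to 2^40 * 256 bytes (roughly 256 TB), which matches the comment on BucketEntry above.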