From 0e698bb807af138ead9cc14388228e3a0d1c563e Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 4 Apr 2015 11:56:10 +0800 Subject: [PATCH] A new unit test for HBASE-13301 --- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 19 ++- .../io/hfile/bucket/TestBucketCacheMemoryLeak.java | 139 +++++++++++++++++++++ 2 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheMemoryLeak.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 7dda0e6..4fd05e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -115,7 +116,8 @@ public class BucketCache implements BlockCache, HeapSize { @VisibleForTesting Map ramCache; // In this map, store the block's meta data like offset, length - private Map backingMap; + @VisibleForTesting + Map backingMap; /** * Flag if the cache is enabled or not... We shut it off if there are IO @@ -442,6 +444,10 @@ public class BucketCache implements BlockCache, HeapSize { return null; } + final MutableBoolean flag = new MutableBoolean(); + + private boolean firstTime = true; + @Override public boolean evictBlock(BlockCacheKey cacheKey) { if (!cacheEnabled) return false; @@ -459,6 +465,17 @@ public class BucketCache implements BlockCache, HeapSize { return false; } } + synchronized (flag) { + if (firstTime) { + firstTime = false; + while (flag.booleanValue()) { + try { + flag.wait(); + } catch (InterruptedException e) { + } + } + } + } IdLock.Entry lockEntry = null; try { lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheMemoryLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheMemoryLeak.java new file mode 100644 index 0000000..4656c96 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheMemoryLeak.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.io.Closeables; + +public class TestBucketCacheMemoryLeak { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test", + TestBucketCacheMemoryLeak.class.getName()); + + private static final byte[] FAMILY = Bytes.toBytes("F"); + + private static final byte[] QUALIFIER = Bytes.toBytes("Q"); + + private static HTable TABLE; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + TEST_UTIL.getConfiguration().setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 128.0f); + TEST_UTIL.startMiniCluster(2); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLE_NAME.getNamespaceAsString()).build()); + TABLE = TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + Closeables.close(TABLE, true); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws IOException, InterruptedException { + TABLE.put(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("A"))); + TEST_UTIL.getHBaseAdmin().flush(TABLE_NAME); + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + Region region = null; + HRegionServer src; + HRegionServer dst; + for (Region r : cluster.getRegionServer(0).getOnlineRegions(TABLE_NAME)) { + region = r; + } + if (region != null) { + src = cluster.getRegionServer(0); + dst = cluster.getRegionServer(1); + } else { + for (Region r : cluster.getRegionServer(1).getOnlineRegions(TABLE_NAME)) { + region = r; + } + src = cluster.getRegionServer(1); + dst = cluster.getRegionServer(0); + } + assertNotNull(region); + assertFalse(TABLE.get(new Get(Bytes.toBytes(1))).isEmpty()); + final BucketCache bucketCache = + (BucketCache) src.getCacheConfig().getBlockCache().getBlockCaches()[1]; + while (bucketCache.backingMap.isEmpty()) { + Thread.sleep(100); + } + final BlockCacheKey key = bucketCache.backingMap.keySet().iterator().next(); + bucketCache.flag.setValue(true); + Thread evictThread = new Thread() { + + @Override + public void run() { + bucketCache.evictBlock(key); + } + + }; + evictThread.start(); + Thread.sleep(2000); + TEST_UTIL.getHBaseAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(), + Bytes.toBytes(dst.getServerName().getServerName())); + while (dst.getOnlineRegions(TABLE_NAME).isEmpty()) { + Thread.sleep(100); + } + assertTrue(bucketCache.backingMap.isEmpty()); + TEST_UTIL.getHBaseAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(), + Bytes.toBytes(src.getServerName().getServerName())); + while (src.getOnlineRegions(TABLE_NAME).isEmpty()) { + Thread.sleep(100); + } + assertFalse(TABLE.get(new Get(Bytes.toBytes(1))).isEmpty()); + while (bucketCache.backingMap.isEmpty()) { + Thread.sleep(100); + } + synchronized (bucketCache.flag) { + bucketCache.flag.setValue(false); + bucketCache.flag.notifyAll(); + } + evictThread.join(); + assertEquals(1L, bucketCache.getBlockCount()); + assertTrue(bucketCache.getCurrentSize() > 0L); + assertTrue("We should have a block!", bucketCache.iterator().hasNext()); + } +} -- 1.9.1