Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision 1085593) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -181,6 +182,10 @@ public ZooKeeperWatcher getZooKeeperWatcher() { return null; } + + public RegionServerAccounting getRegionServerAccounting() { + return null; + } }; /** Index: src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java (revision 0) @@ -0,0 +1,184 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional infomation + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; + +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; + +/** + * Test whether region rebalancing works. (HBASE-71) + * Test HBASE-3663 whether region rebalancing works after a new server booted + * especially when no server has more regions than the ceils of avg load + */ +public class TestGlobalMemStoreSize extends HBaseClusterTestCase { + final Log LOG = LogFactory.getLog(this.getClass().getName()); + + HTable table; + + HTableDescriptor desc; + + final byte[] FIVE_HUNDRED_KBYTES; + + final byte [] FAMILY_NAME = Bytes.toBytes("col"); + + private static int regionServerNum =4; + private static int regionNum = 16; + // total region num = region num + root and meta regions + private static int totalRegionNum = regionNum+2; + + /** constructor */ + public TestGlobalMemStoreSize() { + super(regionServerNum); + + FIVE_HUNDRED_KBYTES = new byte[500 * 1024]; + for (int i = 0; i < 500 * 1024; i++) { + FIVE_HUNDRED_KBYTES[i] = 'x'; + } + + desc = new HTableDescriptor("test"); + desc.addFamily(new HColumnDescriptor(FAMILY_NAME)); + } + + /** + * Before the hbase cluster starts up, create some dummy regions. + */ + @Override + public void preHBaseClusterSetup() throws IOException { + // create a 20-region table by writing directly to disk + List startKeys = new ArrayList(); + startKeys.add(null); + for (int i = 10; i < regionNum+10; i++) { + startKeys.add(Bytes.toBytes("row_" + i)); + } + startKeys.add(null); + LOG.debug(startKeys.size() + " start keys generated"); + + List regions = new ArrayList(); + for (int i = 0; i < regionNum; i++) { + regions.add(createAregion(startKeys.get(i), startKeys.get(i+1))); + } + + // Now create the root and meta regions and insert the data regions + // created above into the meta + + createRootAndMetaRegions(); + for (HRegion region : regions) { + HRegion.addRegionToMETA(meta, region); + } + closeRootAndMeta(); + } + + /** + * Test the global mem store size in the region server is equal to sum of each + * region's mem store size + * @throws IOException + */ + + public void testGlobalMemStore() throws IOException { + waitForAllRegionsAssigned(); + assertEquals(getOnlineRegionServers().size(), regionServerNum); + assertEquals(getRegionCount(), totalRegionNum); + + int totalRegionNum = 0; + for (HRegionServer server : getOnlineRegionServers()) { + long globalMemStoreSize = 0; + totalRegionNum += server.getOnlineRegions().size(); + for(HRegionInfo regionInfo : server.getOnlineRegions()) { + globalMemStoreSize += + server.getFromOnlineRegions(regionInfo.getEncodedName()). + getMemstoreSize(). + get(); + } + assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(), + globalMemStoreSize); + } + + for (HRegionServer server : getOnlineRegionServers()) { + for(HRegionInfo regionInfo : server.getOnlineRegions()) { + HRegion region= + server.getFromOnlineRegions(regionInfo.getEncodedName()); + region.flushcache(); + } + assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(), + 0); + } + } + + /** figure out how many regions are currently being served. */ + private int getRegionCount() { + int total = 0; + System.out.println("getOnlineRegionServers "+ getOnlineRegionServers().size()); + for (HRegionServer server : getOnlineRegionServers()) { + total += server.getOnlineRegions().size(); + } + return total; + } + + private List getOnlineRegionServers() { + List list = new ArrayList(); + for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) { + if (rst.getRegionServer().isOnline()) { + list.add(rst.getRegionServer()); + } + } + return list; + } + + /** + * Wait until all the regions are assigned. + */ + private void waitForAllRegionsAssigned() { + while (getRegionCount() < totalRegionNum) { + LOG.debug("Waiting for there to be "+totalRegionNum+" regions, but there are " + getRegionCount() + " right now."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + } + + /** + * create a region with the specified start and end key and exactly one row + * inside. + */ + private HRegion createAregion(byte [] startKey, byte [] endKey) + throws IOException { + HRegion region = createNewHRegion(desc, startKey, endKey); + byte [] keyToWrite = startKey == null ? Bytes.toBytes("row_0000") : startKey; + Put put = new Put(keyToWrite); + put.add(FAMILY_NAME, null, Bytes.toBytes("test")); + region.put(put); + region.close(); + region.getLog().closeAndDelete(); + return region; + } +} + Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1085593) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -273,6 +273,8 @@ // Replication services. If no replication, this handler will be null. private Replication replicationHandler; + private final RegionServerAccounting regionServerAccounting; + /** * Starts a HRegionServer at the default location * @@ -351,6 +353,8 @@ // login the server principal (if using secure Hadoop) User.login(conf, "hbase.regionserver.keytab.file", "hbase.regionserver.kerberos.principal", serverInfo.getHostname()); + + regionServerAccounting = new RegionServerAccounting(); } private static final int NORMAL_QOS = 0; @@ -883,6 +887,10 @@ } } + public RegionServerAccounting getRegionServerAccounting() { + return regionServerAccounting; + } + /* * @param r Region to get RegionLoad for. * @@ -2521,19 +2529,6 @@ } /** - * Return the total size of all memstores in every region. - * - * @return memstore size in bytes - */ - public long getGlobalMemStoreSize() { - long total = 0; - for (HRegion region : onlineRegions.values()) { - total += region.memstoreSize.get(); - } - return total; - } - - /** * @return Return the leases. */ protected Leases getLeases() { Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java (revision 0) @@ -0,0 +1,40 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicLong; + +public class RegionServerAccounting { + + private final AtomicLong atomicGlobalMemstoreSize; + + public RegionServerAccounting() { + atomicGlobalMemstoreSize = new AtomicLong(0); + } + + public long getGlobalMemstoreSize() { + return atomicGlobalMemstoreSize.get(); + } + + public void incGlobalMemstoreSize(long memStoreSize) { + this.atomicGlobalMemstoreSize.addAndGet(memStoreSize); + } + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (revision 1085593) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (working copy) @@ -65,6 +65,11 @@ * @return The HServerInfo for this RegionServer. */ public HServerInfo getServerInfo(); + + /** + * @return the RegionServerAccounting for this Region Server + */ + public RegionServerAccounting getRegionServerAccounting(); /** * Tasks to perform after region open to complete deploy of region on Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1085593) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -290,14 +290,16 @@ * Return true if global memory usage is above the high watermark */ private boolean isAboveHighWaterMark() { - return server.getGlobalMemStoreSize() >= globalMemStoreLimit; + return server.getRegionServerAccounting(). + getGlobalMemstoreSize() >= globalMemStoreLimit; } /** * Return true if we're above the high watermark */ private boolean isAboveLowWaterMark() { - return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark; + return server.getRegionServerAccounting(). + getGlobalMemstoreSize() >= globalMemStoreLimitLowMark; } public void requestFlush(HRegion r) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1085593) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -423,6 +423,28 @@ } return false; } + + public AtomicLong getMemstoreSize() { + return memstoreSize; + } + + /** + * Increase the size of mem store in this region and the sum of global mem + * stores' size + * @param memStoreSize + * @return the size of memstore in this region + */ + public long incMemoryUsage(long memStoreSize) { + if (this.rsServices != null) { + RegionServerAccounting rsAccounting = + this.rsServices.getRegionServerAccounting(); + + if (rsAccounting != null) { + rsAccounting.incGlobalMemstoreSize(memStoreSize); + } + } + return this.memstoreSize.getAndAdd(memStoreSize); + } /* * Write out an info file under the region directory. Useful recovering @@ -1033,7 +1055,7 @@ storeFlushers.clear(); // Set down the memstore size by amount of flush. - this.memstoreSize.addAndGet(-currentMemStoreSize); + this.incMemoryUsage(-currentMemStoreSize); } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. @@ -1332,7 +1354,7 @@ // Now make changes to the memstore. long addedSize = applyFamilyMapToMemstore(familyMap); - flush = isFlushSize(memstoreSize.addAndGet(addedSize)); + flush = isFlushSize(this.incMemoryUsage(addedSize)); if (coprocessorHost != null) { coprocessorHost.postDelete(familyMap, writeToWAL); @@ -1462,7 +1484,7 @@ startRegionOperation(); try { long addedSize = doMiniBatchPut(batchOp); - newSize = memstoreSize.addAndGet(addedSize); + newSize = this.incMemoryUsage(addedSize); } finally { closeRegionOperation(); } @@ -1841,7 +1863,7 @@ } long addedSize = applyFamilyMapToMemstore(familyMap); - flush = isFlushSize(memstoreSize.addAndGet(addedSize)); + flush = isFlushSize(this.incMemoryUsage(addedSize)); } finally { this.updatesLock.readLock().unlock(); } @@ -2155,7 +2177,7 @@ * @return True if we should flush. */ protected boolean restoreEdit(final Store s, final KeyValue kv) { - return isFlushSize(this.memstoreSize.addAndGet(s.add(kv))); + return isFlushSize(this.incMemoryUsage(s.add(kv))); } /* @@ -3274,7 +3296,7 @@ walEdits, now); } - size = this.memstoreSize.addAndGet(size); + size = this.incMemoryUsage(size); flush = isFlushSize(size); } finally { this.updatesLock.readLock().unlock(); @@ -3349,7 +3371,7 @@ // returns the change in the size of the memstore from operation long size = store.updateColumnValue(row, family, qualifier, result); - size = this.memstoreSize.addAndGet(size); + size = this.incMemoryUsage(size); flush = isFlushSize(size); } finally { this.updatesLock.readLock().unlock();