Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision 1085792) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -181,6 +182,10 @@ public ZooKeeperWatcher getZooKeeperWatcher() { return null; } + + public RegionServerAccounting getRegionServerAccounting() { + return null; + } }; /** Index: src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java (revision 0) @@ -0,0 +1,134 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional infomation + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.*; +import java.util.List; +import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; + +/** + * Test HBASE-3694 whether the GlobalMemStoreSize is the same as the summary + * of all the online region's MemStoreSize + */ +public class TestGlobalMemStoreSize { + private final Log LOG = LogFactory.getLog(this.getClass().getName()); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static int regionServerNum =4; + private static int regionNum = 16; + // total region num = region num + root and meta regions + private static int totalRegionNum = regionNum+2; + + private HBaseTestingUtility TEST_UTIL; + private MiniHBaseCluster cluster; + + /** + * Test the global mem store size in the region server is equal to sum of each + * region's mem store size + * @throws Exception + */ + @Test + public void testGlobalMemStore() throws Exception { + // Start the cluster + LOG.info("Starting cluster"); + Configuration conf = HBaseConfiguration.create(); + conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000); + conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000); + TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.startMiniCluster(1, regionServerNum); + cluster = TEST_UTIL.getHBaseCluster(); + LOG.info("Waiting for active/ready master"); + cluster.waitForActiveAndReadyMaster(); + + // Create a table with regions + byte [] table = Bytes.toBytes("TestGlobalMemStoreSize"); + byte [] family = Bytes.toBytes("family"); + LOG.info("Creating table with " + regionNum + " regions"); + HTable ht = TEST_UTIL.createTable(table, family); + int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, + regionNum); + assertEquals(regionNum,numRegions); + waitForAllRegionsAssigned(); + + for (HRegionServer server : getOnlineRegionServers()) { + long globalMemStoreSize = 0; + for(HRegionInfo regionInfo : server.getOnlineRegions()) { + globalMemStoreSize += + server.getFromOnlineRegions(regionInfo.getEncodedName()). + getMemstoreSize().get(); + } + assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(), + globalMemStoreSize); + } + + // check the global memstore size after flush + for (HRegionServer server : getOnlineRegionServers()) { + for(HRegionInfo regionInfo : server.getOnlineRegions()) { + HRegion region= + server.getFromOnlineRegions(regionInfo.getEncodedName()); + region.flushcache(); + } + assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(), + 0); + } + } + + /** figure out how many regions are currently being served. */ + private int getRegionCount() { + int total = 0; + for (HRegionServer server : getOnlineRegionServers()) { + total += server.getOnlineRegions().size(); + } + return total; + } + + private List getOnlineRegionServers() { + List list = new ArrayList(); + for (JVMClusterUtil.RegionServerThread rst : + cluster.getRegionServerThreads()) { + if (rst.getRegionServer().isOnline()) { + list.add(rst.getRegionServer()); + } + } + return list; + } + + /** + * Wait until all the regions are assigned. + */ + private void waitForAllRegionsAssigned() { + while (getRegionCount() < totalRegionNum) { + LOG.debug("Waiting for there to be "+totalRegionNum+" regions, but there are " + getRegionCount() + " right now."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + } +} + Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1085792) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -273,6 +273,8 @@ // Replication services. If no replication, this handler will be null. private Replication replicationHandler; + private final RegionServerAccounting regionServerAccounting; + /** * Starts a HRegionServer at the default location * @@ -351,6 +353,8 @@ // login the server principal (if using secure Hadoop) User.login(conf, "hbase.regionserver.keytab.file", "hbase.regionserver.kerberos.principal", serverInfo.getHostname()); + + regionServerAccounting = new RegionServerAccounting(); } private static final int NORMAL_QOS = 0; @@ -883,6 +887,10 @@ } } + public RegionServerAccounting getRegionServerAccounting() { + return regionServerAccounting; + } + /* * @param r Region to get RegionLoad for. * @@ -2521,19 +2529,6 @@ } /** - * Return the total size of all memstores in every region. - * - * @return memstore size in bytes - */ - public long getGlobalMemStoreSize() { - long total = 0; - for (HRegion region : onlineRegions.values()) { - total += region.memstoreSize.get(); - } - return total; - } - - /** * @return Return the leases. */ protected Leases getLeases() { Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java (revision 0) @@ -0,0 +1,48 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * RegionServerAccounting keeps record of some basic real time information about + * the Region Server. Currently, it only keeps record the global memstore size. + */ +public class RegionServerAccounting { + + private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0); + + /** + * @return the global Memstore size in the RegionServer + */ + public long getGlobalMemstoreSize() { + return atomicGlobalMemstoreSize.get(); + } + + /** + * @param memStoreSize the Memstore size will be added to + * the global Memstore size + * @return the global Memstore size in the RegionServer + */ + public long addAndGetGlobalMemstoreSize(long memStoreSize) { + return atomicGlobalMemstoreSize.addAndGet(memStoreSize); + } + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (revision 1085792) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (working copy) @@ -65,6 +65,11 @@ * @return The HServerInfo for this RegionServer. */ public HServerInfo getServerInfo(); + + /** + * @return the RegionServerAccounting for this Region Server + */ + public RegionServerAccounting getRegionServerAccounting(); /** * Tasks to perform after region open to complete deploy of region on Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1085792) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -290,14 +290,16 @@ * Return true if global memory usage is above the high watermark */ private boolean isAboveHighWaterMark() { - return server.getGlobalMemStoreSize() >= globalMemStoreLimit; + return server.getRegionServerAccounting(). + getGlobalMemstoreSize() >= globalMemStoreLimit; } /** * Return true if we're above the high watermark */ private boolean isAboveLowWaterMark() { - return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark; + return server.getRegionServerAccounting(). + getGlobalMemstoreSize() >= globalMemStoreLimitLowMark; } public void requestFlush(HRegion r) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1085792) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -423,6 +423,28 @@ } return false; } + + public AtomicLong getMemstoreSize() { + return memstoreSize; + } + + /** + * Increase the size of mem store in this region and the size of global mem + * store + * @param memStoreSize + * @return the size of memstore in this region + */ + public long addAndGetMemstoreSize(long memStoreSize) { + if (this.rsServices != null) { + RegionServerAccounting rsAccounting = + this.rsServices.getRegionServerAccounting(); + + if (rsAccounting != null) { + rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize); + } + } + return this.memstoreSize.getAndAdd(memStoreSize); + } /* * Write out an info file under the region directory. Useful recovering @@ -1033,7 +1055,7 @@ storeFlushers.clear(); // Set down the memstore size by amount of flush. - this.memstoreSize.addAndGet(-currentMemStoreSize); + this.addAndGetMemstoreSize(-currentMemStoreSize); } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. @@ -1332,7 +1354,7 @@ // Now make changes to the memstore. long addedSize = applyFamilyMapToMemstore(familyMap); - flush = isFlushSize(memstoreSize.addAndGet(addedSize)); + flush = isFlushSize(this.addAndGetMemstoreSize(addedSize)); if (coprocessorHost != null) { coprocessorHost.postDelete(familyMap, writeToWAL); @@ -1462,7 +1484,7 @@ startRegionOperation(); try { long addedSize = doMiniBatchPut(batchOp); - newSize = memstoreSize.addAndGet(addedSize); + newSize = this.addAndGetMemstoreSize(addedSize); } finally { closeRegionOperation(); } @@ -1841,7 +1863,7 @@ } long addedSize = applyFamilyMapToMemstore(familyMap); - flush = isFlushSize(memstoreSize.addAndGet(addedSize)); + flush = isFlushSize(this.addAndGetMemstoreSize(addedSize)); } finally { this.updatesLock.readLock().unlock(); } @@ -2155,7 +2177,7 @@ * @return True if we should flush. */ protected boolean restoreEdit(final Store s, final KeyValue kv) { - return isFlushSize(this.memstoreSize.addAndGet(s.add(kv))); + return isFlushSize(this.addAndGetMemstoreSize(s.add(kv))); } /* @@ -3274,7 +3296,7 @@ walEdits, now); } - size = this.memstoreSize.addAndGet(size); + size = this.addAndGetMemstoreSize(size); flush = isFlushSize(size); } finally { this.updatesLock.readLock().unlock(); @@ -3349,7 +3371,7 @@ // returns the change in the size of the memstore from operation long size = store.updateColumnValue(row, family, qualifier, result); - size = this.memstoreSize.addAndGet(size); + size = this.addAndGetMemstoreSize(size); flush = isFlushSize(size); } finally { this.updatesLock.readLock().unlock();