Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1496586) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1496587) @@ -911,6 +911,19 @@ } /** + * Get the RegionServer which hosts a region with the given region name. + * @param regionName + * @return + */ + public HRegionServer getRSWithRegion(byte[] regionName) { + int index = hbaseCluster.getServerWith(regionName); + if (index == -1) { + return null; + } + return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); + } + + /** * Starts a MiniMRCluster with a default number of * TaskTracker's. * Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java (revision 1496587) @@ -0,0 +1,220 @@ +/** + * Copyright 2013 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; + +/** + * Verify that the Online Config Changes on the HRegionServer side are actually + * happening. We should add tests for important configurations which will be + * changed online. + */ +public class TestRegionServerOnlineConfigChange extends TestCase { + static final Log LOG = + LogFactory.getLog(TestRegionServerOnlineConfigChange.class.getName()); + HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility(); + Configuration conf = null; + + HTable t1 = null; + HRegionServer rs1 = null; + byte[] r1name = null; + HRegion r1 = null; + + final String table1Str = "table1"; + final String columnFamily1Str = "columnFamily1"; + final byte[] TABLE1 = Bytes.toBytes(table1Str); + final byte[] COLUMN_FAMILY1 = Bytes.toBytes(columnFamily1Str); + + + @Override + public void setUp() throws Exception { + conf = hbaseTestingUtility.getConfiguration(); + hbaseTestingUtility.startMiniCluster(1,1); + t1 = hbaseTestingUtility.createTable(TABLE1, COLUMN_FAMILY1); + HRegionInfo firstHRI = t1.getRegionsInfo().keySet().iterator().next(); + r1name = firstHRI.getRegionName(); + rs1 = hbaseTestingUtility.getRSWithRegion(r1name); + r1 = rs1.getRegion(r1name); + } + + @Override + public void tearDown() throws Exception { + hbaseTestingUtility.shutdownMiniCluster(); + } + + /** + * Check if the number of compaction threads changes online + * @throws IOException + */ + public void testNumCompactionThreadsOnlineChange() throws IOException { + assertTrue(rs1.compactSplitThread != null); + int newNumSmallThreads = + rs1.compactSplitThread.getSmallCompactionThreadNum() + 1; + int newNumLargeThreads = + rs1.compactSplitThread.getLargeCompactionThreadNum() + 1; + + conf.setInt("hbase.regionserver.thread.compaction.small", + newNumSmallThreads); + conf.setInt("hbase.regionserver.thread.compaction.large", + newNumLargeThreads); + HRegionServer.configurationManager.notifyAllObservers(conf); + + assertEquals(newNumSmallThreads, + rs1.compactSplitThread.getSmallCompactionThreadNum()); + assertEquals(newNumLargeThreads, + rs1.compactSplitThread.getLargeCompactionThreadNum()); + } + + /** + * Test that the configurations in the CompactionConfiguration class change + * properly. + * + * @throws IOException + */ + public void testCompactionConfigurationOnlineChange() throws IOException { + String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX; + Store s = r1.getStore(COLUMN_FAMILY1); + + // Set the new compaction ratio to a different value. + double newCompactionRatio = + s.compactionManager.comConf.getCompactionRatio() + 0.1; + conf.setFloat(strPrefix + "ratio", (float)newCompactionRatio); + + // Notify all the observers, which includes the Store object. + HRegionServer.configurationManager.notifyAllObservers(conf); + + // Check if the compaction ratio got updated in the Compaction Configuration + assertEquals(newCompactionRatio, + s.compactionManager.comConf.getCompactionRatio(), + 0.00001); + + // Check if the off peak compaction ratio gets updated. + double newOffPeakCompactionRatio = + s.compactionManager.comConf.getCompactionRatioOffPeak() + 0.1; + conf.setFloat(strPrefix + "ratio.offpeak", + (float)newOffPeakCompactionRatio); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newOffPeakCompactionRatio, + s.compactionManager.comConf.getCompactionRatioOffPeak(), + 0.00001); + + // Check if the throttle point gets updated. + long newThrottlePoint = s.compactionManager.comConf.getThrottlePoint() + 10; + conf.setLong("hbase.regionserver.thread.compaction.throttle", + newThrottlePoint); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newThrottlePoint, + s.compactionManager.comConf.getThrottlePoint()); + + // Check if the minFilesToCompact gets updated. + int newMinFilesToCompact = + s.compactionManager.comConf.getMinFilesToCompact() + 1; + conf.setLong(strPrefix + "min", newMinFilesToCompact); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMinFilesToCompact, + s.compactionManager.comConf.getMinFilesToCompact()); + + // Check if the maxFilesToCompact gets updated. + int newMaxFilesToCompact = + s.compactionManager.comConf.getMaxFilesToCompact() + 1; + conf.setLong(strPrefix + "max", newMaxFilesToCompact); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMaxFilesToCompact, + s.compactionManager.comConf.getMaxFilesToCompact()); + + // Check if the Off peak start hour gets updated. + int newOffPeakStartHour = + (s.compactionManager.comConf.getOffPeakStartHour() + 1) % 24; + conf.setLong("hbase.offpeak.start.hour", newOffPeakStartHour); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newOffPeakStartHour, + s.compactionManager.comConf.getOffPeakStartHour()); + + // Check if the Off peak end hour gets updated. + int newOffPeakEndHour = + (s.compactionManager.comConf.getOffPeakEndHour() + 1) % 24; + conf.setLong("hbase.offpeak.end.hour", newOffPeakEndHour); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newOffPeakEndHour, + s.compactionManager.comConf.getOffPeakEndHour()); + + // Check if the minCompactSize gets updated. + long newMinCompactSize = + s.compactionManager.comConf.getMinCompactSize() + 1; + conf.setLong(strPrefix + "min.size", newMinCompactSize); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMinCompactSize, + s.compactionManager.comConf.getMinCompactSize()); + + // Check if the maxCompactSize gets updated. + long newMaxCompactSize = + s.compactionManager.comConf.getMaxCompactSize() - 1; + conf.setLong(strPrefix + "max.size", newMaxCompactSize); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMaxCompactSize, + s.compactionManager.comConf.getMaxCompactSize()); + + // Check if shouldExcludeBulk gets updated. + boolean newShouldExcludeBulk = + !s.compactionManager.comConf.shouldExcludeBulk(); + conf.setBoolean(strPrefix + "exclude.bulk", newShouldExcludeBulk); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newShouldExcludeBulk, + s.compactionManager.comConf.shouldExcludeBulk()); + + // Check if shouldDeleteExpired gets updated. + boolean newShouldDeleteExpired = + !s.compactionManager.comConf.shouldDeleteExpired(); + conf.setBoolean("hbase.store.delete.expired.storefile", + newShouldDeleteExpired); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newShouldDeleteExpired, + s.compactionManager.comConf.shouldDeleteExpired()); + + // Check if majorCompactionPeriod gets updated. + long newMajorCompactionPeriod = + s.compactionManager.comConf.getMajorCompactionPeriod() + 10; + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, newMajorCompactionPeriod); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMajorCompactionPeriod, + s.compactionManager.comConf.getMajorCompactionPeriod()); + + // Check if majorCompactionJitter gets updated. + float newMajorCompactionJitter = + s.compactionManager.comConf.getMajorCompactionJitter() + 0.02F; + conf.setFloat("hbase.hregion.majorcompaction.jitter", + newMajorCompactionJitter); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMajorCompactionJitter, + s.compactionManager.comConf.getMajorCompactionJitter(), 0.00001); + } +} + Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1496587) @@ -220,6 +220,30 @@ /** Default compaction manager class name. */ public static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName(); + /** Parameter name for the number of large compaction threads */ + public static final String LARGE_COMPACTION_THREADS = + "hbase.regionserver.thread.compaction.large"; + + /** Default number of large compaction threads */ + public static final int DEFAULT_LARGE_COMPACTION_THREADS = 1; + + /** Parameter name for the number of large compaction threads */ + public static final String SMALL_COMPACTION_THREADS = + "hbase.regionserver.thread.compaction.small"; + + /** Default number of small compaction threads */ + public static final int DEFAULT_SMALL_COMPACTION_THREADS = 1; + + /** Prefix for Compaction related configurations in Store */ + public static final String HSTORE_COMPACTION_PREFIX = + "hbase.hstore.compaction."; + + /** Parameter name for the number of split threads */ + public static final String SPLIT_THREADS = "hbase.regionserver.thread.split"; + + /** Default number of split threads */ + public static final int DEFAULT_SPLIT_THREADS = 1; + /** Parameter name for what master implementation to use. */ public static final String MASTER_IMPL = "hbase.master.impl"; Index: src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java (revision 1496587) @@ -29,8 +29,47 @@ import java.util.WeakHashMap; /** - * Maintains a set of all the classes which are would like to get notified - * when the Configuration is reloaded from disk. + * Maintains the set of all the classes which would like to get notified + * when the Configuration is reloaded from the disk in the Online Configuration + * Change mechanism, which lets you update certain configuration properties + * on-the-fly, without having to restart the cluster. + * + * If a class has configuration properties which you would like to be able to + * change on-the-fly, do the following: + * 1. Implement the {@link ConfigurationObserver} interface. This would require + * you to implement the + * {@link ConfigurationObserver#notifyOnChange(Configuration)} + * method. This is a callback that is used to notify your class' instance + * that the configuration has changed. In this method, you need to check + * if the new values for the properties that are of interest to your class + * are different from the cached values. If yes, update them. + * + * However, be careful with this. Certain properties might be trivially + * mutable online, but others might not. Two properties might be trivially + * mutable by themselves, but not when changed together. For example, if a + * method uses properties "a" and "b" to make some decision, and is running + * in parallel when the notifyOnChange() method updates "a", but hasn't + * yet updated "b", it might make a decision on the basis of a new value of + * "a", and an old value of "b". This might introduce subtle bugs. This + * needs to be dealt on a case-by-case basis, and this class does not provide + * any protection from such cases. + * + * 2. Register the appropriate instance of the class with the + * {@link ConfigurationManager} instance, using the + * {@link ConfigurationManager#registerObserver(ConfigurationObserver)} + * method. For the RS side of things, the ConfigurationManager is a static + * member of the {@link org.apache.hadoop.hbase.regionserver.HRegionServer} + * class. Be careful not to do this in the constructor, as you might cause + * the 'this' reference to escape. Use a factory method, or an initialize() + * method which is called after the construction of the object. + * + * 3. Deregister the instance using the + * {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)} + * method when it is going out of scope. In case you are not able to do that + * for any reason, it is still okay, since entries for dead observers are + * automatically collected during GC. But nonetheless, it is still a good + * practice to deregister your observer, whenever possible. + * */ public class ConfigurationManager { public static final Log LOG = LogFactory.getLog(ConfigurationManager.class); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1496587) @@ -377,7 +377,8 @@ // This object lets classes register themselves to get notified on // Configuration changes. - public ConfigurationManager configurationManager; + public static final ConfigurationManager configurationManager = + new ConfigurationManager(); public static long getResponseSizeLimit() { return responseSizeLimit; @@ -411,7 +412,6 @@ this.abortRequested = false; this.fsOk = true; this.conf = conf; - this.configurationManager = new ConfigurationManager(); this.connection = ServerConnectionManager.getConnection(conf); this.isOnline = false; @@ -558,6 +558,8 @@ // Compaction thread this.compactSplitThread = new CompactSplitThread(this); + // Registering the compactSplitThread object with the ConfigurationManager. + configurationManager.registerObserver(this.compactSplitThread); // Log rolling thread int hlogCntPerServer = this.conf.getInt(HConstants.HLOG_CNT_PER_SERVER, 2); @@ -1465,14 +1467,6 @@ return isOnline; } - /** - * Return the ConfigurationManager object. - * @return - */ - public ConfigurationManager getConfigurationManager() { - return this.configurationManager; - } - private void setupHLog(Path logDir, Path oldLogDir, int totalHLogCnt) throws IOException { hlogs = new HLog[totalHLogCnt]; for (int i = 0; i < totalHLogCnt; i++) { @@ -3829,12 +3823,9 @@ */ public void updateConfiguration() { LOG.info("Reloading the configuration from disk."); + // Reload the configuration from disk. conf.reloadConfiguration(); - // TODO @gauravm - // Move this to the notifyOnChange() method in HRegionServer - for (HRegion r : onlineRegions.values()) { - r.updateConfiguration(); - } + // Notify all the observers that the configuration has changed. configurationManager.notifyAllObservers(conf); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java (revision 1496587) @@ -47,43 +47,71 @@ Configuration conf; Store store; - long maxCompactSize; - long minCompactSize; - boolean shouldExcludeBulk; - int minFilesToCompact; - int maxFilesToCompact; - double compactionRatio; - double offPeekCompactionRatio; - int offPeakStartHour; - int offPeakEndHour; - long throttlePoint; - boolean shouldDeleteExpired; - long majorCompactionPeriod; - float majorCompactionJitter; + /** Since all these properties can change online, they are volatile **/ + volatile long maxCompactSize; + volatile long minCompactSize; + volatile boolean shouldExcludeBulk; + volatile int minFilesToCompact; + volatile int maxFilesToCompact; + volatile double compactionRatio; + volatile double offPeakCompactionRatio; + volatile int offPeakStartHour; + volatile int offPeakEndHour; + volatile long throttlePoint; + volatile boolean shouldDeleteExpired; + volatile long majorCompactionPeriod; + volatile float majorCompactionJitter; + /** Default values for the properties **/ + static final long defaultMaxCompactSize = Long.MAX_VALUE; + static final boolean defaultShouldExcludeBulk = false; + static final int defaultMaxFilesToCompact = 10; + static final float defaultCompactionRatio = 1.2F; + static final float defaultOffPeakCompactionRatio = 5.0F; + static final int defaultOffPeakStartHour = -1; + static final int defaultOffPeakEndHour = -1; + static final boolean defaultShouldDeleteExpired = true; + static final long defaultMajorCompactionPeriod = 1000*60*60*24; + static final float defaultMajorCompactionJitter = 0.20F; + CompactionConfiguration(Configuration conf, Store store) { this.conf = conf; this.store = store; - String strPrefix = "hbase.hstore.compaction."; + String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX; - maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE); - minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize); - shouldExcludeBulk = conf.getBoolean(strPrefix + "exclude.bulk", false); + maxCompactSize = conf.getLong(strPrefix + "max.size", defaultMaxCompactSize); + minCompactSize = conf.getLong(strPrefix + "min.size", + store.getHRegion().memstoreFlushSize); + shouldExcludeBulk = conf.getBoolean(strPrefix + "exclude.bulk", + defaultShouldExcludeBulk); minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min", - /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", HConstants.DEFAULT_MIN_FILES_TO_COMPACT))); - maxFilesToCompact = conf.getInt(strPrefix + "max", 10); - compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F); - offPeekCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F); + /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", + HConstants.DEFAULT_MIN_FILES_TO_COMPACT))); + maxFilesToCompact = conf.getInt(strPrefix + "max", + defaultMaxFilesToCompact); + compactionRatio = conf.getFloat(strPrefix + "ratio", + defaultCompactionRatio); + offPeakCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", + defaultOffPeakCompactionRatio); - offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1); - offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1); + offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", + defaultOffPeakStartHour); + offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", + defaultOffPeakEndHour); - throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle", - 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize); - shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true); - majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); - majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F); + throttlePoint = + conf.getLong("hbase.regionserver.thread.compaction.throttle", + 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize); + shouldDeleteExpired = + conf.getBoolean("hbase.store.delete.expired.storefile", + defaultShouldDeleteExpired); + majorCompactionPeriod = + conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, + defaultMajorCompactionPeriod); + majorCompactionJitter = + conf.getFloat("hbase.hregion.majorcompaction.jitter", + defaultMajorCompactionJitter); } /** @@ -132,7 +160,7 @@ * @return Off peak Ratio used for compaction */ double getCompactionRatioOffPeak() { - return offPeekCompactionRatio; + return offPeakCompactionRatio; } /** @@ -179,4 +207,148 @@ return shouldDeleteExpired; } + /** + * Update the compaction configuration, when an online change is made. + * + * @param newConf + */ + protected void updateConfiguration(Configuration newConf) { + String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX; + + // Check if the compaction ratio has changed. + String compactionRatioStr = strPrefix + "ratio"; + double newCompactionRatio = newConf.getFloat(compactionRatioStr, + defaultCompactionRatio); + if (newCompactionRatio != this.compactionRatio) { + LOG.info("Changing the value of " + compactionRatioStr + " from " + + this.compactionRatio + " to " + newCompactionRatio); + this.compactionRatio = newCompactionRatio; + } + + // Check if the off peak compaction ratio has changed. + String offPeakCompactionRatioStr = strPrefix + "ratio.offpeak"; + double newOffPeakCompactionRatio = + newConf.getFloat(offPeakCompactionRatioStr, + defaultOffPeakCompactionRatio); + if (newOffPeakCompactionRatio != this.offPeakCompactionRatio) { + LOG.info("Changing the value of " + offPeakCompactionRatioStr + " from " + + this.offPeakCompactionRatio + " to " + newOffPeakCompactionRatio); + this.offPeakCompactionRatio = newOffPeakCompactionRatio; + } + + // Check if the throttle point has changed. + String throttlePointStr = "hbase.regionserver.thread.compaction.throttle"; + long newThrottlePoint = newConf.getLong(throttlePointStr, + 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize); + if (newThrottlePoint != this.throttlePoint) { + LOG.info("Changing the value of " + throttlePointStr + " from " + + this.throttlePoint + " to " + newThrottlePoint); + this.throttlePoint = newThrottlePoint; + } + + // Check if the minFilesToCompact has changed. + String minFilesToCompactStr = strPrefix + "min"; + int newMinFilesToCompact = Math.max(2, newConf.getInt(minFilesToCompactStr, + /*old name*/ newConf.getInt("hbase.hstore.compactionThreshold", + HConstants.DEFAULT_MIN_FILES_TO_COMPACT))); + if (newMinFilesToCompact != this.minFilesToCompact) { + LOG.info("Changing the value of " + minFilesToCompactStr + " from " + + this.minFilesToCompact + " to " + newMinFilesToCompact); + this.minFilesToCompact = newMinFilesToCompact; + } + + // Check if the maxFile to compact has changed. + String maxFilesToCompactStr = strPrefix + "max"; + int newMaxFilesToCompact = newConf.getInt(maxFilesToCompactStr, + defaultMaxFilesToCompact); + if (newMaxFilesToCompact != this.maxFilesToCompact) { + LOG.info("Changing the value of " + maxFilesToCompactStr + " from " + + this.maxFilesToCompact + " to " + newMaxFilesToCompact); + this.maxFilesToCompact = newMaxFilesToCompact; + } + + // Check if the Off Peak Start Hour has changed. + String offPeakStartHourStr = "hbase.offpeak.start.hour"; + int newOffPeakStartHour = newConf.getInt(offPeakStartHourStr, + defaultOffPeakStartHour); + if (newOffPeakStartHour != this.offPeakStartHour) { + LOG.info("Changing the value of " + offPeakStartHourStr + " from " + + this.offPeakStartHour + " to " + newOffPeakStartHour); + this.offPeakStartHour = newOffPeakStartHour; + } + + // Check if the Off Peak End Hour has changed. + String offPeakEndHourStr = "hbase.offpeak.end.hour"; + int newOffPeakEndHour = newConf.getInt(offPeakEndHourStr, + defaultOffPeakEndHour); + if (newOffPeakEndHour != this.offPeakEndHour) { + LOG.info("Changing the value of " + offPeakEndHourStr + " from " + + this.offPeakEndHour + " to " + newOffPeakEndHour); + this.offPeakEndHour = newOffPeakEndHour; + } + + // Check if the Min Compaction Size has changed + String minCompactSizeStr = strPrefix + "min.size"; + long newMinCompactSize = newConf.getLong(minCompactSizeStr, + store.getHRegion().memstoreFlushSize); + if (newMinCompactSize != this.minCompactSize) { + LOG.info("Changing the value of " + minCompactSizeStr + " from " + + this.minCompactSize + " to " + newMinCompactSize); + this.minCompactSize = newMinCompactSize; + } + + // Check if the Max Compaction Size has changed. + String maxCompactSizeStr = strPrefix + "max.size"; + long newMaxCompactSize = newConf.getLong(maxCompactSizeStr, + defaultMaxCompactSize); + if (newMaxCompactSize != this.maxCompactSize) { + LOG.info("Changing the value of " + maxCompactSizeStr + " from " + + this.maxCompactSize + " to " + newMaxCompactSize); + this.maxCompactSize = newMaxCompactSize; + } + + // Check if shouldExcludeBulk has changed. + String shouldExcludeBulkStr = strPrefix + "exclude.bulk"; + boolean newShouldExcludeBulk = newConf.getBoolean(shouldExcludeBulkStr, + defaultShouldExcludeBulk); + if (newShouldExcludeBulk != this.shouldExcludeBulk) { + LOG.info("Changing the value of " + shouldExcludeBulkStr + " from " + + this.shouldExcludeBulk + " to " + newShouldExcludeBulk); + this.shouldExcludeBulk = newShouldExcludeBulk; + } + + // Check if shouldDeleteExpired has changed. + String shouldDeleteExpiredStr = "hbase.store.delete.expired.storefile"; + boolean newShouldDeleteExpired = + newConf.getBoolean(shouldDeleteExpiredStr, + defaultShouldDeleteExpired); + if (newShouldDeleteExpired != this.shouldDeleteExpired) { + LOG.info("Changing the value of " + shouldDeleteExpiredStr + " from " + + this.shouldDeleteExpired + " to " + newShouldDeleteExpired); + this.shouldDeleteExpired = newShouldDeleteExpired; + } + + // Check if majorCompactionPeriod has changed. + long newMajorCompactionPeriod = + newConf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, + defaultMajorCompactionPeriod); + if (newMajorCompactionPeriod != this.majorCompactionPeriod) { + LOG.info("Changing the value of " + HConstants.MAJOR_COMPACTION_PERIOD + + " from " + this.majorCompactionPeriod + " to " + + newMajorCompactionPeriod); + this.majorCompactionPeriod = newMajorCompactionPeriod; + } + + // Check if majorCompactionJitter has changed. + String majorCompactionJitterStr = "hbase.hregion.majorcompaction.jitter"; + float newMajorCompactionJitter = + newConf.getFloat(majorCompactionJitterStr, + defaultMajorCompactionJitter); + if (newMajorCompactionJitter != this.majorCompactionJitter) { + LOG.info("Changing the value of " + majorCompactionJitterStr + " from " + + this.majorCompactionJitter + " to " + newMajorCompactionJitter); + this.majorCompactionJitter = newMajorCompactionJitter; + } + this.conf = newConf; + } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java (revision 1496587) @@ -50,6 +50,14 @@ } /** + * Update the configuration when it changes on-the-fly. + * @param conf + */ + protected void updateConfiguration(Configuration conf) { + comConf.updateConfiguration(conf); + } + + /** * @param candidateFiles candidate files, ordered from oldest to newest * @return subset copy of candidate list that meets compaction criteria * @throws java.io.IOException Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1496587) @@ -30,15 +30,17 @@ import org.apache.hadoop.conf.Configuration; import com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; /** * Compact region on request and then run split if appropriate */ -public class CompactSplitThread { +public class CompactSplitThread implements ConfigurationObserver { static final Log LOG = LogFactory.getLog(CompactSplitThread.class); private final HRegionServer server; - private final Configuration conf; + private Configuration conf; private final ThreadPoolExecutor largeCompactions; private final ThreadPoolExecutor smallCompactions; @@ -58,10 +60,12 @@ Preconditions.checkArgument(this.server != null && this.conf != null); int largeThreads = Math.max(1, conf.getInt( - "hbase.regionserver.thread.compaction.large", 1)); - int smallThreads = conf.getInt( - "hbase.regionserver.thread.compaction.small", 1); - int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1); + HConstants.LARGE_COMPACTION_THREADS, + HConstants.DEFAULT_LARGE_COMPACTION_THREADS)); + int smallThreads = conf.getInt(HConstants.SMALL_COMPACTION_THREADS, + HConstants.DEFAULT_SMALL_COMPACTION_THREADS); + int splitThreads = conf.getInt(HConstants.SPLIT_THREADS, + HConstants.DEFAULT_SPLIT_THREADS); Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); @@ -208,4 +212,55 @@ size += smallCompactions.getQueue().size(); return size; } + + @Override + public void notifyOnChange(Configuration newConf) { + // Check if number of large / small compaction threads has changed, and then + // adjust the core pool size of the thread pools, by using the + // setCorePoolSize() method. According to the javadocs, it is safe to + // change the core pool size on-the-fly. We need to reset the maximum + // pool size, as well. + int largeThreads = Math.max(1, newConf.getInt( + HConstants.LARGE_COMPACTION_THREADS, + HConstants.DEFAULT_LARGE_COMPACTION_THREADS)); + if (this.largeCompactions.getCorePoolSize() != largeThreads) { + LOG.info("Changing the value of " + HConstants.LARGE_COMPACTION_THREADS + + " from " + this.largeCompactions.getCorePoolSize() + " to " + + largeThreads); + this.largeCompactions.setMaximumPoolSize(largeThreads); + this.largeCompactions.setCorePoolSize(largeThreads); + } + + int smallThreads = newConf.getInt(HConstants.SMALL_COMPACTION_THREADS, + HConstants.DEFAULT_SMALL_COMPACTION_THREADS); + if (this.smallCompactions.getCorePoolSize() != smallThreads) { + LOG.info("Changing the value of " + HConstants.SMALL_COMPACTION_THREADS + + " from " + this.smallCompactions.getCorePoolSize() + " to " + + smallThreads); + this.smallCompactions.setMaximumPoolSize(smallThreads); + this.smallCompactions.setCorePoolSize(smallThreads); + } + + this.conf = newConf; + } + + /** + * Helper method for tests to check if the number of small compaction threads + * change on-the-fly. + * + * @return + */ + protected int getSmallCompactionThreadNum() { + return this.smallCompactions.getCorePoolSize(); + } + + /** + * Helper method for tests to check if the number of large compaction threads + * change on-the-fly. + * + * @return + */ + protected int getLargeCompactionThreadNum() { + return this.largeCompactions.getCorePoolSize(); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1496587) @@ -860,7 +860,10 @@ completionService .submit(new Callable>() { public ImmutableList call() throws IOException { - return store.close(); + ImmutableList result = store.close(); + HRegionServer.configurationManager. + deregisterObserver(store); + return result; } }); } @@ -2674,7 +2677,10 @@ protected Store instantiateHStore(Path tableDir, HColumnDescriptor c) throws IOException { - return new Store(tableDir, this, c, this.fs, this.conf); + Store store = new Store(tableDir, this, c, this.fs, this.conf); + // Register this store with the configuration manager. + HRegionServer.configurationManager.registerObserver(store); + return store; } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1496586) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1496587) @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.io.WriteOptions; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -101,7 +102,8 @@ *

Locking and transactions are handled at a higher level. This API should * not be called directly but by an HRegion manager. */ -public class Store extends SchemaConfigured implements HeapSize { +public class Store extends SchemaConfigured implements HeapSize, + ConfigurationObserver { static final Log LOG = LogFactory.getLog(Store.class); protected final MemStore memstore; // This stores directory in the filesystem. @@ -110,7 +112,7 @@ private final HColumnDescriptor family; CompactionManager compactionManager; final FileSystem fs; - final Configuration conf; + Configuration conf; final CacheConfig cacheConf; // ttl in milliseconds. protected long ttl; @@ -1295,8 +1297,8 @@ // we have to use a do/while loop. ArrayList kvs = new ArrayList(); boolean hasMore; - // Create the writer whether or not there are output KVs, - // iff the maxSequenceID among the compaction candidates is + // Create the writer whether or not there are output KVs, + // iff the maxSequenceID among the compaction candidates is // equal to the maxSequenceID among all the on-disk hfiles. [HBASE-7267] if (maxCompactingSequcenceId == this.getMaxSequenceId(true)) { writer = createWriterInTmp(maxKeyCount, compression, true); @@ -1328,7 +1330,7 @@ bytesWritten += kv.getLength(); if (bytesWritten > Store.closeCheckInterval) { getSchemaMetrics().updatePersistentStoreMetric( - SchemaMetrics.StoreMetricType.COMPACTION_WRITE_SIZE, + SchemaMetrics.StoreMetricType.COMPACTION_WRITE_SIZE, bytesWritten); bytesWritten = 0; if (!this.region.areWritesEnabled()) { @@ -2004,4 +2006,12 @@ } } + @Override + public void notifyOnChange(Configuration conf) { + this.conf = new CompoundConfiguration() + .add(conf) + .add(family.getValues()); + + compactionManager.updateConfiguration(conf); + } }