From 8338db1c30bfd15b574b8b0f358f15b6f81abdb6 Mon Sep 17 00:00:00 2001 From: manukranthk Date: Wed, 1 Oct 2014 15:13:58 -0700 Subject: [PATCH] Online config change --- .../java/org/apache/hadoop/hbase/HConstants.java | 2 +- .../hadoop/hbase/conf/ConfigurationManager.java | 136 ++++++++++++++ .../hadoop/hbase/conf/ConfigurationObserver.java | 36 ++++ .../hbase/regionserver/CompactSplitThread.java | 93 +++++++++- .../apache/hadoop/hbase/regionserver/HRegion.java | 2 + .../hadoop/hbase/regionserver/HRegionServer.java | 31 +++- .../apache/hadoop/hbase/regionserver/HStore.java | 25 ++- .../apache/hadoop/hbase/regionserver/Store.java | 3 +- .../hadoop/hbase/regionserver/StoreEngine.java | 9 +- .../compactions/CompactionConfiguration.java | 202 ++++++++++++++++++-- .../regionserver/compactions/CompactionPolicy.java | 8 +- .../regionserver/compactions/OffPeakHours.java | 4 +- .../hbase/conf/TestConfigurationManager.java | 133 ++++++++++++++ .../TestRegionServerOnlineConfigChange.java | 203 +++++++++++++++++++++ .../regionserver/compactions/TestOffPeakHours.java | 8 +- 15 files changed, 858 insertions(+), 37 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/conf/TestConfigurationManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ba152c0..70c7dfe 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1040,7 +1040,7 @@ public final class HConstants { * memory size to give to the cache (if < 1.0) OR, it is the capacity in megabytes of the cache. */ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; - + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java new file mode 100644 index 0000000..a95e7b1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java @@ -0,0 +1,136 @@ +/** + * Copyright 2013 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.conf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.util.Collections; +import java.util.Set; +import java.util.WeakHashMap; + +/** + * Maintains the set of all the classes which would like to get notified + * when the Configuration is reloaded from the disk in the Online Configuration + * Change mechanism, which lets you update certain configuration properties + * on-the-fly, without having to restart the cluster. + * + * If a class has configuration properties which you would like to be able to + * change on-the-fly, do the following: + * 1. Implement the {@link ConfigurationObserver} interface. This would require + * you to implement the + * {@link ConfigurationObserver#notifyOnChange(Configuration)} + * method. This is a callback that is used to notify your class' instance + * that the configuration has changed. In this method, you need to check + * if the new values for the properties that are of interest to your class + * are different from the cached values. If yes, update them. + * + * However, be careful with this. Certain properties might be trivially + * mutable online, but others might not. Two properties might be trivially + * mutable by themselves, but not when changed together. For example, if a + * method uses properties "a" and "b" to make some decision, and is running + * in parallel when the notifyOnChange() method updates "a", but hasn't + * yet updated "b", it might make a decision on the basis of a new value of + * "a", and an old value of "b". This might introduce subtle bugs. This + * needs to be dealt on a case-by-case basis, and this class does not provide + * any protection from such cases. + * + * 2. Register the appropriate instance of the class with the + * {@link ConfigurationManager} instance, using the + * {@link ConfigurationManager#registerObserver(ConfigurationObserver)} + * method. For the RS side of things, the ConfigurationManager is a static + * member of the {@link org.apache.hadoop.hbase.regionserver.HRegionServer} + * class. Be careful not to do this in the constructor, as you might cause + * the 'this' reference to escape. Use a factory method, or an initialize() + * method which is called after the construction of the object. + * + * 3. Deregister the instance using the + * {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)} + * method when it is going out of scope. In case you are not able to do that + * for any reason, it is still okay, since entries for dead observers are + * automatically collected during GC. But nonetheless, it is still a good + * practice to deregister your observer, whenever possible. + * + */ +public class ConfigurationManager { + public static final Log LOG = LogFactory.getLog(ConfigurationManager.class); + + // The set of Configuration Observers. These classes would like to get + // notified when the configuration is reloaded from disk. This is a set + // constructed from a WeakHashMap, whose entries would be removed if the + // observer classes go out of scope. + private Set configurationObservers = + Collections.newSetFromMap(new WeakHashMap()); + + /** + * Register an observer class + * @param observer + */ + public void registerObserver(ConfigurationObserver observer) { + synchronized (configurationObservers) { + configurationObservers.add(observer); + } + } + + /** + * Deregister an observer class + * @param observer + */ + public void deregisterObserver(ConfigurationObserver observer) { + synchronized (configurationObservers) { + configurationObservers.remove(observer); + } + } + + /** + * The conf object has been repopulated from disk, and we have to notify + * all the observers that are expressed interest to do that. + */ + public void notifyAllObservers(Configuration conf) { + synchronized (configurationObservers) { + for (ConfigurationObserver observer : configurationObservers) { + try { + observer.notifyOnChange(conf); + } catch (NullPointerException e) { + // Though not likely, but a null pointer exception might happen + // if the GC happens after this iteration started and before + // we call the notifyOnChange() method. A + // ConcurrentModificationException will not happen since GC doesn't + // change the structure of the map. + LOG.error("Encountered a NPE while notifying observers."); + } catch (Throwable t) { + LOG.error("Encountered a throwable while notifying observers: " + + t.getMessage()); + } + } + } + } + + /** + * Return the number of observers. + * @return + */ + public int getNumObservers() { + return configurationObservers.size(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java new file mode 100644 index 0000000..8d07540 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationObserver.java @@ -0,0 +1,36 @@ +/** + * Copyright 2013 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.conf; + +import org.apache.hadoop.conf.Configuration; + +/** + * Every class that wants to observe changes in Configuration properties, + * must implement interface (and also, register itself with the + * ConfigurationManager object. + */ +public interface ConfigurationObserver { + + /** + * This method would be called by the ConfigurationManager + * object when the Configuration object is reloaded from disk. + */ + void notifyOnChange(Configuration conf); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index b97ebee..43e9554 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -35,7 +35,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -50,9 +52,27 @@ import com.google.common.base.Preconditions; * Compact region on request and then run split if appropriate */ @InterfaceAudience.Private -public class CompactSplitThread implements CompactionRequestor { +public class CompactSplitThread implements CompactionRequestor, ConfigurationObserver { static final Log LOG = LogFactory.getLog(CompactSplitThread.class); + // Configuration key for the large compaction threads. + public final static String LARGE_COMPACTION_THREADS = + "hbase.regionserver.thread.compaction.large"; + public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; + + // Configuration key for the small compaction threads. + public final static String SMALL_COMPACTION_THREADS = + "hbase.regionserver.thread.compaction.small"; + public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; + + // Configuration key for split threads + public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; + public final static int SPLIT_THREADS_DEFAULT = 1; + + // Configuration keys for merge threads + public final static String MERGE_THREADS = "hbase.regionserver.thread.merge"; + public final static int MERGE_THREADS_DEFAULT = 1; + private final HRegionServer server; private final Configuration conf; @@ -77,11 +97,11 @@ public class CompactSplitThread implements CompactionRequestor { Integer.MAX_VALUE); int largeThreads = Math.max(1, conf.getInt( - "hbase.regionserver.thread.compaction.large", 1)); + LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); int smallThreads = conf.getInt( - "hbase.regionserver.thread.compaction.small", 1); + SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); - int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1); + int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); // if we have throttle threads, make sure the user also specified size Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); @@ -121,7 +141,7 @@ public class CompactSplitThread implements CompactionRequestor { return t; } }); - int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1); + int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT); this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool( mergeThreads, new ThreadFactory() { @Override @@ -147,7 +167,7 @@ public class CompactSplitThread implements CompactionRequestor { queueLists.append("Compaction/Split Queue dump:\n"); queueLists.append(" LargeCompation Queue:\n"); BlockingQueue lq = longCompactions.getQueue(); - Iterator it = lq.iterator(); + Iterator it = lq.iterator(); while(it.hasNext()){ queueLists.append(" "+it.next().toString()); queueLists.append("\n"); @@ -540,4 +560,65 @@ public class CompactSplitThread implements CompactionRequestor { } } } + + @Override + public void notifyOnChange(Configuration newConf) { + // Check if number of large / small compaction threads has changed, and then + // adjust the core pool size of the thread pools, by using the + // setCorePoolSize() method. According to the javadocs, it is safe to + // change the core pool size on-the-fly. We need to reset the maximum + // pool size, as well. + int largeThreads = Math.max(1, newConf.getInt( + LARGE_COMPACTION_THREADS, + LARGE_COMPACTION_THREADS_DEFAULT)); + if (this.longCompactions.getCorePoolSize() != largeThreads) { + LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + + " from " + this.longCompactions.getCorePoolSize() + " to " + + largeThreads); + this.longCompactions.setMaximumPoolSize(largeThreads); + this.longCompactions.setCorePoolSize(largeThreads); + } + + int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, + SMALL_COMPACTION_THREADS_DEFAULT); + if (this.shortCompactions.getCorePoolSize() != smallThreads) { + LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + + " from " + this.shortCompactions.getCorePoolSize() + " to " + + smallThreads); + this.shortCompactions.setMaximumPoolSize(smallThreads); + this.shortCompactions.setCorePoolSize(smallThreads); + } + + int splitThreads = newConf.getInt(SPLIT_THREADS, + SPLIT_THREADS_DEFAULT); + if (this.splits.getCorePoolSize() != splitThreads) { + LOG.info("Changing the value of " + SPLIT_THREADS + + " from " + this.splits.getCorePoolSize() + " to " + + splitThreads); + this.splits.setMaximumPoolSize(smallThreads); + this.splits.setCorePoolSize(smallThreads); + } + + int mergeThreads = newConf.getInt(MERGE_THREADS, + MERGE_THREADS_DEFAULT); + if (this.mergePool.getCorePoolSize() != mergeThreads) { + LOG.info("Changing the value of " + MERGE_THREADS + + " from " + this.mergePool.getCorePoolSize() + " to " + + mergeThreads); + this.mergePool.setMaximumPoolSize(smallThreads); + this.mergePool.setCorePoolSize(smallThreads); + } + + // We change this atomically here instead of reloading the config in order that upstream + // would be the only one with the flexibility to reload the config. + this.conf.reloadConfiguration(); + } + + protected int getSmallCompactionThreadNum() { + return this.shortCompactions.getCorePoolSize(); + } + + public int getLargeCompactionThreadNum() { + return this.longCompactions.getCorePoolSize(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e362a17..f2c43d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -823,6 +823,7 @@ public class HRegion implements HeapSize { // , Writable{ if (maxStoreMemstoreTS > maxMemstoreTS) { maxMemstoreTS = maxStoreMemstoreTS; } + HRegionServer.configurationManager.registerObserver(store); } allStoresOpened = true; } catch (InterruptedException e) { @@ -1241,6 +1242,7 @@ public class HRegion implements HeapSize { // , Writable{ // close each store in parallel for (final Store store : stores.values()) { assert abort || store.getFlushableSize() == 0; + HRegionServer.configurationManager.deregisterObserver(store); completionService .submit(new Callable>>() { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index dbe2c79..4ef858f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -75,6 +75,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -161,7 +163,7 @@ import com.google.protobuf.ServiceException; @InterfaceAudience.Private @SuppressWarnings("deprecation") public class HRegionServer extends HasThread implements - RegionServerServices, LastSequenceId { + RegionServerServices, LastSequenceId, ConfigurationObserver { public static final Log LOG = LogFactory.getLog(HRegionServer.class); @@ -421,6 +423,12 @@ public class HRegionServer extends HasThread implements protected BaseCoordinatedStateManager csm; /** + * Configuration manager is used to register/deregister and notify the configuration observers + * when the regionserver is notified that there was a change in the on disk configs. + */ + public final static ConfigurationManager configurationManager = new ConfigurationManager(); + + /** * Starts a HRegionServer at the default location. * @param conf * @throws IOException @@ -734,6 +742,12 @@ public class HRegionServer extends HasThread implements if (storefileRefreshPeriod > 0) { this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this); } + registerConfigurationObserversToConfigurationManager(); + } + + private void registerConfigurationObserversToConfigurationManager() { + // Registering the compactSplitThread object with the ConfigurationManager. + configurationManager.registerObserver(this.compactSplitThread); } /** @@ -2988,4 +3002,19 @@ public class HRegionServer extends HasThread implements public CacheConfig getCacheConfig() { return this.cacheConfig; } + + /** + * @return : the ConfigurationManager object which manages the configurations on this Regionserver. + */ + public ConfigurationManager getConfigurationManager() { + return configurationManager; + } + + + + @Override + public void notifyOnChange(Configuration conf) { + // TODO Auto-generated method stub + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7a331b1..27d3b66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -135,7 +135,7 @@ public class HStore implements Store { private final HRegion region; private final HColumnDescriptor family; private final HRegionFileSystem fs; - private final Configuration conf; + private Configuration conf; private final CacheConfig cacheConf; private long lastCompactSize = 0; volatile boolean forceMajor = false; @@ -178,7 +178,7 @@ public class HStore implements Store { final StoreEngine storeEngine; private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean(); - private final OffPeakHours offPeakHours; + private volatile OffPeakHours offPeakHours; private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10; private int flushRetriesNumber; @@ -2202,4 +2202,25 @@ public class HStore implements Store { public long getMajorCompactedCellsSize() { return majorCompactedCellsSize; } + + /** + * Returns the StoreEngine that is backing this concrete implementation of Store. + * @return + */ + protected StoreEngine getStoreEngine() { + return this.storeEngine; + } + + protected OffPeakHours getOffPeakHours() { + return this.offPeakHours; + } + + @Override + public void notifyOnChange(Configuration conf) { + this.conf = new CompoundConfiguration() + .add(conf) + .addBytesMap(family.getValues()); + this.storeEngine.compactionPolicy.setConf(conf); + this.offPeakHours = OffPeakHours.getInstance(conf); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 9078c44..02dcb92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -48,7 +49,7 @@ import org.apache.hadoop.hbase.util.Pair; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface Store extends HeapSize, StoreConfigInformation { +public interface Store extends HeapSize, StoreConfigInformation, ConfigurationObserver { /* The default priority for user-specified compaction requests. * The user gets top priority unless we have blocking compactions. (Pri <= 0) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 519767c..696a21c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -37,7 +38,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; */ @InterfaceAudience.Private public abstract class StoreEngine { + CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> + implements ConfigurationObserver { protected SF storeFlusher; protected CP compactionPolicy; protected C compactor; @@ -128,4 +130,9 @@ public abstract class StoreEngineConfigurationManager + * when the Configuration is reloaded. + */ + public void testCheckIfObserversNotified() { + Configuration conf = new Configuration(); + ConfigurationManager cm = new ConfigurationManager(); + DummyConfigurationObserver d1 = new DummyConfigurationObserver(cm); + + // Check if we get notified. + cm.notifyAllObservers(conf); + assertTrue(d1.wasNotifiedOnChange()); + d1.resetNotifiedOnChange(); + + // Now check if we get notified on change with more than one observers. + DummyConfigurationObserver d2 = new DummyConfigurationObserver(cm); + cm.notifyAllObservers(conf); + assertTrue(d1.wasNotifiedOnChange()); + d1.resetNotifiedOnChange(); + assertTrue(d2.wasNotifiedOnChange()); + d2.resetNotifiedOnChange(); + + // Now try deregistering an observer and verify that it was not notified + d2.deregister(); + cm.notifyAllObservers(conf); + assertTrue(d1.wasNotifiedOnChange()); + d1.resetNotifiedOnChange(); + assertFalse(d2.wasNotifiedOnChange()); + } + + // Register an observer that will go out of scope immediately, allowing + // us to test that out of scope observers are deregistered. + private void registerLocalObserver(ConfigurationManager cm) { + new DummyConfigurationObserver(cm); + } + + /** + * Test if out-of-scope observers are deregistered on GC. + */ + public void testDeregisterOnOutOfScope() { + Configuration conf = new Configuration(); + ConfigurationManager cm = new ConfigurationManager(); + + boolean outOfScopeObserversDeregistered = false; + + // On my machine, I was able to cause a GC after around 5 iterations. + // If we do not cause a GC in 100k iterations, which is very unlikely, + // there might be something wrong with the GC. + for (int i = 0; i < 100000; i++) { + registerLocalObserver(cm); + cm.notifyAllObservers(conf); + + // 'Suggest' the system to do a GC. We should be able to cause GC + // atleast once in the 2000 iterations. + System.gc(); + + // If GC indeed happened, all the observers (which are all out of scope), + // should have been deregistered. + if (cm.getNumObservers() <= i) { + outOfScopeObserversDeregistered = true; + break; + } + } + if (!outOfScopeObserversDeregistered) { + LOG.warn("Observers were not GC-ed! Something seems to be wrong."); + } + assertTrue(outOfScopeObserversDeregistered); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java new file mode 100644 index 0000000..208b382 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java @@ -0,0 +1,203 @@ +/** + * Copyright 2013 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; + +/** + * Verify that the Online config Changes on the HRegionServer side are actually + * happening. We should add tests for important configurations which will be + * changed online. + */ +public class TestRegionServerOnlineConfigChange extends TestCase { + static final Log LOG = + LogFactory.getLog(TestRegionServerOnlineConfigChange.class.getName()); + HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility(); + Configuration conf = null; + + HTable t1 = null; + HRegionServer rs1 = null; + byte[] r1name = null; + HRegion r1 = null; + + final String table1Str = "table1"; + final String columnFamily1Str = "columnFamily1"; + final byte[] TABLE1 = Bytes.toBytes(table1Str); + final byte[] COLUMN_FAMILY1 = Bytes.toBytes(columnFamily1Str); + + + @Override + public void setUp() throws Exception { + conf = hbaseTestingUtility.getConfiguration(); + hbaseTestingUtility.startMiniCluster(1,1); + t1 = hbaseTestingUtility.createTable(TABLE1, COLUMN_FAMILY1); + @SuppressWarnings("deprecation") + HRegionInfo firstHRI = t1.getRegionLocations().keySet().iterator().next(); + r1name = firstHRI.getRegionName(); + rs1 = hbaseTestingUtility.getHBaseCluster().getRegionServer( + hbaseTestingUtility.getHBaseCluster().getServerWith(r1name)); + r1 = rs1.getRegion(r1name); + } + + @Override + public void tearDown() throws Exception { + hbaseTestingUtility.shutdownMiniCluster(); + } + + /** + * Check if the number of compaction threads changes online + * @throws IOException + */ + public void testNumCompactionThreadsOnlineChange() throws IOException { + assertTrue(rs1.compactSplitThread != null); + int newNumSmallThreads = + rs1.compactSplitThread.getSmallCompactionThreadNum() + 1; + int newNumLargeThreads = + rs1.compactSplitThread.getLargeCompactionThreadNum() + 1; + + conf.setInt("hbase.regionserver.thread.compaction.small", + newNumSmallThreads); + conf.setInt("hbase.regionserver.thread.compaction.large", + newNumLargeThreads); + HRegionServer.configurationManager.notifyAllObservers(conf); + + assertEquals(newNumSmallThreads, + rs1.compactSplitThread.getSmallCompactionThreadNum()); + assertEquals(newNumLargeThreads, + rs1.compactSplitThread.getLargeCompactionThreadNum()); + } + + /** + * Test that the configurations in the CompactionConfiguration class change + * properly. + * + * @throws IOException + */ + public void testCompactionConfigurationOnlineChange() throws IOException { + String strPrefix = "hbase.hstore.compaction."; + Store s = r1.getStore(COLUMN_FAMILY1); + if (!(s instanceof HStore)) { + LOG.error("Can't test the compaction configuration of HStore class. " + + "Got a different implementation other than HStore"); + return; + } + HStore hstore = (HStore)s; + + // Set the new compaction ratio to a different value. + double newCompactionRatio = + hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatio() + 0.1; + conf.setFloat(strPrefix + "ratio", (float)newCompactionRatio); + + // Notify all the observers, which includes the Store object. + HRegionServer.configurationManager.notifyAllObservers(conf); + + // Check if the compaction ratio got updated in the Compaction Configuration + assertEquals(newCompactionRatio, + hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatio(), + 0.00001); + + // Check if the off peak compaction ratio gets updated. + double newOffPeakCompactionRatio = + hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatioOffPeak() + 0.1; + conf.setFloat(strPrefix + "ratio.offpeak", + (float)newOffPeakCompactionRatio); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newOffPeakCompactionRatio, + hstore.getStoreEngine().getCompactionPolicy().getConf().getCompactionRatioOffPeak(), + 0.00001); + + // Check if the throttle point gets updated. + long newThrottlePoint = + hstore.getStoreEngine().getCompactionPolicy().getConf().getThrottlePoint() + 10; + conf.setLong("hbase.regionserver.thread.compaction.throttle", + newThrottlePoint); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newThrottlePoint, + hstore.getStoreEngine().getCompactionPolicy().getConf().getThrottlePoint()); + + // Check if the minFilesToCompact gets updated. + int newMinFilesToCompact = + hstore.getStoreEngine().getCompactionPolicy().getConf().getMinFilesToCompact() + 1; + conf.setLong(strPrefix + "min", newMinFilesToCompact); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMinFilesToCompact, + hstore.getStoreEngine().getCompactionPolicy().getConf().getMinFilesToCompact()); + + // Check if the maxFilesToCompact gets updated. + int newMaxFilesToCompact = + hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact() + 1; + conf.setLong(strPrefix + "max", newMaxFilesToCompact); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMaxFilesToCompact, + hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); + + // Check OffPeak hours is updated in an online fashion. + conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, 6); + conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, 7); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertFalse(hstore.getOffPeakHours().isOffPeakHour(4)); + + // Check if the minCompactSize gets updated. + long newMinCompactSize = + hstore.getStoreEngine().getCompactionPolicy().getConf().getMinCompactSize() + 1; + conf.setLong(strPrefix + "min.size", newMinCompactSize); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMinCompactSize, + hstore.getStoreEngine().getCompactionPolicy().getConf().getMinCompactSize()); + + // Check if the maxCompactSize gets updated. + long newMaxCompactSize = + hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxCompactSize() - 1; + conf.setLong(strPrefix + "max.size", newMaxCompactSize); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMaxCompactSize, + hstore.getStoreEngine().getCompactionPolicy().getConf().getMaxCompactSize()); + + // Check if majorCompactionPeriod gets updated. + long newMajorCompactionPeriod = + hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionPeriod() + 10; + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, newMajorCompactionPeriod); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMajorCompactionPeriod, + hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionPeriod()); + + // Check if majorCompactionJitter gets updated. + float newMajorCompactionJitter = + hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionJitter() + 0.02F; + conf.setFloat("hbase.hregion.majorcompaction.jitter", + newMajorCompactionJitter); + HRegionServer.configurationManager.notifyAllObservers(conf); + assertEquals(newMajorCompactionJitter, + hstore.getStoreEngine().getCompactionPolicy().getConf().getMajorCompactionJitter(), 0.00001); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java index 0b0d608..f43c29a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java @@ -62,16 +62,16 @@ public class TestOffPeakHours { @Test public void testSetPeakHourToTargetTime() { - conf.setLong("hbase.offpeak.start.hour", hourMinusOne); - conf.setLong("hbase.offpeak.end.hour", hourPlusOne); + conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, hourMinusOne); + conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, hourPlusOne); OffPeakHours target = OffPeakHours.getInstance(conf); assertTrue(target.isOffPeakHour(hourOfDay)); } @Test public void testSetPeakHourOutsideCurrentSelection() { - conf.setLong("hbase.offpeak.start.hour", hourMinusTwo); - conf.setLong("hbase.offpeak.end.hour", hourMinusOne); + conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, hourMinusTwo); + conf.setLong(CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, hourMinusOne); OffPeakHours target = OffPeakHours.getInstance(conf); assertFalse(target.isOffPeakHour(hourOfDay)); } -- 1.9.4