From de87dd4273e9b73c3296abacfefe35b8c955b080 Mon Sep 17 00:00:00 2001 From: yaojingyi Date: Sun, 26 Apr 2020 21:30:23 +0800 Subject: [PATCH] HBASE-24262 Improvement update_config for PressureAwareCompactionThroughputController --- .../src/main/resources/hbase-default.xml | 18 -- .../hbase/regionserver/CompactSplit.java | 27 +- .../compactions/OffPeakHours.java | 8 + ...CompactionThroughputControllerFactory.java | 2 +- .../throttle/NoLimitThroughputController.java | 5 + ...reAwareCompactionThroughputController.java | 40 ++- ...ressureAwareFlushThroughputController.java | 4 + .../PressureAwareThroughputController.java | 14 ++ .../throttle/ThroughputController.java | 5 +- ...stDynamicConfWithThroughputController.java | 233 ++++++++++++++++++ 10 files changed, 330 insertions(+), 26 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestDynamicConfWithThroughputController.java diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index d2ff712a2c..d82260b0ac 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -998,24 +998,6 @@ possible configurations would overwhelm and obscure the important. See http://hbase.apache.org/book.html#offheap.blockcache for more information. - - hbase.hstore.compaction.throughput.lower.bound - 52428800 - The target lower bound on aggregate compaction throughput, in bytes/sec. Allows - you to tune the minimum available compaction throughput when the - PressureAwareCompactionThroughputController throughput controller is active. (It is active by - default.) - - - hbase.hstore.compaction.throughput.higher.bound - 104857600 - The target upper bound on aggregate compaction throughput, in bytes/sec. Allows - you to control aggregate compaction throughput demand when the - PressureAwareCompactionThroughputController throughput controller is active. (It is active by - default.) The maximum throughput will be tuned between the lower and upper bounds when - compaction pressure is within the range [0.0, 1.0]. If compaction pressure is 1.0 or greater - the higher bound will be ignored until pressure returns to the normal range. - hbase.bucketcache.size diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 659983b6c8..64dda97ecd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; + import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; @@ -34,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntSupplier; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; @@ -742,12 +744,27 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } - ThroughputController old = this.compactionThroughputController; - if (old != null) { - old.stop("configuration change"); + String newCompactClass = newConf.get( + CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY); + String oldCompactClass = conf.get( + CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + CompactionThroughputControllerFactory.DEFAULT_THROUGHPUT_CONTROLLER_CLASS + .getName()); + if (oldCompactClass.equals(newCompactClass)) { + compactionThroughputController.updateConfig(newConf); + } else { + LOG.info( + CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY + + " is changed from " + oldCompactClass + " to " + + newCompactClass); + ThroughputController old = this.compactionThroughputController; + if (old != null) { + old.stop("configuration change"); + } + + this.compactionThroughputController = + CompactionThroughputControllerFactory.create(server, newConf); } - this.compactionThroughputController = - CompactionThroughputControllerFactory.create(server, newConf); // We change this atomically here instead of reloading the config in order that upstream // would be the only one with the flexibility to reload the config. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java index b920de2b57..0b0e31ad49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java @@ -101,5 +101,13 @@ public abstract class OffPeakHours { } return targetHour < endHour || startHour <= targetHour; } + + @Override + public String toString() { + return "{" + "startHour=" + startHour + ", endHour=" + + endHour + '}'; + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java index 45e7267ed2..9e40a351b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java @@ -36,7 +36,7 @@ public final class CompactionThroughputControllerFactory { private CompactionThroughputControllerFactory() { } - private static final Class + public static final Class DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class; // for backward compatibility and may not be supported in the future diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java index 4b1b261085..5c76d44b8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.throttle; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -43,6 +44,10 @@ public class NoLimitThroughputController implements ThroughputController { public void finish(String compactionName) { } + @Override + public void updateConfig(Configuration conf) { + } + private boolean stopped; @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java index 1c3952ed04..1023e63562 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java @@ -54,7 +54,7 @@ public class PressureAwareCompactionThroughputController extends PressureAwareTh public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = "hbase.hstore.compaction.throughput.lower.bound"; - private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = + public static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = 50L * 1024 * 1024; public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = @@ -146,4 +146,42 @@ public class PressureAwareCompactionThroughputController extends PressureAwareTh + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size() + "]"; } + + @Override public void updateConfig(Configuration newConf) { + this.maxThroughputUpperBound = newConf + .getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND); + this.maxThroughputLowerBound = newConf + .getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND); + this.maxThroughputOffpeak = newConf + .getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK, + DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK); + this.offPeakHours = OffPeakHours.getInstance(newConf); + this.controlPerSize = this.maxThroughputLowerBound; + + LOG.info("updateParam Compaction throughput configurations, higher bound: " + + throughputDesc(maxThroughputUpperBound) + ", lower bound " + + throughputDesc(maxThroughputLowerBound) + ", off peak: " + + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + + tuningPeriod + " ms, offPeakHours: " + offPeakHours); + + } + + public long getMaxThroughputUpperBound() { + return maxThroughputUpperBound; + } + + public long getMaxThroughputLowerBound() { + return maxThroughputLowerBound; + } + + public long getMaxThroughputOffpeak() { + return maxThroughputOffpeak; + } + + public OffPeakHours getOffPeakHours() { + return offPeakHours; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java index 51e7b42bf9..a1a3ee6c18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java @@ -79,6 +79,10 @@ public class PressureAwareFlushThroughputController extends PressureAwareThrough }); } + @Override public void updateConfig(Configuration conf) { + + } + private void tune(double flushPressure) { double maxThroughputToSet; if (flushPressure >= 1.0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java index 306df0b9d5..0e5da79a68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java @@ -169,4 +169,18 @@ public abstract class PressureAwareThroughputController extends Configured imple this.maxThroughput = maxThroughput; maxThroughputPerOperation = getMaxThroughput() / activeOperations.size(); } + + public long getMaxThroughputUpperBound() { + return maxThroughputUpperBound; + } + + public long getMaxThroughputLowerBound() { + return maxThroughputLowerBound; + } + + public OffPeakHours getOffPeakHours() { + return offPeakHours; + } + + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java index 707d02d5f9..66a5afdf11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java @@ -17,10 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver.throttle; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Stoppable; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.yetus.audience.InterfaceAudience; /** * A utility that constrains the total throughput of one or more simultaneous flows by @@ -49,4 +50,6 @@ public interface ThroughputController extends Stoppable { * Finish the controller. Should call this method in a finally block. */ void finish(String name); + + void updateConfig(Configuration conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestDynamicConfWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestDynamicConfWithThroughputController.java new file mode 100644 index 0000000000..adb595284d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestDynamicConfWithThroughputController.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import static org.junit.Assert.assertEquals; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.net.URL; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDynamicConfWithThroughputController { + + @ClassRule public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule + .forClass(TestDynamicConfWithThroughputController.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestDynamicConfWithThroughputController.class); + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private static final double EPSILON = 1E-6; + + long throughputLimit = 1024L * 1024; + + long newMaxThroughputUpperBound = throughputLimit * 6; + + long newMaxThroughputLowerBound = throughputLimit * 3; + + long newOffpeakLimit = throughputLimit * 4; + + @Test + public void testThroughputTuning() throws Exception { + + File sourceConfFile = null; + File bakConfFile = null; + try { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.set( + CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + PressureAwareCompactionThroughputController.class.getName()); + int tunePeriod = 5000; + conf.setInt( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, + tunePeriod); + + TEST_UTIL.startMiniCluster(1); + HRegionServer regionServer = + TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + + PressureAwareCompactionThroughputController throughputController = + (PressureAwareCompactionThroughputController) regionServer.compactSplitThread + .getCompactionThroughputController(); + throughputLimit = + PressureAwareCompactionThroughputController + .DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND; + LOG.debug("conf:" + conf.getResource("hbase-site.xml")); + assertEquals(throughputLimit, throughputController.getMaxThroughput(), + EPSILON); + + LOG.debug("conf:" + conf.getResource("hbase-site.xml")); + URL oldConfFile = conf.getResource("hbase-site.xml"); + + sourceConfFile = new File(oldConfFile.getPath()); + + File newConfFile = new File(oldConfFile.getPath() + "_new"); + bakConfFile = new File(oldConfFile.getPath() + "_bak_src"); + FileUtils.copyFile(sourceConfFile, newConfFile); + FileUtils.copyFile(sourceConfFile, bakConfFile); + + addNewConf(newConfFile.getAbsolutePath()); + sourceConfFile.delete(); + FileUtils.copyFile(newConfFile, sourceConfFile); + + LOG.info("updateConfiguration"); + Admin admin = TEST_UTIL.getConnection().getAdmin(); + LOG.debug("regionServer.getServerName():" + regionServer.getServerName()); + + admin.updateConfiguration(regionServer.getServerName()); + + assertEquals(newMaxThroughputUpperBound, + throughputController.getMaxThroughputUpperBound(), EPSILON); + + assertEquals(newMaxThroughputLowerBound, + throughputController.getMaxThroughputLowerBound(), EPSILON); + + LOG.info("assertTrue OffPeakHour"); + + Thread.sleep(tunePeriod + 3000); + if (throughputController.getOffPeakHours().isOffPeakHour()) { + assertEquals(newOffpeakLimit, throughputController.getMaxThroughput(), + EPSILON); + } else { + assertEquals(newMaxThroughputLowerBound, + throughputController.getMaxThroughput(), EPSILON); + } + + } finally { + + sourceConfFile.delete(); + FileUtils.moveFile(bakConfFile, sourceConfFile); + + } + } + + public static String txt2String(File file) { + StringBuilder result = new StringBuilder(); + try { + BufferedReader br = new BufferedReader(new FileReader(file)); + String s = null; + while ((s = br.readLine()) != null) { + result.append(System.lineSeparator() + s); + } + br.close(); + } catch (Exception e) { + e.printStackTrace(); + } + return result.toString(); + } + + public void addNewConf(String newConfFile) { + + DocumentBuilderFactory bdg = DocumentBuilderFactory.newInstance(); + try { + DocumentBuilder builder = bdg.newDocumentBuilder(); + + Document dm = builder.parse(new FileInputStream(newConfFile)); + Element root = dm.getDocumentElement(); + + Element propertyHigher = makeElement(dm, + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + String.valueOf(newMaxThroughputUpperBound)); + root.appendChild(propertyHigher); + + Element propertLower = makeElement(dm, + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + String.valueOf(newMaxThroughputLowerBound)); + root.appendChild(propertLower); + + Element propertyStartHour = makeElement(dm, + CompactionConfiguration.HBASE_HSTORE_OFFPEAK_START_HOUR, + String.valueOf(0)); + root.appendChild(propertyStartHour); + + Element propertyEndHour = + makeElement(dm, CompactionConfiguration.HBASE_HSTORE_OFFPEAK_END_HOUR, + String.valueOf(6)); + root.appendChild(propertyEndHour); + + Element propertyOffpeakLimit = makeElement(dm, + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK, + String.valueOf(newOffpeakLimit)); + root.appendChild(propertyOffpeakLimit); + + Transformer tfd = TransformerFactory.newInstance().newTransformer(); + tfd.transform(new DOMSource(dm), new StreamResult(newConfFile)); + } catch (ParserConfigurationException e) { + + } catch (Exception e) { + LOG.error("addNewConf error", e); + } + } + + private Element makeElement(Document dm, String key, String value) { + Element property = dm.createElement("property"); + Element ename = dm.createElement("name"); + ename.setTextContent(key); + Element evalue = dm.createElement("value"); + evalue.setTextContent(value); + property.appendChild(ename); + property.appendChild(evalue); + + return property; + + } + +} -- 2.25.0