From fb5dae77b9528ddf48b407e6a784f124f5c309fb Mon Sep 17 00:00:00 2001 From: Ashu Pachauri Date: Mon, 11 Apr 2016 07:43:06 -0700 Subject: [PATCH] HBASE-15429 Add split policy for busy regions --- .../org/apache/hadoop/hbase/HTableDescriptor.java | 63 ++++++++ hbase-common/src/main/resources/hbase-default.xml | 2 +- .../hbase/regionserver/BusyRegionSplitPolicy.java | 163 +++++++++++++++++++++ .../hbase/regionserver/TestRegionSplitPolicy.java | 41 ++++++ 4 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BusyRegionSplitPolicy.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 4283330..a20eb3c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -88,6 +88,15 @@ public class HTableDescriptor implements Comparable { private static final Bytes MAX_FILESIZE_KEY = new Bytes(Bytes.toBytes(MAX_FILESIZE)); + public static final String BUSY_REGION_BLOCKED_REQUESTS = "MAX_BLOCKED_REQUESTS"; + private static final Bytes BUSY_REGION_BLOCKED_REQUESTS_KEY = + new Bytes(Bytes.toBytes(BUSY_REGION_BLOCKED_REQUESTS)); + + + public static final String BUSY_REGION_MIN_AGE = "MIN_AGE"; + public static final Bytes BUSY_REGION_MIN_AGE_KEY = + new Bytes(Bytes.toBytes(BUSY_REGION_MIN_AGE)); + public static final String OWNER = "OWNER"; public static final Bytes OWNER_KEY = new Bytes(Bytes.toBytes(OWNER)); @@ -795,6 +804,60 @@ public class HTableDescriptor implements Comparable { } /** + * Get the maximum fraction of write requests that are blocked before a region is considered for + * split while using the {@link org.apache.hadoop.hbase.regionserver.BusyRegionSplitPolicy}. + * + * @return Max fraction of blocked write requests, or -1.0f if no value is set + * + * @see #setBlockedRequestsForSplit(float) + */ + 
public float getBlockedRequestsForSplit() { + byte [] value = getValue(BUSY_REGION_BLOCKED_REQUESTS_KEY); + if (value != null) { + return Float.parseFloat(Bytes.toString(value)); + } + return -1.0f; + } + + /** + * Set the maximum fraction of write requests to be blocked before a region is considered + * for split using the {@link org.apache.hadoop.hbase.regionserver.BusyRegionSplitPolicy}. + * + * @param blockedRequestsForSplit Maximum fraction of blocked write requests. + */ + public HTableDescriptor setBlockedRequestsForSplit(float blockedRequestsForSplit) { + setValue(BUSY_REGION_BLOCKED_REQUESTS_KEY, Float.toString(blockedRequestsForSplit)); + return this; + } + + /** + * Get the minimum time for a region of the table to be online before it is considered for + * split using the {@link org.apache.hadoop.hbase.regionserver.BusyRegionSplitPolicy}. + * + * @return Minimum age in milliseconds, or -1 if no value is set + * + * @see #setMinAgeForSplit(long) + */ + public long getMinAgeForSplit() { + byte [] value = getValue(BUSY_REGION_MIN_AGE_KEY); + if (value != null) { + return Long.parseLong(Bytes.toString(value)); + } + return -1; + } + + /** + * Set the minimum time for a region to be online before being considered for split + * using the {@link org.apache.hadoop.hbase.regionserver.BusyRegionSplitPolicy}. + * + * @param minAgeForSplit Minimum age in milliseconds + */ + public HTableDescriptor setMinAgeForSplit(long minAgeForSplit) { + setValue(BUSY_REGION_MIN_AGE_KEY, Long.toString(minAgeForSplit)); + return this; + } + + /** * Returns the size of the memstore after which a flush to filesystem is triggered. * * @return memory cache flush size for each hregion, -1 if not set. 
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index e19cbf8..5e0951f 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -333,7 +333,7 @@ possible configurations would overwhelm and obscure the important. org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy A split policy determines when a region should be split. The various other split policies that - are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy, + are available currently are BusyRegionSplitPolicy, ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy, DelimitedKeyPrefixRegionSplitPolicy, and KeyPrefixRegionSplitPolicy. DisabledRegionSplitPolicy blocks manual region splitting. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BusyRegionSplitPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BusyRegionSplitPolicy.java new file mode 100644 index 0000000..6d74827 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BusyRegionSplitPolicy.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BusyRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy { + + private static final Log LOG = LogFactory.getLog(BusyRegionSplitPolicy.class); + + // Maximum fraction of blocked write requests before the region is considered for split + private float maxBlockedRequests; + public static final float DEFAULT_MAX_BLOCKED_REQUESTS = 0.2f; + + // Minimum age of the region in milliseconds before it is considered for split + private long minAge = -1; + public static final int DEFAULT_MIN_AGE_MS = 3600000; // 1 hour + + // The window time in milliseconds over which the blocked requests rate is calculated + private long aggregationWindow; + + private HRegion region; + private long prevTime; + private long startTime; + private long writeRequestCount; + private long blockedRequestCount; + private float blockedRate; + + @Override + protected void configureForRegion(final HRegion region) { + super.configureForRegion(region); + this.region = region; + Configuration conf = getConf(); + HTableDescriptor desc = region.getTableDesc(); + + if (desc != null) { + maxBlockedRequests = desc.getBlockedRequestsForSplit(); + minAge = desc.getMinAgeForSplit(); + } + + if (maxBlockedRequests <= 0.0f) { + maxBlockedRequests = conf.getFloat("hbase.busy.policy.blockedRequests", + DEFAULT_MAX_BLOCKED_REQUESTS); + } + + if (minAge < 0) { + minAge = conf.getLong("hbase.busy.policy.minAge", 
+ DEFAULT_MIN_AGE_MS); + } + + startTime = System.currentTimeMillis(); + prevTime = startTime; + blockedRequestCount = region.getBlockedRequestsCount(); + writeRequestCount = region.getWriteRequestsCount(); + aggregationWindow = 300000; // 5 minutes + validate(); + } + + /** + * Validate the value set for blocked requests threshold for reasonable bounds. + * Reset to default value if it's too low or too high for proper operation. + */ + private void validate() { + if (maxBlockedRequests < 0.00001f || maxBlockedRequests > 0.99999f) { + LOG.warn("Threshold for maximum blocked requests is set too low or too high, " + + " resetting to default of " + DEFAULT_MAX_BLOCKED_REQUESTS); + maxBlockedRequests = DEFAULT_MAX_BLOCKED_REQUESTS; + } + } + + @Override + protected boolean shouldSplit() { + float blockedReqRate = updateRate(); + if (super.shouldSplit()) { + return true; + } + + if ( System.currentTimeMillis() < startTime + minAge) { + return false; + } + + for (Store store: region.getStores()) { + if (!store.canSplit()) { + return false; + } + } + + if (blockedReqRate >= maxBlockedRequests) { + if (LOG.isDebugEnabled()) { + LOG.debug("Going to split region " + region.getRegionInfo().getRegionNameAsString() + + " because it's too busy. Blocked Request rate: " + blockedReqRate); + } + return true; + } + + return false; + } + + /** + * Update the blocked request rate from the blocked and total write requests counted since + * the current aggregation window began. Mid-window, the result is a time-weighted blend of + * the previous window's rate and the rate so far; before the first window elapses it is 0. + * + * @return Updated blocked request rate. 
+ */ + private float updateRate() { + float aggBlockedRate; + synchronized (this) { + long curTime = System.currentTimeMillis(); + + long newBlockedReqs = region.getBlockedRequestsCount(); + long newWriteReqs = region.getWriteRequestsCount(); + + aggBlockedRate = + (newBlockedReqs - blockedRequestCount) / (newWriteReqs - writeRequestCount + 0.00001f); + + if (curTime - prevTime >= aggregationWindow) { + blockedRate = aggBlockedRate; + prevTime = curTime; + blockedRequestCount = newBlockedReqs; + writeRequestCount = newWriteReqs; + } else if (curTime - startTime >= aggregationWindow) { + // Calculate the aggregate blocked rate as the weighted sum of + // previous window's average blocked rate and blocked rate in this window so far. + float timeSlice = (curTime - prevTime) / (aggregationWindow + 0.0f); + aggBlockedRate = (1 - timeSlice) * blockedRate + timeSlice * aggBlockedRate; + } else { + aggBlockedRate = 0.0f; + } + } + return aggBlockedRate; + } + + + @VisibleForTesting + protected void setAggregationWindow(long aggregationWindow) { + if (aggregationWindow > 0) { + this.aggregationWindow = aggregationWindow; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java index 341a4bf..3a94761 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java @@ -39,6 +39,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import static org.mockito.Matchers.*; @Category({RegionServerTests.class, SmallTests.class}) public class TestRegionSplitPolicy { @@ -148,6 +149,46 @@ public class TestRegionSplitPolicy { assertWithinJitter(maxSplitSize, policy.getSizeToCheck(0)); } + @Test + public void 
testBusyRegionSplitPolicy() throws Exception { + conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, + BusyRegionSplitPolicy.class.getName()); + + RegionServerServices rss = Mockito.mock(RegionServerServices.class); + final List regions = new ArrayList(); + Mockito.when(rss.getOnlineRegions(TABLENAME)).thenReturn(regions); + Mockito.when(mockRegion.getRegionServerServices()).thenReturn(rss); + Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(0L); + Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(0L); + + + htd.setMinAgeForSplit(1000000L).setBlockedRequestsForSplit(0.1f); + BusyRegionSplitPolicy policy = + (BusyRegionSplitPolicy)RegionSplitPolicy.create(mockRegion, conf); + + Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(10L); + Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(10L); + // Not enough time since region came online + assertFalse(policy.shouldSplit()); + + + // Reset min age for split to zero + htd.setMinAgeForSplit(0); + policy = + (BusyRegionSplitPolicy)RegionSplitPolicy.create(mockRegion, conf); + policy.setAggregationWindow(1L); + Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(12L); + Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(20L); + Thread.sleep(2); + assertTrue(policy.shouldSplit()); + + Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(21L); + Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(30L); + policy.setAggregationWindow(5000L); + // 9 of the last 10 writes were blocked, but the new aggregation window has not elapsed yet + assertFalse(policy.shouldSplit()); + } + private void assertWithinJitter(long maxSplitSize, long sizeToCheck) { assertTrue("Size greater than lower bound of jitter", (long)(maxSplitSize * 0.75) <= sizeToCheck); -- 2.8.0-rc2