commit 142c31b67c20a171733ba8593cd23c8aa75b580b Author: Yu Li Date: Mon Sep 14 22:26:41 2015 +0800 Refine RegionGroupingProvider: fix cache issues and make it more scalable diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 0a18fefcbc77c782ccdbba017170810551e7c5fe..1e98e010c0fd850c2542570871a2098f70721ccc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -34,9 +34,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheStats; -import org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.metrics2.MetricsExecutor; @@ -534,10 +533,12 @@ class MetricsRegionServerWrapperImpl } lastRan = currentTime; - numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory) + - BoundedRegionGroupingProvider.getNumLogFiles(regionServer.walFactory); - walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory) + - BoundedRegionGroupingProvider.getLogFileSize(regionServer.walFactory); + WALProvider provider = regionServer.walFactory.getWALProvider(); + WALProvider metaProvider = regionServer.walFactory.getMetaWALProvider(); + numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) + + (metaProvider == null ? 0 : metaProvider.getNumLogFiles()); + walFileSize = (provider == null ? 
0 : provider.getLogFileSize()) + + (metaProvider == null ? 0 : metaProvider.getLogFileSize()); //Copy over computed values so that no thread sees half computed values. numStores = tempNumStores; numStoreFiles = tempNumStoreFiles; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..14c5594e6a3d7dc9bd7a85b3ad60654d54f5677d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy; + +/** + * A WAL grouping strategy that limits the number of delegate providers (i.e. wal group) to + * "hbase.wal.regiongrouping.numgroups". 
+ */ +@InterfaceAudience.Private +public class BoundedGroupingStrategy implements RegionGroupingStrategy{ + + static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups"; + static final int DEFAULT_NUM_REGION_GROUPS = 2; + + private ConcurrentHashMap<String, String> groupNameCache = + new ConcurrentHashMap<String, String>(); + private AtomicInteger counter = new AtomicInteger(0); + private String[] groupNames; + + @Override + public String group(byte[] identifier) { + String idStr = Bytes.toString(identifier); + String groupName = groupNameCache.get(idStr); + if (null == groupName) { + groupName = groupNames[(counter.getAndIncrement() & Integer.MAX_VALUE) % groupNames.length]; + String extantName = groupNameCache.putIfAbsent(idStr, groupName); + if (extantName != null) { + return extantName; + } + } + return groupName; + } + + @Override + public void init(Configuration config, String providerId) { + int numGroups = Math.max(1, config.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS)); + groupNames = new String[numGroups]; + for (int i = 0; i < numGroups; i++) { + groupNames[i] = providerId + GROUP_NAME_DELIMITER + "regiongroup-" + i; + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java deleted file mode 100644 index e1417b2e2b0c7bd2144f11cc86d735435a80eb4e..0000000000000000000000000000000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.wal; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -// imports for classes still in regionserver.wal -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; - -/** - * A WAL Provider that pre-creates N WALProviders and then limits our grouping strategy to them. - * Control the number of delegate providers via "hbase.wal.regiongrouping.numgroups." Control - * the choice of delegate provider implementation and the grouping strategy the same as - * {@link RegionGroupingProvider}. 
- */ -@InterfaceAudience.Private -public class BoundedRegionGroupingProvider extends RegionGroupingProvider { - private static final Log LOG = LogFactory.getLog(BoundedRegionGroupingProvider.class); - - static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups"; - static final int DEFAULT_NUM_REGION_GROUPS = 2; - private WALProvider[] delegates; - private AtomicInteger counter = new AtomicInteger(0); - - @Override - public void init(final WALFactory factory, final Configuration conf, - final List listeners, final String providerId) throws IOException { - super.init(factory, conf, listeners, providerId); - // no need to check for and close down old providers; our parent class will throw on re-invoke - delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS, - DEFAULT_NUM_REGION_GROUPS))]; - for (int i = 0; i < delegates.length; i++) { - delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, - providerId + i); - } - LOG.info("Configured to run with " + delegates.length + " delegate WAL providers."); - } - - @Override - WALProvider populateCache(final byte[] group) { - final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length]; - final WALProvider extant = cached.putIfAbsent(group, temp); - // if someone else beat us to initializing, just take what they set. - // note that in such a case we skew load away from the provider we picked at first - return extant == null ? 
temp : extant; - } - - @Override - public void shutdown() throws IOException { - // save the last exception and rethrow - IOException failure = null; - for (WALProvider provider : delegates) { - try { - provider.shutdown(); - } catch (IOException exception) { - LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage()); - LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); - failure = exception; - } - } - if (failure != null) { - throw failure; - } - } - - @Override - public void close() throws IOException { - // save the last exception and rethrow - IOException failure = null; - for (WALProvider provider : delegates) { - try { - provider.close(); - } catch (IOException exception) { - LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage()); - LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); - failure = exception; - } - } - if (failure != null) { - throw failure; - } - } - - /** - * iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta, - * count the number of files (rolled and active). if either of them isn't, count 0 - * for that provider. - * @param walFactory may not be null. 
- */ - public static long getNumLogFiles(WALFactory walFactory) { - long result = 0; - if (walFactory.provider instanceof BoundedRegionGroupingProvider) { - BoundedRegionGroupingProvider groupProviders = - (BoundedRegionGroupingProvider)walFactory.provider; - for (int i = 0; i < groupProviders.delegates.length; i++) { - result += - ((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getNumLogFiles(); - } - } - WALProvider meta = walFactory.metaProvider.get(); - if (meta instanceof BoundedRegionGroupingProvider) { - for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) { - result += ((FSHLog) - ((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log) - .getNumLogFiles(); } - } - return result; - } - - /** - * iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta, - * count the size of files (rolled and active). if either of them isn't, count 0 - * for that provider. - * @param walFactory may not be null. 
- */ - public static long getLogFileSize(WALFactory walFactory) { - long result = 0; - if (walFactory.provider instanceof BoundedRegionGroupingProvider) { - BoundedRegionGroupingProvider groupProviders = - (BoundedRegionGroupingProvider)walFactory.provider; - for (int i = 0; i < groupProviders.delegates.length; i++) { - result += - ((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getLogFileSize(); - } - } - WALProvider meta = walFactory.metaProvider.get(); - if (meta instanceof BoundedRegionGroupingProvider) { - for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) { - result += ((FSHLog) - ((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log) - .getLogFileSize(); - } - } - return result; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java index 661016d13e92c237624dc741c438a98bfc1719e7..d399100033ccd89ca9239820d6c29973cc604374 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java @@ -129,36 +129,20 @@ public class DefaultWALProvider implements WALProvider { * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, * count the number of files (rolled and active). if either of them aren't, count 0 * for that provider. - * @param walFactory may not be null. 
*/ - public static long getNumLogFiles(WALFactory walFactory) { - long result = 0; - if (walFactory.provider instanceof DefaultWALProvider) { - result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getNumLogFiles(); - } - WALProvider meta = walFactory.metaProvider.get(); - if (meta instanceof DefaultWALProvider) { - result += ((FSHLog)((DefaultWALProvider)meta).log).getNumLogFiles(); - } - return result; + @Override + public long getNumLogFiles() { + return this.log.getNumLogFiles(); } /** * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, * count the size of files (rolled and active). if either of them aren't, count 0 * for that provider. - * @param walFactory may not be null. */ - public static long getLogFileSize(WALFactory walFactory) { - long result = 0; - if (walFactory.provider instanceof DefaultWALProvider) { - result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getLogFileSize(); - } - WALProvider meta = walFactory.metaProvider.get(); - if (meta instanceof DefaultWALProvider) { - result += ((FSHLog)((DefaultWALProvider)meta).log).getLogFileSize(); - } - return result; + @Override + public long getLogFileSize() { + return this.log.getLogFileSize(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 56d17a2d735741662e7335f41d4a6ae3e813c81c..170144808b231eb914875ec96556ca93d3f909e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -218,4 +218,14 @@ class DisabledWALProvider implements WALProvider { return "WAL disabled."; } } + + @Override + public long getNumLogFiles() { + return 0; + } + + @Override + public long getLogFileSize() { + return 0; + } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index eb2c426d1c4479b6553f3a568494193480206cae..8395818ba5ad58fab5b239556dabc2a842ab3f98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -32,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; // imports for classes still in regionserver.wal import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.util.Bytes; /** * A WAL Provider that returns a WAL per group of regions. @@ -57,21 +60,23 @@ class RegionGroupingProvider implements WALProvider { * Map identifiers to a group number. */ public static interface RegionGroupingStrategy { + String GROUP_NAME_DELIMITER = "."; /** * Given an identifier, pick a group. * the byte[] returned for a given group must always use the same instance, since we * will be using it as a hash key. */ - byte[] group(final byte[] identifier); - void init(Configuration config); + String group(final byte[] identifier); + void init(Configuration config, String providerId); } /** * Maps between configuration names for strategies and implementation classes. 
*/ static enum Strategies { - defaultStrategy(IdentityGroupingStrategy.class), - identity(IdentityGroupingStrategy.class); + defaultStrategy(BoundedGroupingStrategy.class), + identity(IdentityGroupingStrategy.class), + bounded(BoundedGroupingStrategy.class); final Class clazz; Strategies(Class clazz) { @@ -97,7 +102,7 @@ class RegionGroupingProvider implements WALProvider { LOG.info("Instantiating RegionGroupingStrategy of type " + clazz); try { final RegionGroupingStrategy result = clazz.newInstance(); - result.init(conf); + result.init(conf, providerId); return result; } catch (InstantiationException exception) { LOG.error("couldn't set up region grouping strategy, check config key " + @@ -112,14 +117,18 @@ class RegionGroupingProvider implements WALProvider { } } - private static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy"; - private static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name(); + public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy"; + public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name(); static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate"; static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name(); - protected final ConcurrentMap cached = - new ConcurrentHashMap(); + /** A group-provider mapping, recommended to make sure one-one rather than many-one mapping */ + protected final ConcurrentMap cached = + new ConcurrentHashMap(); + /** Stores delegation providers (no duplicated) used by this RegionGroupingProvider */ + private final Set providers = Collections + .synchronizedSet(new HashSet()); protected RegionGroupingStrategy strategy = null; @@ -142,7 +151,7 @@ class RegionGroupingProvider implements WALProvider { /** * Populate the cache for this group. 
*/ - WALProvider populateCache(final byte[] group) throws IOException { + WALProvider populateCache(final String group) throws IOException { final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, providerId + "-" + UUID.randomUUID()); final WALProvider extant = cached.putIfAbsent(group, temp); @@ -151,12 +160,13 @@ class RegionGroupingProvider implements WALProvider { temp.close(); return extant; } + providers.add(temp); return temp; } @Override public WAL getWAL(final byte[] identifier) throws IOException { - final byte[] group = strategy.group(identifier); + final String group = strategy.group(identifier); WALProvider provider = cached.get(group); if (null == provider) { provider = populateCache(group); @@ -168,13 +178,15 @@ class RegionGroupingProvider implements WALProvider { public void shutdown() throws IOException { // save the last exception and rethrow IOException failure = null; - for (WALProvider provider : cached.values()) { - try { - provider.shutdown(); - } catch (IOException exception) { - LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage()); - LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); - failure = exception; + synchronized (providers) { + for (WALProvider provider : providers) { + try { + provider.shutdown(); + } catch (IOException exception) { + LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage()); + LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); + failure = exception; + } } } if (failure != null) { @@ -186,13 +198,15 @@ class RegionGroupingProvider implements WALProvider { public void close() throws IOException { // save the last exception and rethrow IOException failure = null; - for (WALProvider provider : cached.values()) { - try { - provider.close(); - } catch (IOException exception) { - LOG.error("Problem closing provider '" + provider + "': " 
+ exception.getMessage()); - LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); - failure = exception; + synchronized (providers) { + for (WALProvider provider : providers) { + try { + provider.close(); + } catch (IOException exception) { + LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage()); + LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); + failure = exception; + } } } if (failure != null) { @@ -202,11 +216,33 @@ class RegionGroupingProvider implements WALProvider { static class IdentityGroupingStrategy implements RegionGroupingStrategy { @Override - public void init(Configuration config) {} + public void init(Configuration config, String providerId) {} @Override - public byte[] group(final byte[] identifier) { - return identifier; + public String group(final byte[] identifier) { + return Bytes.toString(identifier); } } + @Override + public long getNumLogFiles() { + long numLogFiles = 0; + synchronized (providers) { + for (WALProvider provider : providers) { + numLogFiles += provider.getNumLogFiles(); + } + } + return numLogFiles; + } + + @Override + public long getLogFileSize() { + long logFileSize = 0; + synchronized (providers) { + for (WALProvider provider : providers) { + logFileSize += provider.getLogFileSize(); + } + } + return logFileSize; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index e44a4d1cac12c1015a1441de98451518f04b1f39..c869a5f389f933cb048b7d8ed124c5a26f55f511 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -76,7 +76,7 @@ public class WALFactory { static enum Providers { defaultProvider(DefaultWALProvider.class), filesystem(DefaultWALProvider.class), - multiwal(BoundedRegionGroupingProvider.class); + 
multiwal(RegionGroupingProvider.class); Class clazz; Providers(Class clazz) { @@ -444,4 +444,12 @@ public class WALFactory { throws IOException { return DefaultWALProvider.createWriter(configuration, fs, path, false); } + + public final WALProvider getWALProvider() { + return this.provider; + } + + public final WALProvider getMetaWALProvider() { + return this.metaProvider.get(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index b27abf910479b47cb6d9bdbc7936631e4a3acc24..b4c406767774cda913da2c421759f3ca363e88bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -80,4 +80,14 @@ public interface WALProvider { long getLength() throws IOException; } + /** + * Get number of the log files this provider is managing + */ + long getNumLogFiles(); + + /** + * Get size of the log files this provider is managing + */ + long getLogFileSize(); + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index d2581a13878aed027a34696064234dafc022a363..e06a587733d6983e2d6acbb4a5ba70fba4feef58 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -230,4 +230,14 @@ public class IOTestProvider implements WALProvider { } } } + + @Override + public long getNumLogFiles() { + return this.log.getNumLogFiles(); + } + + @Override + public long getLogFileSize() { + return this.log.getLogFileSize(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java deleted file mode 100644 index 
c54e794e8cc25a32e875355cb188dd2eafecd815..0000000000000000000000000000000000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.wal; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.NUM_REGION_GROUPS; -import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.DEFAULT_NUM_REGION_GROUPS; -import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -@Category(LargeTests.class) -public class TestBoundedRegionGroupingProvider { - protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingProvider.class); - - @Rule - public TestName currentTest = new TestName(); - protected static Configuration conf; - protected static FileSystem fs; - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - @Before - public void setUp() throws Exception { - FileStatus[] entries = fs.listStatus(new Path("/")); - for (FileStatus dir : entries) { - fs.delete(dir.getPath(), true); - } - } - - @After - public void tearDown() throws Exception { - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf = TEST_UTIL.getConfiguration(); - // Make block sizes small. 
- conf.setInt("dfs.blocksize", 1024 * 1024); - // quicker heartbeat interval for faster DN death notification - conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000); - conf.setInt("dfs.heartbeat.interval", 1); - conf.setInt("dfs.client.socket-timeout", 5000); - - // faster failover with cluster.shutdown();fs.close() idiom - conf.setInt("hbase.ipc.client.connect.max.retries", 1); - conf.setInt("dfs.client.block.recovery.retries", 1); - conf.setInt("hbase.ipc.client.connection.maxidletime", 500); - - conf.setClass(WAL_PROVIDER, BoundedRegionGroupingProvider.class, WALProvider.class); - - TEST_UTIL.startMiniDFSCluster(3); - - fs = TEST_UTIL.getDFSCluster().getFileSystem(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * Write to a log file with three concurrent threads and verifying all data is written. - */ - @Test - public void testConcurrentWrites() throws Exception { - // Run the WPE tool with three threads writing 3000 edits each concurrently. - // When done, verify that all edits were written. - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); - assertEquals(0, errCode); - } - - /** - * Make sure we can successfully run with more regions then our bound. 
- */ - @Test - public void testMoreRegionsThanBound() throws Exception { - final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2); - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", - "-regions", parallelism}); - assertEquals(0, errCode); - } - - @Test - public void testBoundsGreaterThanDefault() throws Exception { - final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); - try { - conf.setInt(NUM_REGION_GROUPS, temp*4); - final String parallelism = Integer.toString(temp*4); - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", - "-regions", parallelism}); - assertEquals(0, errCode); - } finally { - conf.setInt(NUM_REGION_GROUPS, temp); - } - } - - @Test - public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception { - final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); - try { - conf.setInt(NUM_REGION_GROUPS, temp*4); - final String parallelism = Integer.toString(temp*4*2); - int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), - new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", - "-regions", parallelism}); - assertEquals(0, errCode); - } finally { - conf.setInt(NUM_REGION_GROUPS, temp); - } - } - - /** - * Ensure that we can use Set.add to deduplicate WALs - */ - @Test - public void setMembershipDedups() throws IOException { - final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); - WALFactory wals = null; - try { - conf.setInt(NUM_REGION_GROUPS, temp*4); - // Set HDFS root directory for storing WAL - FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS()); - - wals = new WALFactory(conf, null, currentTest.getMethodName()); - final Set seen = new HashSet(temp*4); - final 
Random random = new Random(); - int count = 0; - // we know that this should see one of the wals more than once - for (int i = 0; i < temp*8; i++) { - final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt())); - LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL); - if (seen.add(maybeNewWAL)) { - count++; - } - } - assertEquals("received back a different number of WALs that are not equal() to each other " + - "than the bound we placed.", temp*4, count); - } finally { - if (wals != null) { - wals.close(); - } - conf.setInt(NUM_REGION_GROUPS, temp); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..e6106b98e1a04bc768584b741ccbeca148c38f58 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java @@ -0,0 +1,189 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS; +import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS; +import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; +import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(LargeTests.class) +public class TestBoundedRegionGroupingStrategy { + protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class); + + @Rule + public TestName currentTest = new TestName(); + protected static Configuration conf; + protected static FileSystem fs; + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Before + public void setUp() throws Exception { + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries) { + fs.delete(dir.getPath(), true); + } + } + + @After + public void tearDown() throws Exception { + } + + @BeforeClass + public static void
setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + // Make block sizes small. + conf.setInt("dfs.blocksize", 1024 * 1024); + // quicker heartbeat interval for faster DN death notification + conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000); + conf.setInt("dfs.heartbeat.interval", 1); + conf.setInt("dfs.client.socket-timeout", 5000); + + // faster failover with cluster.shutdown();fs.close() idiom + conf.setInt("hbase.ipc.client.connect.max.retries", 1); + conf.setInt("dfs.client.block.recovery.retries", 1); + conf.setInt("hbase.ipc.client.connection.maxidletime", 500); + + conf.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class); + conf.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name()); + + TEST_UTIL.startMiniDFSCluster(3); + + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Write to a log file with three concurrent threads and verify that all data is written. + */ + @Test + public void testConcurrentWrites() throws Exception { + // Run the WPE tool with three threads writing 3000 edits each concurrently. + // When done, verify that all edits were written. + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); + assertEquals(0, errCode); + } + + /** + * Make sure we can successfully run with more regions than our bound.
+ */ + @Test + public void testMoreRegionsThanBound() throws Exception { + final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism}); + assertEquals(0, errCode); + } + + @Test + public void testBoundsGreaterThanDefault() throws Exception { + final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + try { + conf.setInt(NUM_REGION_GROUPS, temp*4); + final String parallelism = Integer.toString(temp*4); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism}); + assertEquals(0, errCode); + } finally { + conf.setInt(NUM_REGION_GROUPS, temp); + } + } + + @Test + public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception { + final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + try { + conf.setInt(NUM_REGION_GROUPS, temp*4); + final String parallelism = Integer.toString(temp*4*2); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism}); + assertEquals(0, errCode); + } finally { + conf.setInt(NUM_REGION_GROUPS, temp); + } + } + + /** + * Ensure that we can use Set.add to deduplicate WALs + */ + @Test + public void setMembershipDedups() throws IOException { + final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + WALFactory wals = null; + try { + conf.setInt(NUM_REGION_GROUPS, temp*4); + // Set HDFS root directory for storing WAL + FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS()); + + wals = new WALFactory(conf, null, currentTest.getMethodName()); + final Set seen = new HashSet(temp*4); + final
Random random = new Random(); + int count = 0; + // we know that this should see one of the wals more than once + for (int i = 0; i < temp*8; i++) { + final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt())); + LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL); + if (seen.add(maybeNewWAL)) { + count++; + } + } + assertEquals("received back a different number of WALs that are not equal() to each other " + + "than the bound we placed.", temp*4, count); + } finally { + if (wals != null) { + wals.close(); + } + conf.setInt(NUM_REGION_GROUPS, temp); + } + } +}