From 3469e318c4ec0481647d333476367a1d3835fe15 Mon Sep 17 00:00:00 2001
From: abhilak
Date: Wed, 22 Jul 2015 23:04:03 -0700
Subject: [PATCH] Improve RegionSplitter

---
 .../apache/hadoop/hbase/util/RegionSplitter.java   | 393 ++++++++++++++++++++-
 1 file changed, 383 insertions(+), 10 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
index ea704f8..f8bdc01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,6 +27,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -51,6 +53,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -85,7 +89,7 @@ import com.google.common.collect.Sets;
  * Answer: Automatic splitting is determined by the configuration value
  * HConstants.HREGION_MAX_FILESIZE. It is not recommended that you set this
  * to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
- * is 100GB, which would result in > 1hr major compactions if reached.
+ * is 100GB, which would result in &gt; 1hr major compactions if reached.
  * <p>
 * Question: Why did the original authors decide to manually split?
 * Answer: Specific workload characteristics of our use case allowed us
@@ -227,7 +231,7 @@ public class RegionSplitter {
     /**
      * @param row
      *          byte array representing a row in HBase
-     * @return String to use for debug & file printing
+     * @return String to use for debug &amp; file printing
      */
     String rowToStr(byte[] row);
 
@@ -254,18 +258,34 @@ public class RegionSplitter {
  * <p>
  * <p>
 * <p>
 * There are two SplitAlgorithms built into RegionSplitter, HexStringSplit
@@ -313,6 +333,20 @@ public class RegionSplitter {
     opt.addOption(null, "risky", false,
         "Skip verification steps to complete quickly."
             + "STRONGLY DISCOURAGED for production systems.  ");
+    opt.addOption(null, "n", false,
+        "Perform rolling split on all regions with the default algorithm "
+            + "until there are at least a given number of regions or "
+            + "all regions are smaller than a given size (in MB).");
+    opt.addOption(null, "requiredNumRegions", true,
+        "Required number of regions after splitting. "
+            + "Required with -n if requiredRegionSize is not provided.");
+    opt.addOption(null, "requiredRegionSize", true,
+        "Only regions above this size (in MB) will be split. "
+            + "Required with -n if requiredNumRegions is not provided.");
+    opt.addOption(null, "maxSplitWaitTime", true,
+        "Maximum time in ms the program will wait for a single split to complete.");
+    opt.addOption(null, "maxCompactionWaitTime", true,
+        "Maximum time in ms the program will wait for compaction (after splitting) to complete.");
 
     CommandLine cmd = new GnuParser().parse(opt, args);
     if (cmd.hasOption("D")) {
@@ -335,7 +369,8 @@ public class RegionSplitter {
     boolean rollingSplit = cmd.hasOption("r");
     boolean oneOperOnly = createTable ^ rollingSplit;
 
-    if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
+    if ((2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) &&
+        !cmd.hasOption("n")) {
       new HelpFormatter().printHelp("RegionSplitter <TABLE> <SPLITALGORITHM>\n" +
           "SPLITALGORITHM is a java class name of a class implementing " +
           "SplitAlgorithm, or one of the special strings HexStringSplit " +
@@ -345,9 +380,35 @@
       return;
     }
     TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
+    if (rollingSplit && cmd.hasOption("n")) {
+      // At least one stopping condition must be provided when using recursive
+      // normal splitting. Splitting stops whenever one of the conditions is
+      // met, i.e. either the table has the given number of regions or all
+      // region sizes are less than the provided value.
+      if (!cmd.hasOption("requiredNumRegions") && !cmd.hasOption("requiredRegionSize")) {
+        new HelpFormatter().printHelp("RegionSplitter <TABLE>\n" +
+            "To run normal splitting without using any split algorithm, " +
+            "provide at least one of requiredNumRegions and requiredRegionSize.", opt);
+        return;
+      }
+      if (cmd.hasOption("requiredNumRegions")) {
+        conf.set("split.regions.max", cmd.getOptionValue("requiredNumRegions"));
+      }
+      if (cmd.hasOption("requiredRegionSize")) {
+        conf.set("split.region.size.max", cmd.getOptionValue("requiredRegionSize"));
+      }
+      if (cmd.hasOption("maxSplitWaitTime")) {
+        conf.set("split.wait.max", cmd.getOptionValue("maxSplitWaitTime"));
+      }
+      if (cmd.hasOption("maxCompactionWaitTime")) {
+        conf.set("split.compact.wait.max", cmd.getOptionValue("maxCompactionWaitTime"));
+      }
+      recursiveSplit(tableName, conf);
+      return;
+    }
     String splitClass = cmd.getArgs()[1];
-    SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);
-
+    SplitAlgorithm splitAlgo = null;
+    splitAlgo = newSplitAlgoInstance(conf, splitClass);
     if (cmd.hasOption("firstrow")) {
       splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
     }
@@ -368,6 +429,318 @@ public class RegionSplitter {
     }
   }
 
+  static void recursiveSplit(TableName tableName, Configuration conf)
+      throws IOException, InterruptedException {
+    // By default the max number of regions is very large and the max region
+    // size is small, so that whichever parameter the user provided becomes the
+    // effective stopping condition (at least one of them must have been provided).
+    long maxRegions = conf.getLong("split.regions.max", 1000000000);
+    long maxSize = conf.getLong("split.region.size.max", 100);
+    long startTime = System.currentTimeMillis();
+    int totalNumSplits = 0;
+    boolean splittingNotComplete = true;
+    while (splittingNotComplete) {
+      // Attempt to split all current regions.
+      int numSplits = splitAllRegions(tableName, conf, maxRegions, maxSize);
+      splittingNotComplete = numSplits > 0;
+      totalNumSplits += numSplits;
+    }
+    long timeDiff = System.currentTimeMillis() - startTime;
+    LOG.info("TOTAL TIME = "
+        + org.apache.hadoop.util.StringUtils.formatTime(timeDiff));
+    LOG.info("Total number of splits performed = " + totalNumSplits);
+    if (0 < totalNumSplits) {
+      LOG.info("Avg Time / Split = "
+          + org.apache.hadoop.util.StringUtils.formatTime(timeDiff / totalNumSplits));
+    }
+  }
+
+  static int splitAllRegions(TableName tableName, Configuration conf,
+      long maxRegions, long maxSize)
+      throws IOException, InterruptedException {
+
+    int splitCount = 0;
+    // Maximum amount of time in ms we wait for a split / compaction.
+    final long maxSplitWait = conf.getLong("split.wait.max", 5 * 60 * 1000);
+    final long maxCompactionWait = conf.getLong("split.compact.wait.max", 60 * 60 * 1000);
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      ((ClusterConnection) connection).clearRegionCache();
+      Path hbDir = FSUtils.getRootDir(conf);
+      Path tableDir = FSUtils.getTableDir(hbDir, tableName);
+      FileSystem fs = FileSystem.get(conf);
+      HTableDescriptor htd = null;
+      try (Table table = connection.getTable(tableName)) {
+        htd = table.getTableDescriptor();
+      }
+
+      // Store the sizes of all current regions. Only the regions above the
+      // given threshold will be split.
+      final TreeMap<String, Long> regionSizes = Maps.newTreeMap();
+      try (Admin admin = connection.getAdmin()) {
+        ClusterStatus clusterStatus = admin.getClusterStatus();
+        for (ServerName serverName : clusterStatus.getServers()) {
+          ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+          for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) {
+            String region = Bytes.toString(entry.getKey());
+            RegionLoad regionLoad = entry.getValue();
+            long storeFileSize = regionLoad.getStorefileSizeMB();
+            regionSizes.put(region, storeFileSize);
+            // Other load characteristics could be used here for further improvement.
+          }
+        }
+      }
+
+      // Query meta for all regions in the table
+      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
+      Pair<byte[][], byte[][]> tmp = null;
+      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
+        tmp = regionLocator.getStartEndKeys();
+      }
+
+      // Substitute explicit first and last rows when they are empty
+      final byte xFF = (byte) 0xFF;
+      final byte x00 = (byte) 0x00;
+      byte[] firstRowBytes =
+          new byte[] {x00, x00, x00, x00, x00, x00, x00, x00};
+      byte[] lastRowBytes =
+          new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF};
+      Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
+          "Start and End rows should be equivalent");
+      for (int i = 0; i < tmp.getFirst().length; ++i) {
+        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
+        if (start == null || end == null) {
+          continue;
+        }
+        if (start.length == 0) {
+          start = firstRowBytes;
+        }
+        if (end.length == 0) {
+          end = lastRowBytes;
+        }
+        if (Bytes.toStringBinary(start).equals(Bytes.toStringBinary(end))) {
+          continue;
+        }
+        rows.add(Pair.newPair(start, end));
+      }
+
+      final int origCount = rows.size();
+      if (origCount >= maxRegions) {
+        // We already have the desired number of regions. No splitting required.
+        return 0;
+      }
+
+      // Save the initial region name and end key for each start key
+      final TreeMap<String, String> regionNameBySk = Maps.newTreeMap();
+      final TreeMap<String, byte[]> endKeyBySk = Maps.newTreeMap();
+      try (RegionLocator regionLocatorSk = connection.getRegionLocator(tableName)) {
+        for (Pair<byte[], byte[]> allRows : rows) {
+          regionNameBySk.put(Bytes.toString(allRows.getFirst()),
+              Bytes.toString(regionLocatorSk
+                  .getRegionLocation(allRows.getFirst(), true).getRegionInfo().getRegionName()));
+          endKeyBySk.put(Bytes.toString(allRows.getFirst()), allRows.getSecond());
+        }
+      }
+
+      // Sort regions by size; regions will be split in descending order of size.
+      List<Pair<byte[], byte[]>> regionsSkEk = Lists.newArrayList(rows);
+      Collections.sort(regionsSkEk, new Comparator<Pair<byte[], byte[]>>() {
+        public int compare(Pair<byte[], byte[]> region1, Pair<byte[], byte[]> region2) {
+          Long regionSize1 = regionSizes
+              .get(regionNameBySk.get(Bytes.toString(region1.getFirst())));
+          Long regionSize2 = regionSizes
+              .get(regionNameBySk.get(Bytes.toString(region2.getFirst())));
+          return regionSize2.compareTo(regionSize1);
+        }
+      });
+
+      LOG.info("Table " + tableName + " has " + rows.size()
+          + " regions that will be considered for splitting in this round.");
+
+      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
+        LOG.debug("Starting splits.");
+        long startTime = System.currentTimeMillis();
+        for (Pair<byte[], byte[]> regions : regionsSkEk) {
+          HRegionLocation regionLoc = regionLocator.getRegionLocation(regions.getFirst(), true);
+          byte[] regionName = regionLoc.getRegionInfo().getRegionName();
+
+          byte[] currentEkForRegion = regionLoc.getRegionInfo().getEndKey();
+          if (currentEkForRegion.length == 0) {
+            currentEkForRegion = lastRowBytes;
+          }
+
+          if (regionSizes.get(regionNameBySk.get(Bytes.toString(regions.getFirst()))) < maxSize) {
+            // This region (and, since regions are sorted by size, every
+            // following one) is already small. No splitting required.
+            break;
+          } else if (!Bytes.equals(currentEkForRegion,
+              endKeyBySk.get(Bytes.toString(regions.getFirst())))) {
+            // The region has split already, so do not attempt to split it in this round.
+            continue;
+          } else if (!Bytes.toString(regionName)
+              .equals(regionNameBySk.get(Bytes.toString(regions.getFirst())))) {
+            // The saved region data has changed. Skip splitting for this round.
+            continue;
+          }
+
+          LOG.debug("Splitting the region " + Bytes.toStringBinary(regionName) + ".");
+
+          // Split the region.
+          try (Admin splitAdmin = connection.getAdmin()) {
+            splitAdmin.splitRegion(regionName);
+          }
+
+          // Wait for the daughter regions to come online.
+          boolean areRegionsOnlineAfterSplitting = false;
+          long splitStartTime = System.currentTimeMillis();
+          RetryCounter splitCounter =
+              new RetryCounterFactory(1000000000, 10 * 1000, 5 * 60 * 1000).create();
+          long currentTime;
+          while (!areRegionsOnlineAfterSplitting) {
+            LOG.debug("Waiting for the split to complete.");
+            currentTime = System.currentTimeMillis();
+            if (currentTime - splitStartTime > maxSplitWait) {
+              // Waited long enough for this split. Will not wait any further.
+              LOG.debug("Waited too long for split to complete. Skip waiting for split.");
+              break;
+            }
+            ((ClusterConnection) connection).clearRegionCache();
+            try {
+              HRegionInfo skRegionInfo =
+                  regionLocator.getRegionLocation(regions.getFirst(), true).getRegionInfo();
+              currentEkForRegion = skRegionInfo.getEndKey();
+              if (currentEkForRegion.length == 0) {
+                currentEkForRegion = lastRowBytes;
+              }
+              if (skRegionInfo.isOffline() || Bytes.equals(currentEkForRegion,
+                  endKeyBySk.get(Bytes.toString(regions.getFirst())))) {
+                // The region has split successfully once the initial end key is
+                // no longer the current end key; until then, keep waiting.
+                LOG.debug("Daughters not online yet: " + skRegionInfo.isOffline() + " "
+                    + Bytes.equals(currentEkForRegion,
+                        endKeyBySk.get(Bytes.toString(regions.getFirst()))));
+                try {
+                  splitCounter.sleepUntilNextRetry();
+                } catch (InterruptedException e) {
+                  throw new InterruptedIOException(e.getMessage());
+                }
+                continue;
+              }
+            } catch (NoServerForRegionException nsfre) {
+              // NSFRE will occur if the old hbase:meta entry has no server assigned
+              LOG.debug(nsfre);
+              try {
+                splitCounter.sleepUntilNextRetry();
+              } catch (InterruptedException e) {
+                throw new InterruptedIOException(e.getMessage());
+              }
+              continue;
+            }
+            LOG.debug("Split done.");
+            areRegionsOnlineAfterSplitting = true;
+          }
+          splitCount++;
+
+          // Force a compaction (in some cases compaction might not be triggered on its own).
+          try (Admin compactAdmin = connection.getAdmin()) {
+            LOG.debug("Starting compaction");
+            // If a compaction has already been queued, HBase will ignore these calls.
+            try {
+              byte[] daughter1 = regionLocator
+                  .getRegionLocation(regions.getFirst(), true).getRegionInfo().getRegionName();
+              currentEkForRegion = regionLocator
+                  .getRegionLocation(regions.getFirst(), true).getRegionInfo().getEndKey();
+              // The end key of the first daughter region is the start key of the other.
+              byte[] daughter2 = regionLocator
+                  .getRegionLocation(currentEkForRegion, true).getRegionInfo().getRegionName();
+              compactAdmin.compactRegion(daughter1);
+              compactAdmin.compactRegion(daughter2);
+            } catch (NoServerForRegionException nsfre) {
+              // NSFRE will occur if the old hbase:meta entry has no server assigned
+              LOG.debug(nsfre);
+            }
+          }
+
+          // Wait for the compaction to complete, i.e. until all reference files are gone.
+          boolean waitForCompaction = true;
+          long compactionStartTime = System.currentTimeMillis();
+          RetryCounter compactCounter =
+              new RetryCounterFactory(1000000000, 10 * 1000, 15 * 60 * 1000).create();
+          while (waitForCompaction) {
+            LOG.debug("Waiting for compaction to complete.");
+            currentTime = System.currentTimeMillis();
+            if (currentTime - compactionStartTime > maxCompactionWait) {
+              // Waited long enough for the compaction. Will not wait any further.
+              LOG.debug("Waited too long for compaction to complete."
+                  + " Skip waiting for compaction.");
+              break;
+            }
+            ((ClusterConnection) connection).clearRegionCache();
+            try {
+              // When a daughter region is opened, a compaction is triggered.
+              // Wait until compaction completes for both daughter regions.
+              LinkedList<HRegionInfo> check = Lists.newLinkedList();
+              check.add(regionLocator.getRegionLocation(regions.getFirst(), true).getRegionInfo());
+              // After the split, looking up the region for the current end key
+              // yields the other daughter region.
+              regionLoc = regionLocator.getRegionLocation(regions.getFirst(), true);
+              currentEkForRegion = regionLoc.getRegionInfo().getEndKey();
+              check.add(regionLocator.getRegionLocation(currentEkForRegion, true).getRegionInfo());
+
+              for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
+                HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+                    connection.getConfiguration(), fs, tableDir, hri, true);
+                // Check every column family of the region for reference files.
+                boolean refFound = false;
+                for (HColumnDescriptor c : htd.getFamilies()) {
+                  if ((refFound = regionFs.hasReferences(c.getNameAsString()))) {
+                    break;
+                  }
+                }
+                // Compaction is complete once all reference files are gone.
+                if (!refFound) {
+                  check.remove(hri);
+                }
+              }
+              if (check.isEmpty()) {
+                waitForCompaction = false;
+              }
+            } catch (NoServerForRegionException nsfre) {
+              LOG.debug("No Server Exception thrown for: "
+                  + Bytes.toStringBinary(regions.getFirst()));
+              // Wait some more time for the region servers to come online.
+            }
+            // Compaction not done yet.
+            if (waitForCompaction) {
+              try {
+                compactCounter.sleepUntilNextRetry();
+              } catch (InterruptedException e) {
+                throw new InterruptedIOException(e.getMessage());
+              }
+            }
+          }
+
+          if (splitCount % 10 == 0) {
+            LOG.info(splitCount + " splits done in the current round.");
+          }
+          if (splitCount + origCount >= maxRegions) {
+            // Already split to the desired number of regions. No further splitting required.
+            break;
+          }
+        }
+        long timeDiff = System.currentTimeMillis() - startTime;
+        LOG.info("Time taken in current round = "
+            + org.apache.hadoop.util.StringUtils.formatTime(timeDiff));
+        LOG.info("Splits = " + splitCount);
+        if (0 < splitCount) {
+          LOG.info("Avg Time / Split = "
+              + org.apache.hadoop.util.StringUtils.formatTime(timeDiff / splitCount));
+        }
+      }
+    }
+    return splitCount;
+  }
+
   static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
       String[] columnFamilies, Configuration conf)
       throws IOException, InterruptedException {
@@ -878,10 +1251,10 @@ public class RegionSplitter {
    * boundaries. The format of a HexStringSplit region boundary is the ASCII
    * representation of an MD5 checksum, or any other uniformly distributed
    * hexadecimal value. Row are hex-encoded long values in the range
-   * "00000000" => "FFFFFFFF" and are left-padded with zeros to keep the
+   * "00000000" =&gt; "FFFFFFFF" and are left-padded with zeros to keep the
    * same order lexicographically as if they were binary.
    *
-   * Since this split algorithm uses hex strings as keys, it is easy to read &
+   * Since this split algorithm uses hex strings as keys, it is easy to read &amp;
    * write in the shell but takes up more space and may be non-intuitive.
    */
   public static class HexStringSplit implements SplitAlgorithm {
@@ -1032,7 +1405,7 @@ public class RegionSplitter {
   /**
    * A SplitAlgorithm that divides the space of possible keys evenly. Useful
    * when the keys are approximately uniform random bytes (e.g. hashes). Rows
-   * are raw byte values in the range 00 => FF and are right-padded with
+   * are raw byte values in the range 00 =&gt; FF and are right-padded with
    * zeros to keep the same memcmp() order. This is the natural algorithm to use
    * for a byte[] environment and saves space, but is not necessarily the
    * easiest for readability.
-- 
1.9.5
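
Usage note: with this patch applied, the rolling split can be driven from the
command line (combining -r with the new "n", "requiredNumRegions" /
"requiredRegionSize", and wait-time options added above) or programmatically.
The sketch below is a minimal, hypothetical programmatic example: the table
name "usertable", the class name RollingSplitExample, and the chosen limits
are illustrative assumptions, while the configuration keys are exactly the
ones read by recursiveSplit() and splitAllRegions() in this patch.

    package org.apache.hadoop.hbase.util;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;

    public class RollingSplitExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Stop once the table has at least 256 regions ...
        conf.setLong("split.regions.max", 256);
        // ... or once every region's store files total less than 1024 MB.
        conf.setLong("split.region.size.max", 1024);
        // Wait at most 5 minutes for each split and 30 minutes for each
        // compaction before moving on (both values are illustrative).
        conf.setLong("split.wait.max", 5 * 60 * 1000);
        conf.setLong("split.compact.wait.max", 30 * 60 * 1000);
        // recursiveSplit(TableName, Configuration) is package-private, so this
        // example assumes it lives in org.apache.hadoop.hbase.util as well.
        RegionSplitter.recursiveSplit(TableName.valueOf("usertable"), conf);
      }
    }

Splitting rounds repeat until neither stopping condition makes further
progress, mirroring what the -n command-line path does after copying the
option values into the same configuration keys.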