diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index a3956d7..113b3c3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.master.balancer.BalancerChore; +import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; @@ -281,6 +283,8 @@ Server { private LoadBalancer balancer; private Thread balancerChore; + private Thread clusterStatusChore; + // If 'true', the balancer is 'on'. If 'false', the balancer will not run. private volatile boolean balanceSwitch = true; @@ -308,6 +312,7 @@ Server { private final boolean masterCheckCompression; private SpanReceiverHost spanReceiverHost; + /** * Initializes the HMaster. The steps are as follows: *

@@ -697,6 +702,7 @@ Server { // Start balancer and meta catalog janitor after meta and regions have // been assigned. status.setStatus("Starting balancer and catalog janitor"); + this.clusterStatusChore = getAndStartClusterStatusChore(this); this.balancerChore = getAndStartBalancerChore(this); this.catalogJanitorChore = new CatalogJanitor(this, this); startCatalogJanitorChore(); @@ -1080,17 +1086,17 @@ Server { if (this.executorService != null) this.executorService.shutdown(); } + private static Thread getAndStartClusterStatusChore(HMaster master) { + if (master == null || master.balancer == null) { + return null; + } + Chore chore = new ClusterStatusChore(master, master.balancer); + return Threads.setDaemonThreadRunning(chore.getThread()); + } + private static Thread getAndStartBalancerChore(final HMaster master) { - String name = master.getServerName() + "-BalancerChore"; - int balancerPeriod = - master.getConfiguration().getInt("hbase.balancer.period", 300000); // Start up the load balancer chore - Chore chore = new Chore(name, balancerPeriod, master) { - @Override - protected void chore() { - master.balance(); - } - }; + Chore chore = new BalancerChore(master); return Threads.setDaemonThreadRunning(chore.getThread()); } @@ -1098,6 +1104,9 @@ Server { if (this.balancerChore != null) { this.balancerChore.interrupt(); } + if (this.clusterStatusChore != null) { + this.clusterStatusChore.interrupt(); + } if (this.catalogJanitorChore != null) { this.catalogJanitorChore.interrupt(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java new file mode 100644 index 0000000..b662af5 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.master.HMaster; + +/** + * Chore that will call HMaster.balance{@link org.apache.hadoop.hbase.master.HMaster#balance()} when + * needed. + */ +@InterfaceAudience.Private +public class BalancerChore extends Chore { + + private final HMaster master; + + public BalancerChore(HMaster master) { + super(master.getServerName() + "-BalancerChore", + master.getConfiguration().getInt("hbase.balancer.period", 300000), + master); + this.master = master; + } + + @Override + protected void chore() { + master.balance(); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java new file mode 100644 index 0000000..23f7843 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.LoadBalancer; + +/** + * Chore that will feed the balancer the cluster status. + */ +@InterfaceAudience.Private +public class ClusterStatusChore extends Chore { + + private final HMaster master; + private final LoadBalancer balancer; + + public ClusterStatusChore(HMaster master, LoadBalancer balancer) { + super(master.getServerName() + "-ClusterStatusChore", + master.getConfiguration().getInt("hbase.balancer.statusPeriod", 60000), + master); + this.master = master; + this.balancer = balancer; + } + + @Override + protected void chore() { + balancer.setClusterStatus(master.getClusterStatus()); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index af5ddf2..3cf93fe 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -104,17 +104,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { "hbase.master.balancer.stochastic.stepsPerRegion"; private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions"; + private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); private final RegionLocationFinder regionFinder = new RegionLocationFinder(); private ClusterStatus clusterStatus = null; - private Map loads = new HashMap(); + private Map> loads = new HashMap>(); // values are defaults private int maxSteps = 15000; private int stepsPerRegion = 110; private int maxMoves = 600; + private int numRegionLoadsToRemember = 15; private float loadMultiplier = 55; private float moveCostMultiplier = 5; private float tableMultiplier = 5; @@ -124,6 +126,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { private float memStoreSizeMultiplier = 5; private float storeFileSizeMultiplier = 5; + @Override public void setConf(Configuration conf) { super.setConf(conf); @@ -133,6 +136,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves); stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); + numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); + // Load multiplier should be the greatest as it is the most general way to balance data. loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier); @@ -146,9 +151,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier); memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier); storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier); - - // These are not used currently. - // TODO: Start using these once rolling averages are implemented for read/write load. readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier); writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier); } @@ -313,13 +315,33 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } /** Store the current region loads. */ - private void updateRegionLoad() { - loads.clear(); + private synchronized void updateRegionLoad() { + + //We create a new hashmap so that regions that are no longer there are removed. + //However we temporarily need the old loads so we can use them to keep the rolling average. + Map> oldLoads = loads; + loads = new HashMap>(); + for (ServerName sn : clusterStatus.getServers()) { ServerLoad sl = clusterStatus.getLoad(sn); if (sl == null) continue; for (Entry entry : sl.getRegionsLoad().entrySet()) { - loads.put(Bytes.toString(entry.getKey()), entry.getValue()); + List rLoads = oldLoads.get(Bytes.toString(entry.getKey())); + if (rLoads != null) { + + //We're only going to keep 15. So if there are that many already take the last 14 + if (rLoads.size() >= numRegionLoadsToRemember) { + int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember); + + rLoads = rLoads.subList(numToRemove, rLoads.size()); + } + + } else { + //There was nothing there + rLoads = new ArrayList(); + } + rLoads.add(entry.getValue()); + loads.put(Bytes.toString(entry.getKey()), rLoads); } } @@ -402,17 +424,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { double localityCost = localityMultiplier * computeDataLocalityCost(initialRegionMapping, clusterState); - // TODO: Add Read and Write requests back in here after keeping a running average on per - // region load metrics. double memstoreSizeCost = memStoreSizeMultiplier * computeRegionLoadCost(clusterState, RegionLoadCostType.MEMSTORE_SIZE); double storefileSizeCost = storeFileSizeMultiplier * computeRegionLoadCost(clusterState, RegionLoadCostType.STOREFILE_SIZE); - double total = + + + double readRequestCost = + readRequestMultiplier + * computeRegionLoadCost(clusterState, RegionLoadCostType.READ_REQUEST); + double writeRequestCost = + writeRequestMultiplier + * computeRegionLoadCost(clusterState, RegionLoadCostType.WRITE_REQUEST); + + double total = moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost - + storefileSizeCost; + + storefileSizeCost + readRequestCost + writeRequestCost; LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = " + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = " + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = " @@ -611,7 +640,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { // For each region for (HRegionInfo region : regions) { // Try and get the region using the regionNameAsString - RegionLoad rl = loads.get(region.getRegionNameAsString()); + List rl = loads.get(region.getRegionNameAsString()); // That could have failed if the RegionLoad is using the other regionName if (rl == null) { @@ -635,24 +664,34 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { /** * Get the un-scaled cost from a RegionLoad * - * @param rl the Region load + * @param regionLoadList the Region load List * @param type The type of cost to extract * @return the double representing the cost */ - private double getRegionLoadCost(RegionLoad rl, RegionLoadCostType type) { - switch (type) { - case READ_REQUEST: - return rl.getReadRequestsCount(); - case WRITE_REQUEST: - return rl.getWriteRequestsCount(); - case MEMSTORE_SIZE: - return rl.getMemStoreSizeMB(); - case STOREFILE_SIZE: - return rl.getStorefileSizeMB(); - default: - assert false : "RegionLoad cost type not supported."; - return 0; + private double getRegionLoadCost(List regionLoadList, RegionLoadCostType type) { + double cost = 0; + for(RegionLoad rl:regionLoadList) { + switch (type) { + case READ_REQUEST: + cost += rl.getReadRequestsCount(); + break; + case WRITE_REQUEST: + cost += rl.getWriteRequestsCount(); + break; + case MEMSTORE_SIZE: + cost += rl.getMemStoreSizeMB(); + break; + case STOREFILE_SIZE: + cost += rl.getStorefileSizeMB(); + break; + default: + assert false : "RegionLoad cost type not supported."; + return 0; + } } + + return cost; + } /**