Index: conf/hadoop-default.xml =================================================================== --- conf/hadoop-default.xml (revision 599531) +++ conf/hadoop-default.xml (working copy) @@ -399,6 +399,16 @@ + dfs.balance.bandwidthPerSec + 1048576 + + Specifies the maximum amount of bandwidth that each datanode + can utilize for the balancing purpose in term of + the number of bytes per second. + + + + dfs.hosts Names a file that contains a list of hosts that are Index: src/test/org/apache/hadoop/dfs/TestBalancer.java =================================================================== --- src/test/org/apache/hadoop/dfs/TestBalancer.java (revision 0) +++ src/test/org/apache/hadoop/dfs/TestBalancer.java (revision 0) @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.FSConstants.DatanodeReportType; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import junit.framework.TestCase; +/** + * This class tests if a balancer schedules tasks correctly. + */ +public class TestBalancer extends TestCase { + private static final Configuration CONF = new Configuration(); + final private static long CAPACITY = 500L; + final private static String RACK0 = "/rack0"; + final private static String RACK1 = "/rack1"; + final private static String RACK2 = "/rack2"; + final static private String fileName = "/tmp.txt"; + final static private Path filePath = new Path(fileName); + private MiniDFSCluster cluster; + + ClientProtocol client; + + static final int DEFAULT_BLOCK_SIZE = 10; + private Balancer balancer; + private Random r = new Random(); + + static { + CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE); + //CONF.setLong("dfs.blockreport.intantervalMsec", 100L); + CONF.setLong("dfs.heartbeat.interval", 1L); + CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); + + } + + /* create a file with a length of fileLen */ + private void createFile(long fileLen, short replicationFactor) + throws IOException { + FileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, filePath, fileLen, + replicationFactor, r.nextLong()); + DFSTestUtil.waitReplication(fs, filePath, replicationFactor); + } + + + /* fill up a cluster with numNodes datanodes + * whose used space to be size + */ + private Block[] generateBlocks(long size, short numNodes) throws IOException { + cluster = new MiniDFSCluster( CONF, numNodes, true, null); + try { + cluster.waitActive(); + client = DFSClient.createNamenode( + DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF); + + short replicationFactor = (short)(numNodes-1); + long fileLen = size/replicationFactor; + createFile(fileLen, replicationFactor); + + List locatedBlocks = cluster.getNameNode(). + getBlockLocations(fileName, 0, fileLen).getLocatedBlocks(); + + int numOfBlocks = locatedBlocks.size(); + Block[] blocks = new Block[numOfBlocks]; + for(int i=0; i> blockReports = + new ArrayList>(usedSpace.length); + Block[][] results = new Block[usedSpace.length][]; + for(int i=0; i()); + } + for(int i=0; i0 ) { + notChosen = false; + blockReports.get(chosenIndex).add(blocks[i]); + usedSpace[chosenIndex] -= blocks[i].getNumBytes(); + } + } + } + } + for(int i=0; i nodeBlockList = blockReports.get(i); + results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]); + } + return results; + } + + /* we first start a cluster and fill the cluster up to a certain size. + * then redistribute blocks according the requried distribution. + * Afterwards a balnacer is running to balance the cluster. + */ + private void testUnevenDistribution( + long distribution[], long capacities[], String[] racks) throws Exception { + int numDatanodes = distribution.length; + if (capacities.length != numDatanodes || racks.length != numDatanodes) { + throw new IllegalArgumentException("Array length is not the same"); + } + + // calculate total space that need to be filled + long totalUsedSpace=0L; + for(int i=0; i10) { + balanced = false; + try { + Thread.sleep(100); + } catch(InterruptedException ignored) { + } + break; + } + } + } while(!balanced); + + } + /** Test a cluster with even distribution, + * then a new empty node is added to the cluster*/ + public void testBalancer0() throws Exception { + /** one-node cluste test*/ + // add an empty node with half of the CAPACITY & the same rack + test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0); + + /** two-node cluster test */ + test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2); + } + + /** Test unEven distributed cluster */ + public void testBalancer1() throws Exception { + testUnevenDistribution( + new long[] {50*CAPACITY/100, 10*CAPACITY/100}, + new long[]{CAPACITY, CAPACITY}, + new String[] {RACK0, RACK1}); + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + TestBalancer balancerTest = new TestBalancer(); + balancerTest.testBalancer0(); + balancerTest.testBalancer1(); + } +} Index: src/test/org/apache/hadoop/dfs/MiniDFSCluster.java =================================================================== --- src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (revision 599531) +++ src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (working copy) @@ -234,6 +234,13 @@ + "] is less than the number of datanodes [" + numDataNodes + "]."); } + if (simulatedCapacities != null + && numDataNodes > simulatedCapacities.length) { + throw new IllegalArgumentException( "The length of simulatedCapacities [" + + simulatedCapacities.length + + "] is less than the number of datanodes [" + numDataNodes + "]."); + } + // Set up the right ports for the datanodes conf.setInt("dfs.datanode.info.port", 0); @@ -262,10 +269,9 @@ } if (simulatedCapacities != null) { dnConf.setBoolean("dfs.datanode.simulateddatastorage", true); + dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, + simulatedCapacities[i-curDatanodesNum]); } - if (simulatedCapacities != null && i < simulatedCapacities.length) { - dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, simulatedCapacities[i]); - } System.out.println("Starting DataNode " + i + " with dfs.data.dir: " + dnConf.get("dfs.data.dir")); dataNodes.add(DataNode.createDataNode(dnArgs, dnConf)); Index: src/java/org/apache/hadoop/dfs/DFSClient.java =================================================================== --- src/java/org/apache/hadoop/dfs/DFSClient.java (revision 599531) +++ src/java/org/apache/hadoop/dfs/DFSClient.java (working copy) @@ -72,7 +72,7 @@ private TreeMap pendingCreates = new TreeMap(); - private static ClientProtocol createNamenode( + static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry( Index: src/java/org/apache/hadoop/dfs/Balancer.java =================================================================== --- src/java/org/apache/hadoop/dfs/Balancer.java (revision 0) +++ src/java/org/apache/hadoop/dfs/Balancer.java (revision 0) @@ -0,0 +1,1481 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Formatter; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.dfs.FSConstants.DatanodeReportType; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/**

The balancer is a tool that balances disk space usage on an HDFS cluster + * when some datanodes become full or when new empty nodes join the cluster. + * The tool is deployed as an application program that can be run by the + * cluster administrator. + * + *

SYNOPSIS + *

To start: + *

bin/start-balancer.sh [-threshold ] + *

Example: bin/ start-balancer.sh + *

start the balancer with a default threshold of 10% + *

bin/ start-balancer.sh -threshold 5 + *

start the balancer with a threshold of 5% + *

To stop: + *

bin/ stop-balancer.sh + * + *

DESCRIPTION + *

The threshold parameter is a fraction in the range of (0, 1) with a + * default value of 10%. The threshold sets a target for whether the cluster + * is balanced. A cluster is balanced if for each datanode, the utilization + * of the node (ratio of used space at the node to total capacity of the node) + * differs from the utilization of the (ratio of used space in the cluster + * to total capacity of the cluster) by no more than the threshold value. + * The smaller the threshold, the more balanced a cluster will become. + * It takes more time to run the balancer for small threshold values. + * Also for a very small threshold the cluster may not be able to reach the + * balanced state when applications write and delete files concurrently. + * + *

The tool moves blocks from highly utilized datanodes to poorly + * utilized datanodes iteratively. In each iteration a datanode moves or + * receives no more than the lesser of 10G bytes or the threshold fraction + * of its capacity. Each iteration runs no more than 20 minutes. + *

A system property that limits the balancer's use of bandwidth is + * defined in the default configuration file: + *

+ *

dfs.balance.bandwidthPerSec + *

1048576 + *

Specifies the maximum bandwidth that each datanode + * can utilize for the balancing purpose in term of the number of bytes + * per second. + *

+ * + *

This property determines the maximum speed at which a block will be + * moved from one datanode to another. The default value is 1MB/s. The higher + * the bandwidth, the faster a cluster can reach the balanced state, + * but with greater competition with application processes. If an + * administrator changes the value of this property in the configuration + * file, the change is observed when HDFS is next restarted. + * + *

After the balancer is started, an output file name where the balancer + * progress will be recorded is printed on the screen. The administrator + * can monitor the running of the balancer by reading the output file. + * The output shows the balancer's status iteration by iteration. In each + * iteration it prints the starting time, the iteration number, the total + * number of bytes that have been moved in the previous iterations, + * the total number of bytes that are left to move in order for the cluster + * to be balanced, and the number of bytes that are being moved in this + * iteration. Normally "Bytes Already Moved" is increasing while "Bytes Left + * To Move" is decreasing. + * + *

Running multiple instances of the balancer in an HDFS cluster is + * unexpected and prohibited by the tool. + * + *

The balancer automatically exits when any of the following five + * conditions is satisfied: + *

1. The cluster is balanced; + *

2. No block can be moved; + *

3. No block has been moved for three consecutive iterations. + *

4. An IOException occurs while communicating with the namenode; + *

5. Another balancer is running. + * + *

Upon exit, a balancer returns an exit code and prints one of the + * following messages to the output file in corresponding to the above exit + * reasons: + *

1. The cluster is balanced. Exiting + *

2. No block can be moved. Exiting... + *

3. No block has been moved for 3 iterations. Exiting... + *

4. Received an IO exception: failure reason. Exiting... + *

5. Another balancer is running. Exiting... + * + *

The administrator can interrupt the execution of the balancer at any + * time by running the command "stop-balancer.sh" on the machine where the + * balancer is running. + */ + +public class Balancer implements Tool { + private static final Log LOG = + LogFactory.getLog("org.apache.hadoop.dfs.Balancer"); + final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB + + private Configuration conf; + + private double threshold = 10D; + private NamenodeProtocol namenode; + private ClientProtocol client; + private FileSystem fs; + private final static Random rnd = new Random(); + + // all data node lists + private Collection overUtilizedDatanodes + = new LinkedList(); + private Collection aboveAvgUtilizedDatanodes + = new LinkedList(); + private Collection belowAvgUtilizedDatanodes + = new LinkedList(); + private Collection underUtilizedDatanodes + = new LinkedList(); + + private Collection sources + = new HashSet(); + private Collection targets + = new HashSet(); + + private Map globalBlockList + = new HashMap(); + private Map movedBlocks + = new HashMap(); + private Map datanodes + = new HashMap(); + + private NetworkTopology cluster = new NetworkTopology(); + + private double avgUtilization = 0.0D; + + /* This class keeps track of a scheduled block move */ + private class PendingBlockMove { + private BalancerBlock block; + private Source source; + private BalancerDatanode proxySource; + private BalancerDatanode target; + + /** constructor */ + private PendingBlockMove() { + } + + /* choose a block & a proxy source for this pendingMove + * whose souce & target have already been chosen. + * + * Return true if a block and its proxy are chosen; false otherwise + */ + private boolean chooseBlockAndProxy() { + // iterate all source's blocks until find a good one + for (Iterator blocks= + source.getBlockIterator(); blocks.hasNext();) { + if (markMovedIfGoodBlock(blocks.next())) { + blocks.remove(); + return true; + } + } + return false; + } + + /* Return true if the given block is good for the tentatvie move; + * If it is good, add it to the moved list to marked as "Moved". + * A block is good iff + * 1. it is a good candidate; see isGoodBlockCandidate + * 2. can find a proxy source that's not busy for this move + */ + private boolean markMovedIfGoodBlock(BalancerBlock block) { + synchronized(block) { + synchronized(movedBlocks) { + if (isGoodBlockCandidate(source, target, block)) { + this.block = block; + if ( chooseProxySource() ) { + addToMoved(block); + LOG.info("Decided to move block "+ block.getBlockId() + +" with a length of "+FsShell.byteDesc(block.getNumBytes()) + + " bytes from " + source.getName() + + " to " + target.getName() + + " using proxy source " + proxySource.getName() ); + return true; + } + } + } + } + return false; + } + + /* Now we find out source, target, and block, we need to find a proxy + * + * @return true if a proxy is found; otherwise false + */ + private boolean chooseProxySource() { + // check if there is replica which is on the same rack with the target + for (BalancerDatanode loc : block.getLocations()) { + if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { + if (loc.addPendingBlock(this)) { + proxySource = loc; + return true; + } + } + } + // find out a non-busy replica + for (BalancerDatanode loc : block.getLocations()) { + if (loc.addPendingBlock(this)) { + proxySource = loc; + return true; + } + } + return false; + } + + /* Dispatch the block move task to the proxy source & wait for the response + */ + private void dispatch() { + Socket sock = new Socket(); + DataOutputStream out = null; + DataInputStream in = null; + try { + sock.connect(DataNode.createSocketAddr( + proxySource.datanode.getName()), FSConstants.READ_TIMEOUT); + long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024); + sock.setSoTimeout(2*FSConstants.READ_TIMEOUT+ + (int)(block.getNumBytes()*1500/bandwidth)); + out = new DataOutputStream( new BufferedOutputStream( + sock.getOutputStream(), FSConstants.BUFFER_SIZE)); + sendRequest(out); + in = new DataInputStream( new BufferedInputStream( + sock.getInputStream(), FSConstants.BUFFER_SIZE)); + receiveResponse(in); + bytesMoved.inc(block.getNumBytes()); + if (LOG.isDebugEnabled()) { + LOG.debug( "Moving block " + block.getBlock().getBlockId() + + " from "+ source.getName() + " to " + + target.getName() + " through " + + proxySource.getName() + + " succeeded." ); + } + } catch (IOException e) { + LOG.warn("Error moving block "+block.getBlockId()+ + " from " + source.getName() + " to " + + target.getName() + " through " + + proxySource.getName() + + ": "+e.getMessage()+ "\n" + + StringUtils.stringifyException(e) ); + } finally { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeSocket(sock); + + proxySource.removePendingBlock(this); + synchronized(target) { + target.removePendingBlock(this); + } + + synchronized (this ) { + reset(); + } + synchronized (Balancer.this) { + Balancer.this.notifyAll(); + } + } + } + + /* Send a block copy request to the outputstream*/ + private void sendRequest(DataOutputStream out) throws IOException { + out.writeShort(FSConstants.DATA_TRANFER_VERSION); + out.writeByte(FSConstants.OP_COPY_BLOCK); + out.writeLong(block.getBlock().getBlockId()); + Text.writeString(out, source.getStorageID()); + target.write(out); + out.flush(); + } + + /* Receive a block copy response from the input stream */ + private void receiveResponse(DataInputStream in) throws IOException { + short status = in.readShort(); + if (status != FSConstants.OP_STATUS_SUCCESS) { + throw new IOException("Moving block "+block.getBlockId()+ + " from "+source.getName() + " to " + + target.getName() + " through " + + proxySource.getName() + + "failed"); + } + } + + /* reset the object */ + private void reset() { + block = null; + source = null; + proxySource = null; + target = null; + } + + /* start a thread to dispatch the block move */ + private void scheduleBlockMove() { + BlockMover blockMover = new BlockMover(); + blockMover.setDaemon(true); + blockMover.setName("Block mover for "+ block.getBlockId() + + " from " + proxySource.getName() + " to " + target.getName()); + LOG.info("Starting " + blockMover.getName()); + blockMover.start(); + } + + /* A thread for moving a block */ + private class BlockMover extends Thread { + BlockMover() { + } + + public void run() { + dispatch(); + } + } + } + + /* A class for keeping track of blocks in the Balancer */ + static private class BalancerBlock { + private Block block; // the block + private List locations + = new ArrayList(3); // its locations + + /* Constructor */ + private BalancerBlock(Block block) { + this.block = block; + } + + /* clean block locations */ + private synchronized void clearLocations() { + locations.clear(); + } + + /* add a location */ + private synchronized void addLocation(BalancerDatanode datanode) { + if (!locations.contains(datanode)) { + locations.add(datanode); + } + } + + /* Return if the block is located on datanode */ + private synchronized boolean isLocatedOnDatanode( + BalancerDatanode datanode) { + return locations.contains(datanode); + } + + /* Return its locations */ + private synchronized List getLocations() { + return locations; + } + + /* Return the block */ + private Block getBlock() { + return block; + } + + /* Return the block id */ + private long getBlockId() { + return block.getBlockId(); + } + + /* Return the length of the block */ + private long getNumBytes() { + return block.getNumBytes(); + } + } + + /* The class represents a desired move of bytes between two nodes + * and the target. + * An object of this class is stored in a source node. + */ + static private class NodeTask { + private BalancerDatanode datanode; //target node + private long size; //bytes scheduled to move + + /* constructor */ + private NodeTask(BalancerDatanode datanode, long size) { + this.datanode = datanode; + this.size = size; + } + + /* Get the node */ + private BalancerDatanode getDatanode() { + return datanode; + } + + /* Get the number of bytes that need to be moved */ + private long getSize() { + return size; + } + } + + /* Return the utilization of a datanode */ + static private double getUtilization(DatanodeInfo datanode) { + return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100; + } + + /* A class that keeps track of a datanode in Balancer */ + private static class BalancerDatanode implements Writable { + final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB + final protected static short MAX_NUM_CONCURRENT_MOVES = + DataNode.MAX_BALANCING_THREADS; + protected DatanodeInfo datanode; + private double utilization; + protected long maxSizeToMove; + protected long scheduledSize = 0L; + // blocks being moved but not confirmed yet + private List pendingBlocks = + new ArrayList(MAX_NUM_CONCURRENT_MOVES); + + /* Constructor + * Depending on avgutil & threshold, calculate maximum bytes to move + */ + private BalancerDatanode( + DatanodeInfo node, double avgUtil, double threshold) { + datanode = node; + utilization = Balancer.getUtilization(node); + + if (utilization >= avgUtil+threshold + || utilization <= avgUtil-threshold) { + maxSizeToMove = (long)(threshold*datanode.getCapacity()/100); + } else { + maxSizeToMove = + (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100); + } + if (utilization < avgUtil ) { + maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove); + } + maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); + } + + /** Get the datanode */ + protected DatanodeInfo getDatanode() { + return datanode; + } + + /** Get the name of the datanode */ + protected String getName() { + return datanode.getName(); + } + + /* Get the storage id of the datanode */ + protected String getStorageID() { + return datanode.getStorageID(); + } + + /** Decide if still need to move more bytes */ + protected boolean isMoveQuotaFull() { + return scheduledSize nodeTasks = new ArrayList(2); + private long blocksToReceive = 0L; + /* source blocks point to balancerBlocks in the global list because + * we want to keep one copy of a block in balancer and be aware that + * the locations are changing over time. + */ + private List srcBlockList + = new ArrayList(); + + /* constructor */ + private Source(DatanodeInfo node, double avgUtil, double threshold) { + super(node, avgUtil, threshold); + } + + /** Add a node task */ + private void addNodeTask(NodeTask task) { + assert (task.datanode != this) : + "Source and target are the same " + datanode.getName(); + incScheduledSize(task.getSize()); + nodeTasks.add(task); + } + + /* Return an iterator to this source's blocks */ + private Iterator getBlockIterator() { + return srcBlockList.iterator(); + } + + /* fetch new blocks of this source from namenode and + * update this source's block list & the global block list + * Return the tota size of the received blocks in the number of bytes. + */ + private long getBlockList() throws IOException { + BlockWithLocations[] newBlocks = namenode.getBlocks(datanode, + (long)Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks(); + long bytesReceived = 0; + for (BlockWithLocations blk : newBlocks) { + bytesReceived += blk.getBlock().getNumBytes(); + BalancerBlock block; + synchronized(globalBlockList) { + block = globalBlockList.get(blk.getBlock()); + if (block==null) { + block = new BalancerBlock(blk.getBlock()); + globalBlockList.put(blk.getBlock(), block); + } else { + block.clearLocations(); + } + + synchronized (block) { + // update locations + for ( String location : blk.getDatanodes() ) { + BalancerDatanode datanode = datanodes.get(location); + if (datanode != null) { // not an unknown datanode + block.addLocation(datanode); + } + } + } + if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) { + // filter bad candidates + srcBlockList.add(block); + } + } + } + return bytesReceived; + } + + /* Decide if the given block is a good candiate to move or not */ + private boolean isGoodBlockCandidate(BalancerBlock block) { + for (NodeTask nodeTask : nodeTasks) { + if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) { + return true; + } + } + return false; + } + + /* Return a block that's good for the source thread to dispatch immediately + * The block's source, target, and proxy source are determined too. + * When choosing proxy and target, source & target throttling + * has been considered. They are choosen only when they have the capacity + * to support this block move. + * The block should be disptched immediately after this method is returned. + */ + private PendingBlockMove chooseNextBlockToMove() { + for ( Iterator tasks=nodeTasks.iterator(); tasks.hasNext(); ) { + NodeTask task = tasks.next(); + BalancerDatanode target = task.getDatanode(); + PendingBlockMove pendingBlock = new PendingBlockMove(); + if ( target.addPendingBlock(pendingBlock) ) { + // target is not busy, so do a tentative block allocation + pendingBlock.source = this; + pendingBlock.target = target; + if ( pendingBlock.chooseBlockAndProxy() ) { + long blockSize = pendingBlock.block.getNumBytes(); + scheduledSize -= blockSize; + task.size -= blockSize; + if (task.size == 0) { + tasks.remove(); + } + return pendingBlock; + } else { + // cancel the tentative move + target.removePendingBlock(pendingBlock); + } + } + } + return null; + } + + /* iterate all source's blocks to remove moved ones */ + private void filterMovedBlocks() { + for (Iterator blocks=getBlockIterator(); + blocks.hasNext();) { + if (isMoved(blocks.next())) { + blocks.remove(); + } + } + } + + private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5; + /* Return if should fetch more blocks from namenode */ + private boolean shouldFetchMoreBlocks() { + return srcBlockList.size()0; + } + + /* This method iteratively does the following: + * it first selects a block to move, + * then sends a request to the proxy source to start the block move + * when the source's block list falls below a threshold, it asks + * the namenode for more blocks. + * It terminates when it has dispatch enough block move tasks or + * it has received enough blocks from the namenode, or + * the elapsed time of the iteration has exceeded the max time limit. + */ + private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins + private void dispatchBlocks() { + long startTime = FSNamesystem.now(); + this.blocksToReceive = 2*scheduledSize; + boolean isTimeUp = false; + while(!isTimeUp && scheduledSize>0 && + (!srcBlockList.isEmpty() || blocksToReceive>0)) { + PendingBlockMove pendingBlock = chooseNextBlockToMove(); + if (pendingBlock != null) { + // move the block + pendingBlock.scheduleBlockMove(); + continue; + } + + /* Since we can not schedule any block to move, + * fileter any moved blocks from the source block list and + * check if we should fetch more blocks from the namenode + */ + filterMovedBlocks(); // filter already moved blocks + if (shouldFetchMoreBlocks()) { + // fetch new blocks + try { + blocksToReceive -= getBlockList(); + continue; + } catch (IOException e) { + LOG.warn(StringUtils.stringifyException(e)); + return; + } + } + + // check if time is up or not + if (FSNamesystem.now()-startTime > MAX_ITERATION_TIME) { + isTimeUp = true; + continue; + } + + /* Now we can not schedule any block to move and there are + * no new blocks added to the source block list, so we wait. + */ + try { + synchronized(Balancer.this) { + Balancer.this.wait(1000); // wait for targets/sources to be idle + } + } catch (InterruptedException ignored) { + } + } + } + } + + /** Default constructor */ + Balancer() { + } + + /** Construct a balancer from the given configuration */ + Balancer(Configuration conf) { + setConf(conf); + } + + /** Construct a balancer from the given configuration and threshold */ + Balancer(Configuration conf, double threshold) { + setConf(conf); + this.threshold = threshold; + } + + /** + * Run a balancer + * @param args + */ + public static void main(String[] args) { + try { + System.exit( ToolRunner.run(null, new Balancer(), args) ); + } catch (Throwable e) { + LOG.error(StringUtils.stringifyException(e)); + System.exit(-1); + } + + } + + private static void printUsage() { + System.out.println("Usage: java Balancer"); + System.out.println(" [-threshold ]\t" + +"percentage of disk capacity"); + } + + /* parse argument to get the threshold */ + private double parseArgs(String[] args) { + double threshold=0; + int argsLen = (args == null) ? 0 : args.length; + if (argsLen==0) { + threshold = 10; + } else { + if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) { + printUsage(); + System.exit(-1); + } else { + try { + threshold = Double.parseDouble(args[1]); + if (threshold < 0 || threshold >100) { + throw new NumberFormatException(); + } + LOG.info( "Using a threshold of " + threshold ); + } catch(NumberFormatException e) { + System.err.println( + "Expect a double parameter in the range of [0, 100]: "+ args[1]); + printUsage(); + System.exit(ILLEGAL_ARGS); + } + } + } + return threshold; + } + + /* Initialize balancer. It sets the value of the threshold, and + * builds the communication proxies to + * namenode as a client and a secondary namenode and retry proxies + * when connection fails. + */ + private void init(double threshold) throws IOException { + this.threshold = threshold; + // get name node address + InetSocketAddress nameNodeAddr = DataNode.createSocketAddr( + conf.get("fs.default.name", "local")); + // connect to name node + this.namenode = createNamenode(nameNodeAddr, conf); + this.client = DFSClient.createNamenode(nameNodeAddr, conf); + this.fs = FileSystem.get(conf); + } + + /* Build a NamenodeProtocol connection to the namenode and + * set up the retry policy */ + private static NamenodeProtocol createNamenode( + InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { + RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry( + 5, 200, TimeUnit.MILLISECONDS); + Map,RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + RetryPolicy methodPolicy = RetryPolicies.retryByException( + timeoutPolicy, exceptionToPolicyMap); + Map methodNameToPolicyMap = + new HashMap(); + methodNameToPolicyMap.put("getBlocks", methodPolicy); + + return (NamenodeProtocol) RetryProxy.create( + NamenodeProtocol.class, + RPC.getProxy(NamenodeProtocol.class, + NamenodeProtocol.versionID, + nameNodeAddr, + conf), + methodNameToPolicyMap); + } + + /* Shuffle datanode array */ + static private void shuffleArray(DatanodeInfo[] datanodes) { + for (int i=datanodes.length; i>1; i--) { + int randomIndex = rnd.nextInt(i); + DatanodeInfo tmp = datanodes[randomIndex]; + datanodes[randomIndex] = datanodes[i-1]; + datanodes[i-1] = tmp; + } + } + + /* get all live datanodes of a cluster and their disk usage + * decide the number of bytes need to be moved + */ + private long initNodes() throws IOException { + return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE)); + } + + /* Given a data node set, build a network topology and decide + * overutilized datanodes, above average utilized datanodes, + * below average utilized datanodes, and underutilized datanodes. + * The input data node set is shuffled before the datanodes + * are put into the overutilized datanodes, above average utilized + * datanodes, below average utilized datanodes, and + * underutilized datanodes lists. This will add some randomness + * to the node matching later on. + * + * @return the total number of bytes that are + * needed to move to make the cluster balanced. + * @param datanodes a set of datanodes + */ + private long initNodes(DatanodeInfo[] datanodes) { + // compute average utilization + long totalCapacity=0L, totalUsedSpace=0L; + for (DatanodeInfo datanode : datanodes) { + totalCapacity += datanode.getCapacity(); + totalUsedSpace += datanode.getDfsUsed(); + } + this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100; + + /*create network topology and all data node lists: + * overloaded, above-average, below-average, and underloaded + * we alternates the accessing of the given datanodes arrray either by + * an increasing order or a decreasing order. + */ + long overLoadedBytes = 0L, underLoadedBytes = 0L; + shuffleArray(datanodes); + for (DatanodeInfo datanode : datanodes) { + cluster.add(datanode); + BalancerDatanode datanodeS; + if (getUtilization(datanode) > avgUtilization) { + datanodeS = new Source(datanode, avgUtilization, threshold); + if (isAboveAvgUtilized(datanodeS)) { + this.aboveAvgUtilizedDatanodes.add((Source)datanodeS); + } else { + assert(isOverUtilized(datanodeS)) : + datanodeS.getName()+ "is not an overUtilized node"; + this.overUtilizedDatanodes.add((Source)datanodeS); + overLoadedBytes += (long)((datanodeS.utilization-avgUtilization + -threshold)*datanodeS.datanode.getCapacity()/100.0); + } + } else { + datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold); + if ( isBelowAvgUtilized(datanodeS)) { + this.belowAvgUtilizedDatanodes.add(datanodeS); + } else { + assert (isUnderUtilized(datanodeS)) : + datanodeS.getName()+ "is not an underUtilized node"; + this.underUtilizedDatanodes.add(datanodeS); + underLoadedBytes += (long)((avgUtilization-threshold- + datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0); + } + } + this.datanodes.put(datanode.getStorageID(), datanodeS); + } + + //logging + logImbalancedNodes(); + + assert (this.datanodes.size() == + overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ + aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()) + : "Mismatched number of datanodes"; + + // return number of bytes to be moved in order to make the cluster balanced + return Math.max(overLoadedBytes, underLoadedBytes); + } + + /* log the over utilized & under utilized nodes */ + private void logImbalancedNodes() { + StringBuilder msg = new StringBuilder(); + msg.append(overUtilizedDatanodes.size()); + msg.append(" over utilized nodes:"); + for (Source node : overUtilizedDatanodes) { + msg.append( " " ); + msg.append( node.getName() ); + } + LOG.info(msg); + msg = new StringBuilder(); + msg.append(underUtilizedDatanodes.size()); + msg.append(" under utilized nodes: "); + for (BalancerDatanode node : underUtilizedDatanodes) { + msg.append( " " ); + msg.append( node.getName() ); + } + LOG.info(msg); + } + + /* Decide all pairs and + * the number of bytes to move from a source to a target + * Maximum bytes to be moved per node is + * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). + * Return total number of bytes to move in this iteration + */ + private long chooseNodes() { + // Match nodes on the same rack first + chooseNodes(true); + // Then match nodes on differrent racks + chooseNodes(false); + + assert (datanodes.size() == + overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ + aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()+ + sources.size()+targets.size()) + : "Mismatched number of datanodes"; + + long bytesToMove = 0L; + for (Source src : sources) { + bytesToMove += src.scheduledSize; + } + return bytesToMove; + } + + /* if onRack is true, decide all pairs + * where source and target are on the same rack; Otherwise + * decide all pairs where source and target are + * on different racks + */ + private void chooseNodes(boolean onRack) { + /* first step: match each overUtilized datanode (source) to + * one or more underUtilized datanodes (targets). + */ + chooseTargets(underUtilizedDatanodes.iterator(), onRack); + + /* match each remaining overutilized datanode (source) to + * below average utlized datanodes (targets). + * Note only overutilized datanodes that haven't had that max bytes to move + * satisfied in step 1 are selected + */ + chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack); + + /* match each remaining underutilized datanode to + * above average utlized datanodes. + * Note only underutilized datanodes that have not had that max bytes to + * move satisfied in step 1 are selected. + */ + chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack); + } + + /* choose targets from the target candidate list for each over utilized + * source datanode. OnRackTarget determines if the chosen target + * should be on the same rack as the source + */ + private void chooseTargets( + Iterator targetCandidates, boolean onRackTarget ) { + for (Iterator srcIterator = overUtilizedDatanodes.iterator(); + srcIterator.hasNext();) { + Source source = srcIterator.next(); + while( chooseTarget(source, targetCandidates, onRackTarget) ) { + } + if (!source.isMoveQuotaFull()) { + srcIterator.remove(); + } + } + return; + } + + /* choose sources from the source candidate list for each under utilized + * target datanode. onRackSource determines if the chosen source + * should be on the same rack as the target + */ + private void chooseSources( + Iterator sourceCandidates, boolean onRackSource) { + for (Iterator targetIterator = + underUtilizedDatanodes.iterator(); targetIterator.hasNext();) { + BalancerDatanode target = targetIterator.next(); + while( chooseSource(target, sourceCandidates, onRackSource) ) { + } + if (!target.isMoveQuotaFull()) { + targetIterator.remove(); + } + } + return; + } + + /* For the given source, choose targets from the target candidate list. + * OnRackTarget determines if the chosen target + * should be on the same rack as the source + */ + private boolean chooseTarget(Source source, + Iterator targetCandidates, boolean onRackTarget) { + if (!source.isMoveQuotaFull()) { + return false; + } + boolean foundTarget = false; + BalancerDatanode target = null; + while(!foundTarget && targetCandidates.hasNext()) { + target = targetCandidates.next(); + if ( !target.isMoveQuotaFull()) { + targetCandidates.remove(); + continue; + } + if ( onRackTarget ) { + // choose from on-rack nodes + if ( cluster.isOnSameRack(source.datanode, target.datanode)) { + foundTarget = true; + } + } else { + // choose from off-rack nodes + if ( !cluster.isOnSameRack(source.datanode, target.datanode)) { + foundTarget = true; + } + } + } + if (foundTarget) { + assert(target != null):"Choose a null target"; + long size = Math.min(source.availableSizeToMove(), + target.availableSizeToMove()); + NodeTask nodeTask = new NodeTask(target, size); + source.addNodeTask(nodeTask); + target.incScheduledSize(nodeTask.getSize()); + sources.add(source); + targets.add(target); + if ( !target.isMoveQuotaFull()) { + targetCandidates.remove(); + } + LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from " + +source.datanode.getName() + " to " + target.datanode.getName()); + return true; + } + return false; + } + + /* For the given target, choose sources from the source candidate list. + * OnRackSource determines if the chosen source + * should be on the same rack as the target + */ + private boolean chooseSource(BalancerDatanode target, + Iterator sourceCandidates, boolean onRackSource) { + if (!target.isMoveQuotaFull()) { + return false; + } + boolean foundSource = false; + Source source = null; + while(!foundSource && sourceCandidates.hasNext()) { + source = sourceCandidates.next(); + if ( !source.isMoveQuotaFull()) { + sourceCandidates.remove(); + continue; + } + if (onRackSource) { + // choose from on-rack nodes + if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) { + foundSource = true; + } + } else { + // choose from off-rack nodes + if ( !cluster.isOnSameRack(source.datanode, target.datanode)) { + foundSource = true; + } + } + } + if (foundSource) { + assert(target != null):"Choose a null target"; + long size = Math.min(source.availableSizeToMove(), + target.availableSizeToMove()); + NodeTask nodeTask = new NodeTask(target, size); + source.addNodeTask(nodeTask); + target.incScheduledSize(nodeTask.getSize()); + sources.add(source); + targets.add(target); + if ( !source.isMoveQuotaFull()) { + sourceCandidates.remove(); + } + LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from " + +source.datanode.getName() + " to " + target.datanode.getName()); + return true; + } + return false; + } + + private static class BytesMoved { + private long bytesMoved = 0L;; + private synchronized void inc( long bytes ) { + bytesMoved += bytes; + } + + private long get() { + return bytesMoved; + } + }; + private BytesMoved bytesMoved = new BytesMoved(); + private int notChangedIterations = 0; + + /* Start a thread to dispatch block moves for each source. + * The thread selects blocks to move & sends request to proxy source to + * initiate block move. The process is flow controled. Block selection is + * blocked if there are too many un-confirmed block moves. + * Return the total number of bytes successfully moved in this iteration. + */ + private long dispatchBlockMoves() { + long bytesLastMoved = bytesMoved.get(); + Source.BlockMoveDispatcher dispatchers[] = + new Source.BlockMoveDispatcher[sources.size()]; + int i=0; + for (Source source : sources) { + dispatchers[i] = source.new BlockMoveDispatcher(); + dispatchers[i].setName("Dispatcher for source " + source.getName()); + LOG.info("Starting " + dispatchers[i].getName()); + dispatchers[i++].start(); + } + for (Source.BlockMoveDispatcher dispatcher : dispatchers) { + try { + dispatcher.join(); + } catch (InterruptedException e) { + LOG.info(StringUtils.stringifyException(e)); + } + } + waitForMoveCompletion(); + return bytesMoved.get()-bytesLastMoved; + } + + /* waite for all block move confirmations + * by checking each target's pendingMove queue + */ + private void waitForMoveCompletion() { + boolean shouldWait; + do { + shouldWait = false; + for (BalancerDatanode target : targets) { + if (!target.isPendingQEmpty()) { + shouldWait = true; + } + } + if (shouldWait) { + try { + Thread.sleep(60000); + } catch (InterruptedException ignored) { + } + } + } while (shouldWait); + } + + /* mark a block to be moved */ + private void addToMoved(BalancerBlock block) { + synchronized(movedBlocks) { + movedBlocks.put(block.getBlock(), block); + } + } + + /* check if a block is marked as moved */ + private boolean isMoved(BalancerBlock block) { + synchronized(movedBlocks) { + return movedBlocks.containsKey(block.getBlock()); + } + } + + /* Decide if it is ok to move the given block from source to target + * A block is a good candiate if + * 1. the block is not in the process of being moved/has not been moved; + * 2. the block does not have a replica on the target; + * 3. doing the move does not reduce the number of racks that the block has + */ + private boolean isGoodBlockCandidate(Source source, + BalancerDatanode target, BalancerBlock block) { + // check if the block is moved or not + if (isMoved(block)) { + return false; + } + if (block.isLocatedOnDatanode(target)) { + return false; + } + + boolean goodBlock = false; + if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) { + // good if source and target are on the same rack + goodBlock = true; + } else { + boolean notOnSameRack = true; + synchronized (block) { + for (BalancerDatanode loc : block.locations) { + if (cluster.isOnSameRack(loc.datanode, target.datanode)) { + notOnSameRack = false; + break; + } + } + } + if (notOnSameRack) { + // good if target is target is not on the same rack as any replica + goodBlock = true; + } else { + // good if source is on the same rack as on of the replicas + for (BalancerDatanode loc : block.locations) { + if (loc != source && + cluster.isOnSameRack(loc.datanode, source.datanode)) { + goodBlock = true; + break; + } + } + } + } + return goodBlock; + } + + /* reset all fields in a balancer preparing for the next iteration */ + private void resetData() { + this.cluster = new NetworkTopology(); + this.overUtilizedDatanodes.clear(); + this.aboveAvgUtilizedDatanodes.clear(); + this.belowAvgUtilizedDatanodes.clear(); + this.underUtilizedDatanodes.clear(); + this.datanodes.clear(); + this.sources.clear(); + this.targets.clear(); + this.avgUtilization = 0.0D; + cleanGlobalBlockList(); + } + + /* Remove all blocks from the gloabal block list except for the ones in the + * moved list. + */ + private void cleanGlobalBlockList() { + for (Iterator globalBlockListIterator=globalBlockList.keySet().iterator(); + globalBlockListIterator.hasNext();) { + Block block = globalBlockListIterator.next(); + if(!movedBlocks.containsKey(block)) { + globalBlockListIterator.remove(); + } + } + } + + /* Return true if the given datanode is overUtilized */ + private boolean isOverUtilized(BalancerDatanode datanode) { + return datanode.utilization > (avgUtilization+threshold); + } + + /* Return true if the given datanode is above average utilized + * but not overUtilized */ + private boolean isAboveAvgUtilized(BalancerDatanode datanode) { + return (datanode.utilization <= (avgUtilization+threshold)) + && (datanode.utilization > avgUtilization); + } + + /* Return true if the given datanode is underUtilized */ + private boolean isUnderUtilized(BalancerDatanode datanode) { + return datanode.utilization < (avgUtilization-threshold); + } + + /* Return true if the given datanode is below average utilized + * but not underUtilized */ + private boolean isBelowAvgUtilized(BalancerDatanode datanode) { + return (datanode.utilization >= (avgUtilization-threshold)) + && (datanode.utilization < avgUtilization); + } + + // Exit status + public static int SUCCESS = 1; + public static int ALREADY_RUNNING = -1; + public static int NO_MOVE_BLOCK = -2; + public static int NO_MOVE_PROGRESS = -3; + public static int IO_EXCEPTION = -4; + public static int ILLEGAL_ARGS = -5; + /** main method of Balancer + * @param args arguments to a Balancer + * @exception any exception occurs during datanode balancing + */ + public int run(String[] args) throws Exception { + long startTime = FSNamesystem.now(); + OutputStream out = null; + try { + // initialize a blancer + init(parseArgs(args)); + + /* Check if there is another balancer running. + * Exit if there is another one running. + */ + out = checkAndMarkRunningBalancer(); + if (out == null) { + System.out.println("Another balancer is running. Exiting..."); + return ALREADY_RUNNING; + } + + Formatter formatter = new Formatter(System.out); + System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); + int iterations = 0; + while (true ) { + /* get all live datanodes of a cluster and their disk usage + * decide the number of bytes need to be moved + */ + long bytesLeftToMove = initNodes(); + if (bytesLeftToMove == 0) { + System.out.println("The cluster is balanced. Exiting..."); + return SUCCESS; + } else { + LOG.info( "Need to move "+ FsShell.byteDesc(bytesLeftToMove) + +" bytes to make the cluster balanced." ); + } + + /* Decide all the nodes that will participate in the block move and + * the number of bytes that need to be moved from one node to another + * in this iteration. Maximum bytes to be moved per node is + * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). + */ + long bytesToMove = chooseNodes(); + if (bytesToMove == 0) { + System.out.println("No block can be moved. Exiting..."); + return NO_MOVE_BLOCK; + } else { + LOG.info( "Will move " + FsShell.byteDesc(bytesToMove) + + "bytes in this iteration"); + } + + formatter.format("%-24s %10d %19s %18s %17s\n", + DateFormat.getDateTimeInstance().format(new Date()), + iterations, + FsShell.byteDesc(bytesMoved.get()), + FsShell.byteDesc(bytesLeftToMove), + FsShell.byteDesc(bytesToMove) + ); + + /* For each pair of , start a thread that repeatedly + * decide a block to be moved and its proxy source, + * then initiates the move util all bytes are moved or no more block + * available to move. + * Exit no byte has been moved for 5 consectutive iterations. + */ + if (dispatchBlockMoves() > 0) { + notChangedIterations = 0; + } else { + notChangedIterations++; + if (notChangedIterations >= 5) { + System.out.println( + "No block has been moved for 5 iterations. Exiting..."); + return NO_MOVE_PROGRESS; + } + } + + // clean all lists + resetData(); + + try { + Thread.sleep(2*conf.getLong("dfs.heartbeat.interval", 3)); + } catch (InterruptedException ignored) { + } + + iterations++; + } + } catch (IOException e) { + System.out.println("Received an IO exception: " + e.getMessage() + + " . Exiting..."); + return IO_EXCEPTION; + } finally { + try { + out.close(); + fs.delete(BALANCER_ID_PATH); + } catch(IOException ignored) { + } + System.out.println("Balancing took " + + time2Str(FSNamesystem.now()-startTime)); + } + } + + private Path BALANCER_ID_PATH = new Path("/system/balancer.id"); + /* The idea for making sure that there is no more than one balancer + * running in an HDFS is to create a file in the HDFS, writes the ip address + * of the machine on which the balancer is running to the file, but did not + * close the file until the balancer exits. + * This prevents the second balancer from running because it can not + * creates the file while the first one is running. + * + * This method checks if there is any running balancer and + * if no, mark yes if no. + * Note that this is an atomic operation. + * + * Return null if there is a running balancer; otherwise the output stream + * to the newly created file. + */ + private OutputStream checkAndMarkRunningBalancer() throws IOException { + try { + DataOutputStream out = fs.create(BALANCER_ID_PATH); + out. writeBytes(InetAddress.getLocalHost().getHostName()); + out.flush(); + return out; + } catch(RemoteException e) { + if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ + return null; + } else { + throw e; + } + } + } + + /* Given elaspedTime in ms, return a printable string */ + private static String time2Str(long elapsedTime) { + String unit; + double time = elapsedTime; + if (elapsedTime < 1000 ) { + unit = "milliseconds"; + } else if (elapsedTime < 60*1000) { + unit = "seconds"; + time = time/1000; + } else if (elapsedTime < 3600*1000) { + unit = "minutes"; + time = time/(60*1000); + } else { + unit = "hours"; + time = time/(3600*1000); + } + + return time+" "+unit; + } + + /** return this balancer's configuration */ + public Configuration getConf() { + return conf; + } + + /** set this balancer's configuration */ + public void setConf(Configuration conf) { + this.conf = conf; + } + +} Index: src/java/org/apache/hadoop/dfs/DataNode.java =================================================================== --- src/java/org/apache/hadoop/dfs/DataNode.java (revision 599531) +++ src/java/org/apache/hadoop/dfs/DataNode.java (working copy) @@ -133,10 +133,10 @@ int defaultBytesPerChecksum = 512; // The following three fields are to support balancing - final private static long BALANCE_BANDWIDTH = 1024L*1024; // 1MB/s - final private static short MAX_BALANCING_THREADS = 5; + final static short MAX_BALANCING_THREADS = 5; private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS); - private Throttler balancingThrottler = new Throttler(BALANCE_BANDWIDTH); + long balanceBandwidth; + private Throttler balancingThrottler; private static class DataNodeMetrics implements Updater { private final MetricsRecord metricsRecord; @@ -312,6 +312,11 @@ this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L; DataNode.nameNodeAddr = nameNodeAddr; + //set up parameter for cluster balancing + this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024); + LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s"); + this.balancingThrottler = new Throttler(balanceBandwidth); + //create a servlet to serve full-file content int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075); String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0"); @@ -576,8 +581,10 @@ synchronized(receivedBlockList) { synchronized(delHints) { int numBlocks = receivedBlockList.size(); - if (receivedBlockList.size() > 0) { - assert(numBlocks==delHints.size()); + if (numBlocks > 0) { + if(numBlocks!=delHints.size()) { + LOG.warn("Panic: receiveBlockList and delHints are not of the same length" ); + } // // Send newly-received blockids to namenode // @@ -587,6 +594,9 @@ } } if (blockArray != null) { + if(delHintArray == null || delHintArray.length != blockArray.length ) { + LOG.warn("Panic: block array & delHintArray are not the same" ); + } namenode.blockReceived(dnRegistration, blockArray, delHintArray); synchronized (receivedBlockList) { synchronized (delHints) { @@ -775,6 +785,9 @@ * client? For now we don't. */ private void notifyNamenodeReceivedBlock(Block block, String delHint) { + if(block==null || delHint==null) { + throw new IllegalArgumentException(block==null?"Block is null":"delHint is null"); + } synchronized (receivedBlockList) { synchronized (delHints) { receivedBlockList.add(block); @@ -1171,7 +1184,7 @@ // notify name node notifyNamenodeReceivedBlock(block, sourceID); - LOG.info("Received block " + block + + LOG.info("Moved block " + block + " from " + s.getRemoteSocketAddress()); } catch (IOException ioe) { opStatus = OP_STATUS_ERROR; @@ -1203,7 +1216,7 @@ /** Constructor */ Throttler(long bandwidthPerSec) { - this(1000, bandwidthPerSec); // by default throttling period is 1s + this(500, bandwidthPerSec); // by default throttling period is 500ms } /** Constructor */ Index: bin/stop-balancer.sh =================================================================== --- bin/stop-balancer.sh (revision 0) +++ bin/stop-balancer.sh (revision 0) @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +# Stop balancer daemon. +# Run this on the machine where the balancer is running + +hadoop-daemon.sh stop balancer Property changes on: bin/stop-balancer.sh ___________________________________________________________________ Name: svn:executable + * Index: bin/hadoop =================================================================== --- bin/hadoop (revision 599531) +++ bin/hadoop (working copy) @@ -40,6 +40,7 @@ echo " dfsadmin run a DFS admin client" echo " fsck run a DFS filesystem checking utility" echo " fs run a generic filesystem user client" + echo " balancer run a cluster balancing utility" echo " jobtracker run the MapReduce job Tracker node" echo " pipes run a Pipes job" echo " tasktracker run a MapReduce task Tracker node" @@ -180,6 +181,8 @@ CLASS=org.apache.hadoop.dfs.DFSAdmin elif [ "$COMMAND" = "fsck" ] ; then CLASS=org.apache.hadoop.dfs.DFSck +elif [ "$COMMAND" = "balancer" ] ; then + CLASS=org.apache.hadoop.dfs.Balancer elif [ "$COMMAND" = "jobtracker" ] ; then CLASS=org.apache.hadoop.mapred.JobTracker elif [ "$COMMAND" = "tasktracker" ] ; then Index: bin/start-balancer.sh =================================================================== --- bin/start-balancer.sh (revision 0) +++ bin/start-balancer.sh (revision 0) @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +# Start balancer daemon. + +hadoop-daemon.sh start balancer $@ Property changes on: bin/start-balancer.sh ___________________________________________________________________ Name: svn:executable + *