Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml (revision 593614)
+++ conf/hadoop-default.xml (working copy)
@@ -399,6 +399,16 @@
+ dfs.balance.bandwidthPerSec
+ 1048576
+
+ Specifies the maximum amount of bandwidth that each datanode
+ can utilize for the balancing purpose in term of
+ the number of bytes per second.
+
+
+
+dfs.hostsNames a file that contains a list of hosts that are
Index: src/test/org/apache/hadoop/dfs/TestBalancer.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestBalancer.java (revision 0)
+++ src/test/org/apache/hadoop/dfs/TestBalancer.java (revision 0)
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+/**
+ * This class tests if a balancer schedules tasks correctly.
+ */
+public class TestBalancer extends TestCase {
+ private static final Configuration CONF = new Configuration();
+ final private static long CAPACITY = 500L;
+ final private static String RACK0 = "/rack0";
+ final private static String RACK1 = "/rack1";
+ final private static String RACK2 = "/rack2";
+ final static private String fileName = "/tmp.txt";
+ final static private Path filePath = new Path(fileName);
+ private MiniDFSCluster cluster;
+
+ ClientProtocol client;
+
+ static final int DEFAULT_BLOCK_SIZE = 10;
+ private Balancer balancer;
+ private Random r = new Random();
+
+ static {
+ CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+ //CONF.setLong("dfs.blockreport.intantervalMsec", 100L);
+ CONF.setLong("dfs.heartbeat.interval", 1L);
+ CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+
+ }
+
+ /* create a file with a length of fileLen */
+ private void createFile(long fileLen, short replicationFactor)
+ throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, fileLen,
+ replicationFactor, r.nextLong());
+ DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
+ }
+
+
+ /* fill up a cluster with numNodes datanodes
+ * whose used space to be size
+ */
+ private Block[] generateBlocks(long size, short numNodes) throws IOException {
+ cluster = new MiniDFSCluster( CONF, numNodes, true, null);
+ try {
+ cluster.waitActive();
+ client = DFSClient.createNamenode(
+ DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF);
+
+ short replicationFactor = (short)(numNodes-1);
+ long fileLen = size/replicationFactor;
+ createFile(fileLen, replicationFactor);
+
+ List locatedBlocks = cluster.getNameNode().
+ getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
+
+ int numOfBlocks = locatedBlocks.size();
+ Block[] blocks = new Block[numOfBlocks];
+ for(int i=0; i[] blockReports = (List[])new List>[usedSpace.length];
+ Block[][] results = new Block[usedSpace.length][];
+ for(int i=0; i();
+ }
+ for(int i=0; i0 ) {
+ notChosen = false;
+ blockReports[chosenIndex].add(blocks[i]);
+ usedSpace[chosenIndex] -= blocks[i].getNumBytes();
+ }
+ }
+ }
+ }
+ for(int i=0; i10) {
+ balanced = false;
+ try {
+ Thread.sleep(100);
+ } catch(InterruptedException ignored) {
+ }
+ break;
+ }
+ }
+ } while(!balanced);
+
+ }
+ /** Test a cluster with even distribution,
+ * then a new empty node is added to the cluster*/
+ public void testBalancer0() throws Exception {
+ /** one-node cluste test*/
+ // add an empty node with the same CAPACITY & rack
+ //test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY, RACK0);
+ // add an empty node with the same CAPACITY but different rack
+ //test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY, RACK1);
+ // add an empty node with half of the CAPACITY & the same rack
+ //test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+
+ /** two-node cluster test */
+ test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK0},
+ CAPACITY, RACK0);
+ test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ CAPACITY, RACK2);
+ }
+
+ /** Test unEven distributed cluster */
+ public void testBalancer1() throws Exception {
+ testUnevenDistribution(
+ new long[] {50*CAPACITY/100, 10*CAPACITY/100},
+ new long[]{CAPACITY, CAPACITY},
+ new String[] {RACK0, RACK1});
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ (new TestBalancer()).testBalancer0();
+
+ }
+
+}
Index: src/java/org/apache/hadoop/ipc/Server.java
===================================================================
--- src/java/org/apache/hadoop/ipc/Server.java (revision 593614)
+++ src/java/org/apache/hadoop/ipc/Server.java (working copy)
@@ -595,9 +595,10 @@
try {
value = call(call.param); // make the call
} catch (Throwable e) {
- LOG.info(getName()+", call "+call+": error: " + e, e);
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
+ LOG.info(getName()+", call "+call+": error: !!!" + error);
+
}
CurCall.set(null);
Index: src/java/org/apache/hadoop/dfs/DFSClient.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DFSClient.java (revision 593614)
+++ src/java/org/apache/hadoop/dfs/DFSClient.java (working copy)
@@ -104,7 +104,7 @@
Runtime.getRuntime().addShutdownHook(clientFinalizer);
}
- private static ClientProtocol createNamenode(
+ static ClientProtocol createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf)
throws IOException {
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
@@ -1572,7 +1572,10 @@
int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum || (len + bytesWrittenToBlock) > blockSize) {
// should never happen
- throw new IOException("Mismatch in writeChunk() args");
+ throw new IOException("Mismatch in writeChunk() args: len="+len+
+ ", bytesPerChecksum="+bytesPerChecksum+
+ ", byteswrittenToBlock="+bytesWrittenToBlock+
+ ", blockSize="+blockSize);
}
if ( backupFile == null ) {
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java (revision 593614)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java (working copy)
@@ -567,6 +567,7 @@
*/
synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
throws IOException {
+ LOG.debug("Get blocks of "+datanode.getName() );
DatanodeDescriptor node = getDatanode(datanode);
if (node == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
@@ -598,6 +599,8 @@
}
}
+ LOG.debug("End getting blocks of " + datanode.getName() );
+
return new BlocksWithLocations(
results.toArray(new BlockWithLocations[results.size()]));
}
@@ -2415,13 +2418,15 @@
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
+ boolean firstOne = true;
while (nonExcess.size() - replication > 0) {
+ LOG.info("nonExcess.size: "+nonExcess.size()+ " replication: "+replication);
DatanodeInfo cur = null;
long minSpace = Long.MAX_VALUE;
// check if we can del delNodeHint
- if( delNodeHint !=null && (priSet.contains(delNodeHint) ||
- (addedNode != null && !priSet.contains(addedNode))) ) {
+ if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
+ (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
Iterator iter =
@@ -2437,6 +2442,7 @@
}
}
+ firstOne = false;
// adjust rackmap, priSet, and remains
String rack = cur.getNetworkLocation();
ArrayList datanodes = rackMap.get(rack);
@@ -2531,6 +2537,7 @@
Block block,
String delHint
) throws IOException {
+ LOG.debug("Received " + block + " with delHint " + delHint);
DatanodeDescriptor node = getDatanode(nodeID);
if (node == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
@@ -2569,6 +2576,8 @@
//
addStoredBlock(block, node, delHintNode );
pendingReplications.remove(block);
+
+ LOG.debug("End block received" );
}
/**
Index: src/java/org/apache/hadoop/dfs/Balancer.java
===================================================================
--- src/java/org/apache/hadoop/dfs/Balancer.java (revision 0)
+++ src/java/org/apache/hadoop/dfs/Balancer.java (revision 0)
@@ -0,0 +1,1343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Formatter;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/** A tool to balance a dfs cluster.
+ * To run:
+ * bin/hadoop-daemon.sh start balance [-threshold ]
+ * e.g. bin/hadoop-daemon.sh start balance
+ * bin/hadoop-daemon.sh start balance -threshold 5
+ *
+ * To stop:
+ * bin/hadoop-daemon.sh stop balance
+ *
+ *
The tool exits when the utilization of each datanode in the cluster
+ * falls into (avg_utilization-threshold, avg_utilization+threshold) or
+ * no progess has been made for three iterations, or all under-utilized
+ * datanodes do not have any remaining space.
+ *
+ *
Threshold is a float number in the range of (0, 100).
+ * The default value is 10.
+ * The smaller the threshold, the more balanced a cluster will be ended
+ * up with. But the risks are that it takes a longer time and it is more
+ * likely to be not able to converge when file writes and deletions are
+ * perfomed simutaneously.
+ *
+ * Another parameter that effects the balancer performance is the
+ * following configuration variable:
+ *
+ * dfs.balance.bandwidth
+ * 1*1024*1024L
+ *
+ * Specifies the maximum amount of bandwidth that each datanode
+ * can utilize for the balancing purpose in term of
+ * the number of bytes per second.
+ *
+ *
+ *
+ * The configuartion parameter plays a major role on deciding how much
+ * time to balance a cluster. To fill a machine with 1000GB disk to be
+ * 60% full, it takes at least 7.11 days. So it takes less to balance
+ * a cluster if we increase the bandwidth, but the cost is that
+ * cluster balancing effects more on the regular file I/O performance.
+ */
+
+public class Balancer implements Tool {
+ private static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.dfs.Balancer");
+ final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+
+ private Configuration conf;
+
+ private double threshold = 10D;
+ private NamenodeProtocol namenode;
+ private ClientProtocol client;
+
+ // all data node lists
+ private Collection overUtilizedDatanodes
+ = new LinkedList();
+ private Collection aboveAvgUtilizedDatanodes
+ = new LinkedList();
+ private Collection belowAvgUtilizedDatanodes
+ = new LinkedList();
+ private Collection underUtilizedDatanodes
+ = new LinkedList();
+
+ private Collection sources
+ = new HashSet();
+ private Collection targets
+ = new HashSet();
+
+ private Map blockList
+ = new HashMap();
+ private Map datanodes
+ = new HashMap();
+
+ private NetworkTopology cluster = new NetworkTopology();
+
+ private double avgUtilization = 0.0D;
+
+ /* This class keeps track of a scheduled block move */
+ private class PendingBlockMove {
+ private BalancerBlock block;
+ private Source source;
+ private BalancerDatanode proxySource;
+ private BalancerDatanode target;
+
+ /** constructor */
+ private PendingBlockMove() {
+ }
+
+ /* Return source */
+ private Source getSource() {
+ return source;
+ }
+
+ /* Return target */
+ private BalancerDatanode getTarget() {
+ return target;
+ }
+
+ /* Return target */
+ private BalancerBlock getBlock() {
+ return block;
+ }
+
+ /* choose a block & a proxy source for this pendingMove
+ * whose souce & target have already been chosen.
+ *
+ * Return true if a block and its proxy are chosen; false otherwise
+ */
+ private boolean chooseBlockAndProxy() {
+ // iterate all source's blocks until find a good one
+ for(Iterator blocks=
+ source.getBlockIterator(); blocks.hasNext();) {
+ if(markMovedIfGoodBlock(blocks.next())) {
+ blocks.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /* Return true if the given block is good for the tentatvie move;
+ * If it is good, add it to the moved list to marked as "Moved".
+ * A block is good iff
+ * 1. it is a good candidate; see isGoodBlockCandidate
+ * 2. can find a proxy source that's not busy for this move
+ */
+ private boolean markMovedIfGoodBlock(BalancerBlock block) {
+ synchronized(block) {
+ synchronized(movedBlocks) {
+ if(isGoodBlockCandidate(source, target, block)) {
+ this.block = block;
+ if( chooseProxySource() ) {
+ addToMoved(block);
+ LOG.info("Decided to move block "+ block.getBlockId()
+ +" with a length of "+FsShell.byteDesc(block.getNumBytes())
+ + " bytes from " + source.getName()
+ + " to " + target.getName()
+ + " using proxy source " + proxySource.getName() );
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /* Now we find out source, target, and block, we need to find a proxy
+ *
+ * @return true if a proxy is found; otherwise false
+ */
+ private boolean chooseProxySource() {
+ // check if there is replica which is on the same rack with the target
+ for(BalancerDatanode loc : block.getLocations()) {
+ if(cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
+ if(loc.addPendingBlock(this)) {
+ proxySource = loc;
+ return true;
+ }
+ }
+ }
+ // find out a non-busy replica
+ for(BalancerDatanode loc : block.getLocations()) {
+ if(loc.addPendingBlock(this)) {
+ proxySource = loc;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /* Dispatch the block move task to the proxy source & wait for the response
+ */
+ private void dispatch() {
+ Socket sock = new Socket();
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ try {
+ sock.connect(DataNode.createSocketAddr(
+ proxySource.datanode.getName()), FSConstants.READ_TIMEOUT);
+ //System.out.println("Dispatching block "+block.getBlockId());
+ out = new DataOutputStream( new BufferedOutputStream(
+ sock.getOutputStream(), FSConstants.BUFFER_SIZE));
+ long start = FSNamesystem.now();
+ sendRequest(out);
+ in = new DataInputStream( new BufferedInputStream(
+ sock.getInputStream(), FSConstants.BUFFER_SIZE));
+ receiveResponse(in);
+ bytesMoved.inc(block.getNumBytes());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Moving block "+block.getBlock().getBlockId()+
+ " from "+source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName() +
+ " succeeded: " +
+ "taken " + (FSNamesystem.now()-start)+" ms");
+ }
+ } catch (IOException e) {
+ LOG.warn("Error moving block "+block.getBlockId()+
+ " from "+source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName() +
+ ": "+e.getMessage()+ "\n" +
+ StringUtils.stringifyException(e) );
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(sock);
+
+ proxySource.removePendingBlock(this);
+ synchronized(target) {
+ target.removePendingBlock(this);
+ }
+
+ synchronized (this ) {
+ reset();
+ }
+ synchronized (Balancer.this) {
+ Balancer.this.notifyAll();
+ }
+ }
+ }
+
+ /* Send a block copy request to the outputstream*/
+ private void sendRequest(DataOutputStream out) throws IOException {
+ out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+ out.writeByte(FSConstants.OP_COPY_BLOCK);
+ out.writeLong(block.getBlock().getBlockId());
+ Text.writeString(out, source.getStorageID());
+ target.write(out);
+ out.flush();
+ }
+
+ /* Receive a block copy response from the input stream */
+ private void receiveResponse(DataInputStream in) throws IOException {
+ short status = in.readShort();
+ if (status != FSConstants.OP_STATUS_SUCCESS) {
+ throw new IOException("Moving block "+block.getBlockId()+
+ " from "+source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName() +
+ "failed");
+ }
+ }
+
+ /* reset the object */
+ private void reset() {
+ block = null;
+ source = null;
+ proxySource = null;
+ target = null;
+ }
+
+ /* start a thread to dispatch the block move */
+ private void moveBlock() {
+ BlockMover blockMover = new BlockMover();
+ blockMover.setDaemon(true);
+ blockMover.start();
+ }
+
+ /* A thread for moving a block */
+ private class BlockMover extends Thread {
+ BlockMover() {
+ }
+
+ public void run() {
+ dispatch();
+ }
+ }
+ }
+
+ /* A class for keeping track of blocks in the Balancer */
+ private Map movedBlocks
+ = new HashMap();
+
+ static private class BalancerBlock {
+ private Block block; // the block
+ private List locations
+ = new ArrayList(3); // its locations
+
+ /* Constructor */
+ private BalancerBlock(Block block) {
+ this.block = block;
+ }
+
+ /* clean block locations */
+ private synchronized void clearLocations() {
+ locations.clear();
+ }
+
+ /* add a location */
+ private synchronized void addLocation(BalancerDatanode datanode) {
+ if(!locations.contains(datanode)) {
+ locations.add(datanode);
+ }
+ }
+
+ /* Return if the block is located on datanode */
+ private synchronized boolean isLocatedOnDatanode(BalancerDatanode datanode) {
+ return locations.contains(datanode);
+ }
+
+ /* Return its locations */
+ private synchronized List getLocations() {
+ return locations;
+ }
+
+ /* Return the block */
+ private Block getBlock() {
+ return block;
+ }
+
+ /* Return the block id */
+ private long getBlockId() {
+ return block.getBlockId();
+ }
+
+ /* Return the length of the block */
+ private long getNumBytes() {
+ return block.getNumBytes();
+ }
+ }
+
+ /* A class that represents a node to be involved in balancing
+ * and the total number of bytes to be moved from/to this node.
+ */
+ static private class NodeTask {
+ private BalancerDatanode datanode; //source or target node
+ private long size; //bytes scheduled to move
+
+ /* constructor */
+ private NodeTask(BalancerDatanode datanode, long size) {
+ this.datanode = datanode;
+ this.size = size;
+ }
+
+ /* Get the node */
+ private BalancerDatanode getDatanode() {
+ return datanode;
+ }
+
+ /* Get the number of bytes that need to be moved */
+ private long getSize() {
+ return size;
+ }
+ }
+
+ /* Return the utilization of a datanode */
+ static private double getUtilization(DatanodeInfo datanode) {
+ return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
+ }
+
+ /* A class that keeps track of a datanode in Balancer */
+ private static class BalancerDatanode implements Writable {
+ final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+ final protected static short MAX_NUM_CONCURRENT_MOVES =
+ DataNode.MAX_BALANCING_THREADS;
+ protected DatanodeInfo datanode;
+ private double utilization;
+ protected long sizeToMove;
+ protected long scheduledSize = 0L;
+ // blocks being moved but not confirmed yet
+ private List pendingBlocks =
+ new ArrayList(MAX_NUM_CONCURRENT_MOVES);
+
+ /* Constructor
+ * Depending on avgutil & threshold, decide how many bytes to move
+ * for the next iteration
+ */
+ private BalancerDatanode(
+ DatanodeInfo node, double avgUtil, double threshold) {
+ datanode = node;
+ utilization = Balancer.getUtilization(node);
+
+ if(utilization >= avgUtil+threshold
+ || utilization <= avgUtil-threshold) {
+ sizeToMove = (long)(threshold*datanode.getCapacity()/100);
+ } else {
+ sizeToMove =
+ (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
+ }
+ if(this.utilization < avgUtil ) {
+ long remaining = datanode.getRemaining();
+ sizeToMove = (remaining nodeTasks = new ArrayList(2);
+ private long blocksToReceive = 0L;
+ private List blocks
+ = new ArrayList();
+
+ /* constructor */
+ private Source(DatanodeInfo node, double avgUtil, double threshold) {
+ super(node, avgUtil, threshold);
+ }
+
+ /** Add a node task */
+ private void addNodeTask(NodeTask task) {
+ incScheduledSize(task.getSize());
+ nodeTasks.add(task);
+ }
+
+ /* Return an iterator to this source's blocks */
+ private Iterator getBlockIterator() {
+ return blocks.iterator();
+ }
+
+ /* fetch new blocks of this source from namenode and
+ * update this source's block list & the global block list
+ */
+ private void getBlockList() throws IOException {
+ BlockWithLocations[] newBlocks = namenode.getBlocks(datanode,
+ (long)Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
+ for(BlockWithLocations blk : newBlocks) {
+ blocksToReceive -= blk.getBlock().getNumBytes();
+ BalancerBlock block;
+ synchronized(blockList) {
+ block = blockList.get(blk.getBlock());
+ if(block==null) {
+ block = new BalancerBlock(blk.getBlock());
+ blockList.put(blk.getBlock(), block);
+ } else {
+ block.clearLocations();
+ }
+
+ synchronized (block) {
+ // update locations
+ for ( String location : blk.getDatanodes() ) {
+ BalancerDatanode datanode = datanodes.get(location);
+ if (datanode != null) { // not an unknown datanode
+ block.addLocation(datanode);
+ }
+ }
+ }
+ if(!blocks.contains(block) && isGoodBlockCandidate(block)) {
+ // filter bad candidates
+ blocks.add(block);
+ }
+ }
+ }
+ }
+
+ /* Decide if the given block is a good candiate to move or not */
+ private boolean isGoodBlockCandidate(BalancerBlock block) {
+ for(NodeTask nodeTask : nodeTasks) {
+ if(Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /* Return a block that's good for the source thread to dispatch immediately
+ * The block's source, target, and proxy source are determined too.
+ * When choosing proxy and target, source & target throttling
+ * has been considered. They are choosen only when they have the capacity
+ * to support this block move.
+ * The block should be disptched immediately after this method is returned.
+ */
+ private PendingBlockMove chooseNextBlockToMove() {
+ for( Iterator tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
+ NodeTask task = tasks.next();
+ BalancerDatanode target = task.getDatanode();
+ PendingBlockMove pendingBlock = new PendingBlockMove();
+ if( target.addPendingBlock(pendingBlock) ) {
+ // target is not busy, so do a tentative block allocation
+ pendingBlock.source = this;
+ pendingBlock.target = target;
+ if( pendingBlock.chooseBlockAndProxy() ) {
+ long blockSize = pendingBlock.block.getNumBytes();
+ scheduledSize -= blockSize;
+ task.size -= blockSize;
+ if(task.size == 0) {
+ tasks.remove();
+ }
+ return pendingBlock;
+ } else {
+ // cancel the tentative move
+ target.removePendingBlock(pendingBlock);
+ }
+ }
+ }
+ return null;
+ }
+
+ /* iterate all source's blocks to remove moved ones */
+ private void filterMovedBlocks() {
+ for(Iterator blocks=getBlockIterator();
+ blocks.hasNext();) {
+ if(isMoved(blocks.next())) {
+ blocks.remove();
+ }
+ }
+ }
+
+ /* Return if should fetch more blocks from namenode */
+ private boolean shouldFetchMoreBlocks() {
+ return blocks.size()<5 && blocksToReceive>0;
+ }
+
+ /* This method iteratively does the following:
+ * it first selects a block to move,
+ * then sends a request to the proxy source to start the block move
+ * when the source's block list falls below a threshold, it asks
+ * the namenode for more blocks.
+ * It terminates when it has dispatch enough block move tasks or
+ * it has received enough blocks from the namenode
+ */
+ private void dispatchBlocks() {
+ // the number of consecutive iterations without block move
+ this.blocksToReceive = 2*scheduledSize;
+ while(scheduledSize>0 && (!blocks.isEmpty() || blocksToReceive>0)) {
+ PendingBlockMove pendingBlock = chooseNextBlockToMove();
+ if(pendingBlock != null){
+ // move the block
+ pendingBlock.moveBlock();
+ } else { // no blocks to choose from
+ filterMovedBlocks(); // filter already moved blocks
+ if (shouldFetchMoreBlocks()) {// fetch new blocks
+ try {
+ getBlockList();
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ return;
+ }
+ } else { // wait for targets to be idle
+ try {
+ synchronized(Balancer.this) {
+ Balancer.this.wait(1000);
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /** Default constructor */
+ Balancer() {
+ }
+
+ /** Construct a balancer from the given configuration */
+ Balancer(Configuration conf) {
+ setConf(conf);
+ }
+
+ /** Construct a balancer from the given configuration and threshold */
+ Balancer(Configuration conf, double threshold) {
+ setConf(conf);
+ this.threshold = threshold;
+ }
+
+ /**
+ * Run a balancer
+ * @param args
+ */
+ public static void main(String[] args) {
+ try {
+ System.exit( ToolRunner.run(null, new Balancer(), args) );
+ } catch (Throwable e) {
+ LOG.error(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: java Balancer");
+ System.out.println(" [-threshold ]\t"
+ +"percentage of disk capacity");
+ }
+
+ /* parse argument to get the threshold */
+ private double parseArgs(String[] args) {
+ double threshold=0;
+ int argsLen = (args == null) ? 0 : args.length;
+ if(argsLen==0) {
+ threshold = 10;
+ } else {
+ if(argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
+ printUsage();
+ System.exit(-1);
+ } else {
+ try {
+ threshold = Double.parseDouble(args[1]);
+ if(threshold < 0) {
+ throw new NumberFormatException();
+ }
+ LOG.info( "Using a threshold of " + threshold );
+ } catch(NumberFormatException e) {
+ System.err.println(
+ "Expect a double parameter in the range of [0, 100]: "+ args[1]);
+ printUsage();
+ System.exit(-1);
+ }
+ }
+ }
+ return threshold;
+ }
+
+ /* Initialize balancer */
+ private void init(double threshold) throws IOException {
+ this.threshold = threshold;
+ // get name node address
+ InetSocketAddress nameNodeAddr = DataNode.createSocketAddr(
+ conf.get("fs.default.name", "local"));
+ // connect to name node
+ this.namenode = createNamenode(nameNodeAddr, conf);
+ this.client = DFSClient.createNamenode(nameNodeAddr, conf);
+ }
+
+ /* Build a NamenodeProtocol connection to the namenode and
+ * set up the retry policy */
+ private static NamenodeProtocol createNamenode(
+ InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
+ RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+ 5, 200, TimeUnit.MILLISECONDS);
+ Map,RetryPolicy> exceptionToPolicyMap =
+ new HashMap, RetryPolicy>();
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
+ timeoutPolicy, exceptionToPolicyMap);
+ Map methodNameToPolicyMap =
+ new HashMap();
+ methodNameToPolicyMap.put("getBlocks", methodPolicy);
+
+ return (NamenodeProtocol) RetryProxy.create(
+ NamenodeProtocol.class,
+ RPC.waitForProxy(NamenodeProtocol.class,
+ NamenodeProtocol.versionID,
+ nameNodeAddr,
+ conf),
+ methodNameToPolicyMap);
+
+ }
+
+ /* get all live datanodes of a cluster and their disk usage
+ * decide the number of bytes need to be moved
+ */
+ private long initNodes() throws IOException {
+ return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
+ }
+
+ /* Given data node set, build a network topology, decide
+ * overutilized datanodes, above average utilized datanodes,
+ * below average utilized datanodes, and underutilized datanodes.
+ * @return the total number of bytes that are
+ * needed to move to make the cluster balanced.
+ * @param datanodes a set of datanodes
+ */
+ private long initNodes(DatanodeInfo[] datanodes) {
+ // compute average utilization
+ long totalCapacity=0L, totalUsedSpace=0L;
+ for(DatanodeInfo datanode : datanodes) {
+ totalCapacity += datanode.getCapacity();
+ totalUsedSpace += datanode.getDfsUsed();
+ }
+ this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+
+ /*create network topology and all data node lists:
+ * overloaded, above-average, below-average, and underloaded
+ */
+ long overLoadedBytes = 0L, underLoadedBytes = 0L;
+ for(DatanodeInfo datanode : datanodes) {
+ cluster.add(datanode);
+ BalancerDatanode datanodeS;
+ if(getUtilization(datanode) > avgUtilization) {
+ datanodeS = new Source(datanode, avgUtilization, threshold);
+ if( isAboveAvgUtilized(datanodeS)) {
+ this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
+ } else {
+ assert(isOverUtilized(datanodeS)) :
+ datanodeS.getName()+ "is not an overUtilized node";
+ this.overUtilizedDatanodes.add((Source)datanodeS);
+ overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+ -threshold)*datanodeS.datanode.getCapacity()/100.0);
+ }
+ } else {
+ datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+ if( isBelowAvgUtilized(datanodeS)) {
+ this.belowAvgUtilizedDatanodes.add(datanodeS);
+ } else {
+ assert (isUnderUtilized(datanodeS)) :
+ datanodeS.getName()+ "is not an underUtilized node";
+ this.underUtilizedDatanodes.add(datanodeS);
+ underLoadedBytes += (long)((avgUtilization-threshold-
+ datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+ }
+ }
+ this.datanodes.put(datanode.getStorageID(), datanodeS);
+ }
+
+ //logging
+ logImbalancedNodes();
+
+ assert (this.datanodes.size() ==
+ overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
+ aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
+ : "Mismatched number of datanodes";
+
+ // return number of bytes to be moved in order to make the cluster balanced
+ return Math.max(overLoadedBytes, underLoadedBytes);
+ }
+
+ private void logImbalancedNodes() {
+ StringBuilder msg = new StringBuilder();
+ msg.append(overUtilizedDatanodes.size());
+ msg.append(" over utilized nodes:");
+ for(Source node : overUtilizedDatanodes) {
+ msg.append( " " );
+ msg.append( node.getName() );
+ }
+ LOG.info(msg);
+ msg = new StringBuilder();
+ msg.append(underUtilizedDatanodes.size());
+ msg.append(" under utilized nodes: ");
+ for(BalancerDatanode node : underUtilizedDatanodes) {
+ msg.append( " " );
+ msg.append( node.getName() );
+ }
+ LOG.info(msg);
+ }
+
+ /* Decide all pairs and
+ * the number of bytes to move from a source to a target
+ * Maximum bytes to be moved per node is
+ * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
+ * Return total number of bytes to move in this iteration
+ */
+ private long chooseNodes() {
+ // Match nodes on the same rack first
+ chooseNodes(true);
+ // Then match nodes on differrent racks
+ chooseNodes(false);
+
+ assert (datanodes.size() ==
+ overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
+ aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()+
+ sources.size()+targets.size())
+ : "Mismatched number of datanodes";
+
+ long bytesToMove = 0L;
+ for(Source src : sources) {
+ bytesToMove += src.scheduledSize;
+ }
+ return bytesToMove;
+ }
+
+ /* if onRack is true, decide all pairs
+ * where source and target are on the same rack; Otherwise
+ * decide all pairs where source and target are
+ * on different racks
+ */
+ private void chooseNodes(boolean onRack) {
+ /* first step: match each overUtilized datanode to
+ * one or more underUtilized datanodes
+ */
+ chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+
+ /* match each remaining overutilized datanode to
+ * below average utlized datanodes
+ * if there is any overutilized datanode left unmatched in the first step
+ */
+ chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+
+ /* match each remaining underutilized datanode to
+ * above average utlized datanodes
+ * if there is any underutilized datanode left unmatched in the first step
+ */
+ chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+ }
+
+ /*
+ private long chooseNodes() {
+ // matching nodes in the following priority:
+ // 1. on rack over/under utilized nodes
+ chooseNodes(NodeType.OnRackOffBand);
+ // 2. on rack above/below average utilized nodes
+ chooseNodes(NodeType.OnRackInBand);
+ // 3. off rack over/under utilized nodes
+ chooseNodes(NodeType.OffRackOffBand);
+ // 4. off rack above/below average utilized nodes
+ chooseNodes(NodeType.OffRackInBand);
+
+ assert (datanodes.size() ==
+ overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
+ aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()+
+ sources.size()+targets.size())
+ : "Mismatched number of datanodes";
+
+ long bytesToMove = 0L;
+ for(Source src : sources) {
+ bytesToMove += src.scheduledSize;
+ }
+ return bytesToMove;
+ }
+ */
+ /* Decide all pairs and
+ * the number of bytes to move from a source to a target for each node type
+ */
+ /*
+ private boolean chooseNodes(NodeType nodeType) {
+ // first choose targets for all overutilized nodes
+ boolean nodeChosen = chooseTargets(nodeType);
+ // then choose sources for remaining underutilized nodes
+ return chooseSources(nodeType) || nodeChosen;
+ }
+ */
+
+ /* choose targets from the target candidate list for each over utilized
+ * datanode. OnRackTarget determines if the chosen target
+ * should be on the same rack as the source
+ */
+ private void chooseTargets(
+ Iterator targetCandidates, boolean onRackTarget ) {
+ for(Iterator iterator = overUtilizedDatanodes.iterator();
+ iterator.hasNext();) {
+ Source source = iterator.next();
+ while( chooseTarget(source, targetCandidates, onRackTarget) ) {
+ }
+ if(!source.isMoveQuotaFull()) {
+ iterator.remove();
+ }
+ }
+ return;
+ }
+
+ /* choose sources from the source candidate list for each under utilized
+ * datanode. onRackSource determines if the chosen source
+ * should be on the same rack as the target
+ */
+ private void chooseSources(
+ Iterator sourceCandidates, boolean onRackSource) {
+ for(Iterator iterator =
+ underUtilizedDatanodes.iterator(); iterator.hasNext();) {
+ BalancerDatanode target = iterator.next();
+ while( chooseSource(target, sourceCandidates, onRackSource) ) {
+ }
+ if(!target.isMoveQuotaFull()) {
+ iterator.remove();
+ }
+ }
+ return;
+ }
+
+ /* For the given source, choose targets from the target candidate list.
+ * OnRackTarget determines if the chosen target
+ * should be on the same rack as the source
+ */
+ private boolean chooseTarget(Source source,
+ Iterator targetCandidates, boolean onRackTarget) {
+ if(!source.isMoveQuotaFull()) {
+ return false;
+ }
+ boolean foundTarget = false;
+ BalancerDatanode target = null;
+ while(!foundTarget && targetCandidates.hasNext()) {
+ target = targetCandidates.next();
+ if( !target.isMoveQuotaFull()) {
+ targetCandidates.remove();
+ continue;
+ }
+ if( onRackTarget ) {
+ // choose from on-rack nodes
+ if( cluster.isOnSameRack(source.datanode, target.datanode)) {
+ foundTarget = true;
+ }
+ } else {
+ // choose from off-rack nodes
+ if( !cluster.isOnSameRack(source.datanode, target.datanode)) {
+ foundTarget = true;
+ }
+ }
+ }
+ if(foundTarget) {
+ assert(target != null):"Choose a null target";
+ long size = Math.min(source.availableSizeToMove(),
+ target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ if( !target.isMoveQuotaFull()) {
+ targetCandidates.remove();
+ }
+ LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ return true;
+ }
+ return false;
+ }
+
+ /* For the given target, choose sources from the source candidate list.
+ * OnRackSource determines if the chosen source
+ * should be on the same rack as the target
+ */
+ private boolean chooseSource(BalancerDatanode target,
+ Iterator sourceCandidates, boolean onRackSource) {
+ if(!target.isMoveQuotaFull()) {
+ return false;
+ }
+ boolean foundSource = false;
+ Source source = null;
+ while(!foundSource && sourceCandidates.hasNext()) {
+ source = sourceCandidates.next();
+ if( !source.isMoveQuotaFull()) {
+ sourceCandidates.remove();
+ continue;
+ }
+ if(onRackSource) {
+ // choose from on-rack nodes
+ if( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
+ foundSource = true;
+ }
+ } else {
+ // choose from off-rack nodes
+ if( !cluster.isOnSameRack(source.datanode, target.datanode)) {
+ foundSource = true;
+ }
+ }
+ }
+ if(foundSource) {
+ assert(target != null):"Choose a null target";
+ long size = Math.min(source.availableSizeToMove(),
+ target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ if( !source.isMoveQuotaFull()) {
+ sourceCandidates.remove();
+ }
+ LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ return true;
+ }
+ return false;
+ }
+
+ private static class BytesMoved {
+ private long bytesMoved = 0L;;
+ private synchronized void inc( long bytes ) {
+ bytesMoved += bytes;
+ }
+
+ private long get() {
+ return bytesMoved;
+ }
+ };
+ private BytesMoved bytesMoved = new BytesMoved();
+ private int notChangedIterations = 0;
+
+ /* Start a thread to dispatch block moves for each source.
+ * The thread selects blocks to move & sends request to proxy source to
+ * initiate block move. The process is flow controled. Block selection is
+ * blocked if there are too many un-confirmed block moves.
+ * Return the total number of bytes successfully moved in this iteration.
+ */
+ private long dispatchBlockMoves() {
+ long bytesLastMoved = bytesMoved.get();
+ Source.Dispatcher dispatchers[] = new Source.Dispatcher[sources.size()];
+ int i=0;
+ for(Source source : sources) {
+ dispatchers[i] = source.new Dispatcher();
+ dispatchers[i].setName("Dispatcher for source " + source.getName());
+ dispatchers[i++].start();
+ }
+ for(Source.Dispatcher dispatcher : dispatchers) {
+ try {
+ dispatcher.join();
+ } catch (InterruptedException e) {
+ LOG.info(StringUtils.stringifyException(e));
+ }
+ }
+ waitForMoveCompletion();
+ return bytesMoved.get()-bytesLastMoved;
+ }
+
+ /* waite for all block move confirmations
+ * by checking each target's pendingMove queue
+ */
+ private void waitForMoveCompletion() {
+ boolean shouldWait;
+ do {
+ shouldWait = false;
+ for(BalancerDatanode target : targets) {
+ if(!target.isMoveCompleted()) {
+ shouldWait = true;
+ }
+ }
+ if(shouldWait) {
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ } while (shouldWait);
+ }
+
+ /* mark a block to be moved */
+ private void addToMoved(BalancerBlock block) {
+ synchronized(movedBlocks) {
+ movedBlocks.put(block.getBlock(), block);
+ }
+ }
+
+ /* check if a block is marked as moved */
+ private boolean isMoved(BalancerBlock block) {
+ synchronized(movedBlocks) {
+ return movedBlocks.containsKey(block.getBlock());
+ }
+ }
+
+ /* Decide if it is ok to move the given block from source to target
+ * A block is a good candiate if
+ * 1. the block is not in the process of being moved/has not been moved;
+ * 2. the block does not have a replica on the target;
+ * 3. doing the move does not reduce the number of racks that the block has
+ */
+ private boolean isGoodBlockCandidate(Source source,
+ BalancerDatanode target, BalancerBlock block) {
+ // check if the block is moved or not
+ if(isMoved(block)) {
+ return false;
+ }
+ if(block.isLocatedOnDatanode(target)) {
+ return false;
+ }
+
+ boolean goodBlock = false;
+ if(cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
+ // good if source and target are on the same rack
+ goodBlock = true;
+ } else {
+ boolean notOnSameRack = true;
+ synchronized (block) {
+ for(BalancerDatanode loc : block.locations) {
+ if(cluster.isOnSameRack(loc.datanode, target.datanode)) {
+ notOnSameRack = false;
+ break;
+ }
+ }
+ }
+ if(notOnSameRack) {
+ // good if target is target is not on the same rack as any replica
+ goodBlock = true;
+ } else {
+ // good if source is on the same rack as on of the replicas
+ for(BalancerDatanode loc : block.locations) {
+ if(loc != source &&
+ cluster.isOnSameRack(loc.datanode, source.datanode)) {
+ goodBlock = true;
+ break;
+ }
+ }
+ }
+ }
+ return goodBlock;
+ }
+
+ /* reset all fields in a balancer preparing for the next iteration */
+ private void resetData() {
+ this.cluster = new NetworkTopology();
+ this.overUtilizedDatanodes.clear();
+ this.aboveAvgUtilizedDatanodes.clear();
+ this.belowAvgUtilizedDatanodes.clear();
+ this.underUtilizedDatanodes.clear();
+ this.datanodes.clear();
+ this.sources.clear();
+ this.targets.clear();
+ this.avgUtilization = 0.0D;
+ this.movedBlocks.clear();
+ this.blockList.clear();
+ }
+
+ private boolean isOverUtilized(BalancerDatanode datanode) {
+ return datanode.utilization > (avgUtilization+threshold);
+ }
+
+ private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
+ return (datanode.utilization <= (avgUtilization+threshold))
+ && (datanode.utilization > avgUtilization);
+ }
+
+ private boolean isUnderUtilized(BalancerDatanode datanode) {
+ return datanode.utilization < (avgUtilization-threshold);
+ }
+
+ private boolean isBelowAvgUtilized(BalancerDatanode datanode) {
+ return (datanode.utilization >= (avgUtilization-threshold))
+ && (datanode.utilization < avgUtilization);
+ }
+
+ /** main method of Balancer
+ * @param args arguments to a Balancer
+ * @exception any exception occurs during datanode balancing
+ */
+ public int run(String[] args) throws Exception {
+ long startTime = FSNamesystem.now();
+ try {
+ // initialize a blancer
+ init(parseArgs(args));
+
+ Formatter formatter = new Formatter(System.out);
+ System.out.println("Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
+ int iterations = 0;
+ while (true ) {
+ /* get all live datanodes of a cluster and their disk usage
+ * decide the number of bytes need to be moved
+ */
+ long bytesLeftToMove = initNodes();
+ if(bytesLeftToMove == 0) {
+ System.out.println("The cluster is balanced. Exiting...");
+ return 1;
+ } else {
+ LOG.info( "Need to move "+ FsShell.byteDesc(bytesLeftToMove)
+ +" bytes to make the cluster balanced." );
+ }
+
+ /* Decide all the nodes that will participate in the block move and
+ * the number of bytes that need to be moved from one node to another
+ * in this iteration. Maximum bytes to be moved per node is
+ * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
+ */
+ long bytesToMove = chooseNodes();
+ if(bytesToMove == 0) {
+ System.out.println("No block can be moved. Exiting...");
+ return 1;
+ } else {
+ LOG.info( "Will move " + FsShell.byteDesc(bytesToMove) +
+ "bytes in this iteration");
+ }
+
+ formatter.format("%-10d %19s %18s %17s\n", iterations,
+ FsShell.byteDesc(bytesMoved.get()),
+ FsShell.byteDesc(bytesLeftToMove),
+ FsShell.byteDesc(bytesToMove));
+
+ /* For each pair of , start a thread that repeatedly
+ * decide a block to be moved and its proxy source,
+ * then initiates the move util all bytes are moved or no more block
+ * available to move.
+ * Exit no byte has been moved for 3 consectutive iterations.
+ */
+ if (dispatchBlockMoves() > 0) {
+ notChangedIterations = 0;
+ } else {
+ notChangedIterations++;
+ if(notChangedIterations >= 3) {
+ System.out.println(
+ "No block has been moved for 3 iterations. Exiting...");
+ return 1;
+ }
+ }
+
+ // clean all lists
+ resetData();
+
+ try {
+ Thread.sleep(2*conf.getLong("dfs.heartbeat.interval", 3));
+ } catch (InterruptedException ignored) {
+ }
+
+ iterations++;
+ }
+ } catch (IOException e) {
+ return -1;
+ } finally {
+ System.out.println("Balancing took " +
+ time2Str(FSNamesystem.now()-startTime));
+ }
+ }
+
+ /* Given elaspedTime in ms, return a printable string */
+ private static String time2Str(long elapsedTime) {
+ String unit;
+ double time = elapsedTime;
+ if (elapsedTime < 1000 ) {
+ unit = "milliseconds";
+ } else if (elapsedTime < 60*1000) {
+ unit = "seconds";
+ time = time/1000;
+ } else if (elapsedTime < 3600*1000) {
+ unit = "minutes";
+ time = time/(60*1000);
+ } else {
+ unit = "hours";
+ time = time/(3600*1000);
+ }
+
+ return time+" "+unit;
+ }
+
+ /** return this balancer's configuration */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /** set this balancer's configuration */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+}
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java (revision 593614)
+++ src/java/org/apache/hadoop/dfs/DataNode.java (working copy)
@@ -131,10 +131,10 @@
int defaultBytesPerChecksum = 512;
// The following three fields are to support balancing
- final private static long BALANCE_BANDWIDTH = 1024L*1024; // 1MB/s
- final private static short MAX_BALANCING_THREADS = 5;
+ final static short MAX_BALANCING_THREADS = 5;
private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
- private Throttler balancingThrottler = new Throttler(BALANCE_BANDWIDTH);
+ long balanceBandwidth;
+ private Throttler balancingThrottler;
private static class DataNodeMetrics implements Updater {
private final MetricsRecord metricsRecord;
@@ -283,6 +283,11 @@
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
DataNode.nameNodeAddr = nameNodeAddr;
+ //set up parameter for cluster balancing
+ this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+ LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
+ this.balancingThrottler = new Throttler(balanceBandwidth);
+
//create a servlet to serve full-file content
int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
@@ -547,8 +552,10 @@
synchronized(receivedBlockList) {
synchronized(delHints) {
int numBlocks = receivedBlockList.size();
- if (receivedBlockList.size() > 0) {
- assert(numBlocks==delHints.size());
+ if (numBlocks > 0) {
+ if(numBlocks!=delHints.size()) {
+ LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
+ }
//
// Send newly-received blockids to namenode
//
@@ -558,6 +565,9 @@
}
}
if (blockArray != null) {
+ if(delHintArray == null || delHintArray.length != blockArray.length ) {
+ LOG.warn("Panic: block array & delHintArray are not the same" );
+ }
namenode.blockReceived(dnRegistration, blockArray, delHintArray);
synchronized (receivedBlockList) {
synchronized (delHints) {
@@ -747,6 +757,9 @@
* client? For now we don't.
*/
private void notifyNamenodeReceivedBlock(Block block, String delHint) {
+ if(block==null || delHint==null) {
+ throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
+ }
synchronized (receivedBlockList) {
synchronized (delHints) {
receivedBlockList.add(block);
@@ -1143,7 +1156,7 @@
// notify name node
notifyNamenodeReceivedBlock(block, sourceID);
- LOG.info("Received block " + block +
+ LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
opStatus = OP_STATUS_ERROR;
@@ -1175,7 +1188,7 @@
/** Constructor */
Throttler(long bandwidthPerSec) {
- this(1000, bandwidthPerSec); // by default throttling period is 1s
+ this(500, bandwidthPerSec); // by default throttling period is 500ms
}
/** Constructor */
Index: bin/hadoop
===================================================================
--- bin/hadoop (revision 593614)
+++ bin/hadoop (working copy)
@@ -39,6 +39,7 @@
echo " dfsadmin run a DFS admin client"
echo " fsck run a DFS filesystem checking utility"
echo " fs run a generic filesystem user client"
+ echo " balancer run a cluster balancing utility"
echo " jobtracker run the MapReduce job Tracker node"
echo " pipes run a Pipes job"
echo " tasktracker run a MapReduce task Tracker node"
@@ -177,6 +178,8 @@
CLASS=org.apache.hadoop.dfs.DFSAdmin
elif [ "$COMMAND" = "fsck" ] ; then
CLASS=org.apache.hadoop.dfs.DFSck
+elif [ "$COMMAND" = "balancer" ] ; then
+ CLASS=org.apache.hadoop.dfs.Balancer
elif [ "$COMMAND" = "jobtracker" ] ; then
CLASS=org.apache.hadoop.mapred.JobTracker
elif [ "$COMMAND" = "tasktracker" ] ; then