Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml (revision 502789)
+++ conf/hadoop-default.xml (working copy)
@@ -346,6 +346,14 @@
+
+ dfs.network.script
+
+
+ Specifies a script name that print the network location path
+ of the current machine.
+
+
Index: src/test/org/apache/hadoop/net/TestNetworkTopology.java
===================================================================
--- src/test/org/apache/hadoop/net/TestNetworkTopology.java (revision 0)
+++ src/test/org/apache/hadoop/net/TestNetworkTopology.java (revision 0)
@@ -0,0 +1,78 @@
+package org.apache.hadoop.net;
+
+import java.util.HashSet;
+import org.apache.hadoop.dfs.DatanodeDescriptor;
+import org.apache.hadoop.dfs.DatanodeID;
+import junit.framework.TestCase;
+
+public class TestNetworkTopology extends TestCase {
+ private NetworkTopology cluster = new NetworkTopology();
+ private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
+ new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
+ new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d1/r2"),
+ new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3"),
+ new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r3")
+ };
+ private final static DatanodeDescriptor NODE =
+ new DatanodeDescriptor(new DatanodeID("h8:5020", "0", -1), "/d2/r4");
+
+ public TestNetworkTopology() {
+ for(int i=0; i set1 =
+ new HashSet(leaves.length);
+ HashSet set2 =
+ new HashSet(dataNodes.length);
+ for(int i=0; inumOfReplicas is 2, the 1st is on
+ * dataNodes[0] and the 2nd is on a different rack.
+ * @throws Exception
+ */
+ public void testChooseTarget1() throws Exception {
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(
+ 0, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(
+ 1, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertEquals(targets[0], dataNodes[0]);
+
+ targets = replicator.chooseTarget(
+ 2, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertEquals(targets[0], dataNodes[0]);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(
+ 3, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertEquals(targets[0], dataNodes[0]);
+ assertTrue(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+
+ targets = replicator.chooseTarget(
+ 4, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 4);
+ assertEquals(targets[0], dataNodes[0]);
+ assertTrue(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[3]));
+ }
+
+ /**
+ * In this testcase, client is dataNodes[0], but the dataNodes[1] is
+ * not allowed to be choosen. So the 1st replica should be
+ * placed on dataNodes[0], the 2nd replica should be placed on a different
+ * rack, the 3rd should the same rack as the 3nd replic, and the rest
+ * should be placed on a third rack.
+ * @throws Exception
+ */
+ public void testChooseTarget2() throws Exception {
+ List excludedNodes;
+ DatanodeDescriptor[] targets;
+
+ excludedNodes = new ArrayList();
+ excludedNodes.add(dataNodes[1]);
+ targets = replicator.chooseTarget(
+ 0, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ excludedNodes = new ArrayList();
+ excludedNodes.add(dataNodes[1]);
+ targets = replicator.chooseTarget(
+ 1, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertEquals(targets[0], dataNodes[0]);
+
+ excludedNodes = new ArrayList();
+ excludedNodes.add(dataNodes[1]);
+ targets = replicator.chooseTarget(
+ 2, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertEquals(targets[0], dataNodes[0]);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ excludedNodes = new ArrayList();
+ excludedNodes.add(dataNodes[1]);
+ targets = replicator.chooseTarget(
+ 3, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertEquals(targets[0], dataNodes[0]);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+
+ excludedNodes = new ArrayList();
+ excludedNodes.add(dataNodes[1]);
+ targets = replicator.chooseTarget(
+ 4, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 4);
+ assertEquals(targets[0], dataNodes[0]);
+ for(int i=1; i<4; i++) {
+ assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+ }
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+ cluster.isOnSameRack(targets[2], targets[3]));
+ assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+ }
+
+ /**
+ * In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
+ * to be choosen. So the 1st replica should be placed on dataNodes[1],
+ * the 2nd replica should be placed on a different rack,
+ * the 3rd replica should be placed on the same rack as the 2nd replica,
+ * and the rest should be placed on the third rack.
+ * @throws Exception
+ */
+ public void testChooseTarget3() throws Exception {
+ // make data node 0 to be not qualified to choose
+ dataNodes[0].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE,
+ FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(
+ 0, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(
+ 1, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertEquals(targets[0], dataNodes[1]);
+
+ targets = replicator.chooseTarget(
+ 2, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertEquals(targets[0], dataNodes[1]);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(
+ 3, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertEquals(targets[0], dataNodes[1]);
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(
+ 4, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 4);
+ assertEquals(targets[0], dataNodes[1]);
+ for(int i=1; i<4; i++) {
+ assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+ }
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+ cluster.isOnSameRack(targets[2], targets[3]));
+ assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+
+ dataNodes[0].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE,
+ FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+ }
+
+ /**
+ * In this testcase, client is dataNodes[0], but none of the nodes on rack 1
+ * is qualified to be choosen. So the 1st replica should be placed on either
+ * rack 2 or rack 3.
+ * the 2nd replica should be placed on a different rack,
+ * the 3rd replica should be placed on the same rack as the 1st replica,
+ * @throws Exception
+ */
+ public void testChoooseTarget4() throws Exception {
+ // make data node 0 & 1 to be not qualified to choose: not enough disk space
+ for(int i=0; i<2; i++) {
+ dataNodes[i].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE,
+ (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
+ }
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(
+ 0, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(
+ 1, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+
+ targets = replicator.chooseTarget(
+ 2, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(
+ 3, dataNodes[0], null, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ for(int i=0; i<3; i++) {
+ assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+ }
+ assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
+ cluster.isOnSameRack(targets[1], targets[2]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+
+ for(int i=0; i<2; i++) {
+ dataNodes[i].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE,
+ FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+ }
+ }
+ /**
+ * In this testcase, client is is a node outside of file system.
+ * So the 1st replica can be placed on any node.
+ * the 2nd replica should be placed on a different rack,
+ * the 3rd replica should be placed on the same rack as the 1st replica,
+ * @throws Exception
+ */
+ public void testChooseTarget5() throws Exception {
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(
+ 0, NODE, null, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(
+ 1, NODE, null, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+
+ targets = replicator.chooseTarget(
+ 2, NODE, null, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(
+ 3, NODE, null, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(cluster.isOnSameRack(targets[0], targets[1]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ }
+
+ /**
+ * This testcase tests re-replication, when dataNodes[0] is already choosen.
+ * So the 1st replica can be placed on rack 1.
+ * the 2nd replica should be placed on a different rack,
+ * the 3rd replica can be placed randomly,
+ * @throws Exception
+ */
+ public void testRereplicate1() throws Exception {
+ List choosenNodes = new ArrayList();
+ choosenNodes.add(dataNodes[0]);
+ DatanodeDescriptor[] targets;
+
+ targets = replicator.chooseTarget(
+ 0, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(
+ 1, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+ targets = replicator.chooseTarget(
+ 2, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+
+ targets = replicator.chooseTarget(
+ 3, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[2]));
+ }
+
+ /**
+ * This testcase tests re-replication,
+ * when dataNodes[0] and dataNodes[1] are already choosen.
+ * So the 1st replica should be placed on a different rack than rack 1.
+ * the rest replicas can be placed randomly,
+ * @throws Exception
+ */
+ public void testRereplicate2() throws Exception {
+ List choosenNodes = new ArrayList();
+ choosenNodes.add(dataNodes[0]);
+ choosenNodes.add(dataNodes[1]);
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(
+ 0, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(
+ 1, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+ targets = replicator.chooseTarget(
+ 2, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+ }
+
+ /**
+ * This testcase tests re-replication,
+ * when dataNodes[0] and dataNodes[2] are already choosen.
+ * So the 1st replica should be placed on rack 1.
+ * the rest replicas can be placed randomly,
+ * @throws Exception
+ */
+ public void testRereplicate3() throws Exception {
+ List choosenNodes = new ArrayList();
+ choosenNodes.add(dataNodes[0]);
+ choosenNodes.add(dataNodes[2]);
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(
+ 0, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(
+ 1, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+ targets = replicator.chooseTarget(
+ 2, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+ }
+}
Index: src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
===================================================================
--- src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (revision 502789)
+++ src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (working copy)
@@ -20,6 +20,7 @@
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.net.NetworkTopology;
/**
* This class creates a single-process DFS cluster for junit testing.
@@ -95,9 +96,14 @@
this.conf.set("dfs.data.dir",
new File(dataDir, "data"+(2*index+1)).getPath()+","+
new File(dataDir, "data"+(2*index+2)).getPath());
+ }
+ public DataNodeRunner(Configuration conf, File dataDir,
+ String networkLoc, int index) {
+ this(conf, dataDir, index);
+ this.conf.set("dfs.datanode.rack", networkLoc);
}
-
+
/**
* Create and run the data node.
*/
@@ -115,7 +121,8 @@
}
}
}
- node = new DataNode(conf, dirs);
+ node = new DataNode(conf, conf.get("dfs.datanode.rack",
+ NetworkTopology.DEFAULT_RACK), dirs);
node.run();
} catch (Throwable e) {
node = null;
@@ -144,7 +151,7 @@
public MiniDFSCluster(int namenodePort,
Configuration conf,
boolean dataNodeFirst) throws IOException {
- this(namenodePort, conf, 1, dataNodeFirst, true);
+ this(namenodePort, conf, 1, dataNodeFirst, true, null);
}
/**
@@ -159,7 +166,7 @@
Configuration conf,
int nDatanodes,
boolean dataNodeFirst) throws IOException {
- this(namenodePort, conf, nDatanodes, dataNodeFirst, true);
+ this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null);
}
/**
@@ -175,7 +182,26 @@
Configuration conf,
int nDatanodes,
boolean dataNodeFirst,
- boolean formatNamenode) throws IOException {
+ boolean formatNamenode ) throws IOException {
+ this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null);
+ }
+
+ /**
+ * Create the config and start up the servers. If either the rpc or info port is already
+ * in use, we will try new ports.
+ * @param namenodePort suggestion for which rpc port to use. caller should use
+ * getNameNodePort() to get the actual port used.
+ * @param nDatanodes Number of datanodes
+ * @param dataNodeFirst should the datanode be brought up before the namenode?
+ * @param formatNamenode should the namenode be formatted before starting up ?
+ * @param racks array of strings indicating racks that each datanode is on
+ */
+ public MiniDFSCluster(int namenodePort,
+ Configuration conf,
+ int nDatanodes,
+ boolean dataNodeFirst,
+ boolean formatNamenode,
+ String[] racks) throws IOException {
this.conf = conf;
@@ -208,7 +234,11 @@
dataNodes = new DataNodeRunner[nDatanodes];
dataNodeThreads = new Thread[nDatanodes];
for (int idx = 0; idx < nDatanodes; idx++) {
- dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
+ if( racks == null || idx >= racks.length) {
+ dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
+ } else {
+ dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);
+ }
dataNodeThreads[idx] = new Thread(dataNodes[idx]);
}
if (dataNodeFirst) {
Index: src/java/org/apache/hadoop/net/NodeBase.java
===================================================================
--- src/java/org/apache/hadoop/net/NodeBase.java (revision 0)
+++ src/java/org/apache/hadoop/net/NodeBase.java (revision 0)
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+/** A base class that implements interface Node
+ *
+ * @author hairong
+ *
+ */
+
+public class NodeBase implements Node {
+ public final static char PATH_SEPARATOR = '/';
+ public static String PATH_SEPARATOR_STR = "/";
+ public final static String ROOT = ""; // string representation of root
+
+ protected String name; //host:port#
+ protected String location; //string representation of this node's location
+
+ /** Default constructor */
+ public NodeBase( ) {
+ }
+
+ /** Construct a node from its path
+ * @param path
+ * a concatenation of this node's location, the path seperator, and its name
+ */
+ public NodeBase( String path ) {
+ path = normalize(path);
+ int index = path.lastIndexOf( PATH_SEPARATOR );
+ if( index== -1 ) {
+ set( ROOT, path );
+ } else {
+ set( path.substring(index+1), path.substring(0, index) );
+ }
+ }
+
+ /** Construct a node from its name and its location
+ * @param name this node's name
+ * @param location this node's location
+ */
+ public NodeBase( String name, String location ) {
+ set(name, normalize(location));
+ }
+
+ /* set this node's name and location */
+ private void set( String name, String location ) {
+ if(name != null && name.contains(PATH_SEPARATOR_STR))
+ throw new IllegalArgumentException(
+ "Network location name contains /: "+name);
+ this.name = (name==null)?"":name;
+ this.location = location;
+ }
+
+ /** Return this node's name */
+ public String getName() { return name; }
+
+ /** Return this node's network location */
+ public String getNetworkLocation() { return location; }
+
+ /** Return this node's path */
+ public String getPath() {
+ return location+PATH_SEPARATOR_STR+name;
+ }
+
+ /** Return this node's string representation */
+ public String toString() {
+ return getPath();
+ }
+
+ /** Normalize a path */
+ static public String normalize(String path) {
+ if( path == null || path.length() == 0 ) return ROOT;
+
+ if( path.charAt(0) != PATH_SEPARATOR ) {
+ throw new IllegalArgumentException(
+ "Network Location path does not start with "
+ +PATH_SEPARATOR_STR+ ": "+path);
+ }
+
+ int len = path.length();
+ if(path.charAt(len-1) == PATH_SEPARATOR) {
+ return path.substring(0, len-1);
+ }
+ return path;
+ }
+}
Index: src/java/org/apache/hadoop/net/Node.java
===================================================================
--- src/java/org/apache/hadoop/net/Node.java (revision 0)
+++ src/java/org/apache/hadoop/net/Node.java (revision 0)
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+/** The interface defines a node in a network topology.
+ * A node may be a leave representing a data node or an inner
+ * node representing a datacenter or rack.
+ * Each data has a name and its location in the network is
+ * decided by a string with syntax similar to a file name.
+ * For example, a data node's name is hostname:port# and if it's located at
+ * rack "orange" in datacenter "dog", the string representation of its
+ * network location is /dog/orange
+ * @author hairong
+ *
+ */
+
+public interface Node {
+ /** Return the string representation of this node's network location */
+ public String getNetworkLocation();
+ /** Return this node's name */
+ public String getName();
+}
Index: src/java/org/apache/hadoop/net/NetworkTopology.java
===================================================================
--- src/java/org/apache/hadoop/net/NetworkTopology.java (revision 0)
+++ src/java/org/apache/hadoop/net/NetworkTopology.java (revision 0)
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.dfs.DatanodeDescriptor;
+
+/** The class represents a cluster of computer with a tree hierarchical
+ * network topology.
+ * For example, a cluster may be consists of many data centers filled
+ * with racks of computers.
+ * In a network topology, leaves represent data nodes (computers) and inner
+ * nodes represent switches/routers that manage traffic in/out of data centers
+ * or racks.
+ *
+ * @author hairong
+ *
+ */
+public class NetworkTopology {
+ public final static String DEFAULT_RACK = "/default-rack";
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.net.NetworkTopology");
+
+ /* Inner Node represent a switch/router of a data center or rack.
+ * Different from a leave node, it has non-null children.
+ */
+ private class InnerNode extends NodeBase {
+ private HashMap children =
+ new HashMap(); // maps a name to a node
+
+ /** Construct an InnerNode from a path-like string */
+ InnerNode( String path ) {
+ super( path );
+ }
+
+ /** Construct an InnerNode from its name and its network location */
+ InnerNode( String name, String location ) {
+ super( name, location );
+ }
+
+ /** Get its children */
+ HashMap getChildren() {return children;}
+
+ /** Return the number of children this node has */
+ int getNumOfChildren() {
+ return children.size();
+ }
+
+ /** Judge if this node represents a rack
+ * Return true if it has no child or its children are not InnerNodes
+ */
+ boolean isRack() {
+ if(children.isEmpty()) {
+ return true;
+ }
+
+ Node firstChild = children.values().iterator().next();
+ if(firstChild instanceof InnerNode) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /** Judge if this node is an ancestor of node n
+ *
+ * @param n: a node
+ * @return true if this node is an ancestor of n
+ */
+ boolean isAncestor(Node n) {
+ return n.getNetworkLocation().startsWith(getPath());
+ }
+
+ /** Judge if this node is the parent of node n
+ *
+ * @param n: a node
+ * @return true if this node is the parent of n
+ */
+ boolean isParent( Node n ) {
+ return n.getNetworkLocation().equals( getPath() );
+ }
+
+ /* Return a child name of this node who is an ancestor of node n */
+ private String getNextAncestorName( Node n ) {
+ if( !isAncestor(n)) {
+ throw new IllegalArgumentException(
+ this + "is not an ancestor of " + n);
+ }
+ String name = n.getNetworkLocation().substring(getPath().length());
+ if(name.charAt(0) == PATH_SEPARATOR) {
+ name = name.substring(1);
+ }
+ int index=name.indexOf(PATH_SEPARATOR);
+ if( index !=-1 )
+ name = name.substring(0, index);
+ return name;
+ }
+
+ /** Add node n to the subtree of this node
+ * @param n node to be added
+ * @return true if the node is added; false otherwise
+ */
+ boolean add( Node n ) {
+ String parent = n.getNetworkLocation();
+ String currentPath = getPath();
+ if( !isAncestor( n ) )
+ throw new IllegalArgumentException( n.getName()+", which is located at "
+ +parent+", is not a decendent of "+currentPath);
+ if( isParent( n ) ) {
+ // this node is the parent of n; add n directly
+ return (null == children.put(n.getName(), n) );
+ } else {
+ // find the next ancestor node
+ String parentName = getNextAncestorName( n );
+ InnerNode parentNode = (InnerNode)children.get(parentName);
+ if( parentNode == null ) {
+ // create a new InnerNode
+ parentNode = new InnerNode( parentName, currentPath );
+ children.put(parentName, parentNode);
+ }
+ // add n to the subtree of the next ancestor node
+ return parentNode.add(n);
+ }
+ }
+
+ /** Remove node n from the subtree of this node
+ * @parameter n node to be deleted
+ * @return true if the node is deleted; false otherwise
+ */
+ boolean remove( Node n ) {
+ String parent = n.getNetworkLocation();
+ String currentPath = getPath();
+ if(!isAncestor(n))
+ throw new IllegalArgumentException( n.getName()+", which is located at "
+ +parent+", is not a decendent of "+currentPath);
+ if( isParent(n) ) {
+ // this node is the parent of n; remove n directly
+ return (n == children.remove(n.getName()));
+ } else {
+ // find the next ancestor node: the parent node
+ String parentName = getNextAncestorName( n );
+ InnerNode parentNode = (InnerNode)children.get(parentName);
+ if(parentNode==null) {
+ throw new IllegalArgumentException( n.getName()
+ + ", which is located at "
+ + parent+", is not a decendent of " + currentPath);
+ }
+ // remove n from the parent node
+ boolean isRemoved = parentNode.remove( n );
+ // if the parent node has no children, remove the parent node too
+ if(parentNode.getNumOfChildren() == 0 ) {
+ children.remove(parentName);
+ }
+ return isRemoved;
+ }
+ } // end of remove
+
+ /** Given a node's string representation, return a reference to the node */
+ Node getLoc( String loc ) {
+ if( loc == null || loc.length() == 0 ) return this;
+ String[] path = loc.split(PATH_SEPARATOR_STR, 2);
+ Node childnode = children.get( path[0] );
+ if(childnode == null ) return null; // non-existing node
+ if( path.length == 1 ) return childnode;
+ if( childnode instanceof InnerNode ) {
+ return ((InnerNode)childnode).getLoc(path[1]);
+ } else {
+ return null;
+ }
+ }
+
+ /** Get all the data nodes belonged to the subtree of this node */
+ void getLeaves( Collection results ) {
+ for( Iterator iter = children.values().iterator();
+ iter.hasNext(); ) {
+ Node childNode = iter.next();
+ if( childNode instanceof InnerNode ) {
+ ((InnerNode)childNode).getLeaves(results);
+ } else {
+ results.add( (DatanodeDescriptor)childNode );
+ }
+ }
+ }
+ } // end of InnerNode
+
+ InnerNode clusterMap = new InnerNode( InnerNode.ROOT ); //the root of the tree
+ private int numOfLeaves = 0; // data nodes counter
+ private int numOfRacks = 0; // rack counter
+
+ public NetworkTopology() {
+ }
+
+ /** Add a data node
+ * Update data node counter & rack counter if neccessary
+ * @param node
+ * data node to be added
+ * @exception IllegalArgumentException if add a data node to an existing leave
+ */
+ public synchronized void add( DatanodeDescriptor node ) {
+ if( node==null ) return;
+ LOG.info("Adding a new node: "+node.getPath());
+ Node rack = getNode(node.getNetworkLocation());
+ if(rack != null && !(rack instanceof InnerNode) ) {
+ throw new IllegalArgumentException( "Unexpected data node "
+ + node.toString()
+ + " at an illegal network location");
+ }
+ if( clusterMap.add( node) ) {
+ numOfLeaves++;
+ if( rack == null ) {
+ numOfRacks++;
+ }
+ }
+ LOG.debug("NetworkTopology became:\n" + this.toString());
+ }
+
+ /** Remove a data node
+ * Update data node counter & rack counter if neccessary
+ * @param node
+ * data node to be removed
+ */
+ public synchronized void remove( DatanodeDescriptor node ) {
+ if( node==null ) return;
+ LOG.info("Removing a node: "+node.getPath());
+ if( clusterMap.remove( node ) ) {
+ numOfLeaves--;
+ InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
+ if(rack == null) {
+ numOfRacks--;
+ }
+ }
+ LOG.debug("NetworkTopology became:\n" + this.toString());
+ }
+
+ /** Check if the tree contains data node node
+ *
+ * @param node
+ * a data node
+ * @return true if node is already in the tree; false otherwise
+ */
+ public boolean contains( DatanodeDescriptor node ) {
+ if( node == null ) return false;
+ Node rNode = getNode(node.getPath());
+ return (rNode == node);
+ }
+
+ /** Given a string representation of a node, return the reference to the node
+ *
+ * @param loc
+ * a path-like string representation of a node
+ * @return a reference to the node; null if the node is not in the tree
+ */
+ public synchronized Node getNode( String loc ) {
+ loc = NodeBase.normalize(loc);
+ if(!NodeBase.ROOT.equals(loc))
+ loc = loc.substring(1);
+ return clusterMap.getLoc( loc );
+ }
+
+ /* Add all the data nodes that belong to
+ * the subtree of the node loc to results*/
+ private synchronized void getLeaves( String loc,
+ Collection results ) {
+ Node node = getNode(loc);
+ if( node instanceof InnerNode )
+ ((InnerNode)node).getLeaves(results);
+ else {
+ results.add((DatanodeDescriptor)node);
+ }
+ }
+
+ /** Return all the data nodes that belong to the subtree of loc
+ * @param loc
+ * a path-like string representation of a node
+ * @return an array of data nodes that belong to the subtree of loc
+ */
+ public synchronized DatanodeDescriptor[] getLeaves( String loc ) {
+ Collection results = new ArrayList();
+ getLeaves(loc, results);
+ return results.toArray(new DatanodeDescriptor[results.size()]);
+ }
+
+ /** Return all the data nodes that belong to the subtrees of locs
+ * @param locs
+ * a collection of strings representing nodes
+ * @return an array of data nodes that belong to subtrees of locs
+ */
+ public synchronized DatanodeDescriptor[] getLeaves(
+ Collection locs ) {
+ Collection nodes = new ArrayList();
+ if( locs != null ) {
+ Iterator iter = locs.iterator();
+ while(iter.hasNext()) {
+ getLeaves( iter.next(), nodes );
+ }
+ }
+ return nodes.toArray(new DatanodeDescriptor[nodes.size()]);
+ }
+
+ /** Return the total number of racks */
+ public int getNumOfRacks( ) {
+ return numOfRacks;
+ }
+
+ /** Return the total number of data nodes */
+ public int getNumOfLeaves() {
+ return numOfLeaves;
+ }
+
+ private void checkArgument( DatanodeDescriptor node ) {
+ if( node == null ) {
+ throw new IllegalArgumentException(
+ "Unexpected null pointer argument" );
+ }
+ if( !contains(node) ) {
+ String path = node.getPath();
+ LOG.warn("The cluster does not contain data node: " + path);
+ throw new IllegalArgumentException(
+ "Unexpected non-existing data node: " +path);
+ }
+ }
+
+ /** Return the distance between two data nodes
+ * It is assumed that the distance from one node to its parent is 1
+ * The distance between two nodes is calculated by summing up their distances
+ * to their closest common ancestor.
+ * @param node1 one data node
+ * @param node2 another data node
+ * @return the distance between node1 and node2
+ * @exception IllegalArgumentException when either node1 or node2 is null, or
+ * node1 or node2 do not belong to the cluster
+ */
+ public int getDistance(DatanodeDescriptor node1, DatanodeDescriptor node2 ) {
+ checkArgument( node1 );
+ checkArgument( node2 );
+ /*
+ if( !contains(node1) || !contains(node2) ) {
+ return Integer.MAX_VALUE;
+ }
+ */
+ if( node1 == node2 || node1.equals(node2)) {
+ return 0;
+ }
+ String[] path1 = node1.getNetworkLocation().split("/");
+ String[] path2 = node2.getNetworkLocation().split("/");
+
+ int i;
+ for(i=0; inodes are already in the priority order
*/
private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
- if ((nodes == null) ||
- (nodes.length - deadNodes.size() < 1)) {
- throw new IOException("No live nodes contain current block");
- }
- DatanodeInfo chosenNode = null;
+ if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
- if (deadNodes.contains(nodes[i])) {
- continue;
- }
- String nodename = nodes[i].getHost();
- if (localName.equals(nodename)) {
- chosenNode = nodes[i];
- break;
- }
+ if (!deadNodes.contains(nodes[i])) {
+ return nodes[i];
+ }
}
- if (chosenNode == null) {
- do {
- chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
- } while (deadNodes.contains(chosenNode));
- }
- return chosenNode;
+ }
+ throw new IOException("No live nodes contain current block");
}
/***************************************************************
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java (revision 502789)
+++ src/java/org/apache/hadoop/dfs/DataNode.java (working copy)
@@ -24,10 +24,12 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.Metrics;
import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.net.NetworkTopology;
import java.io.*;
import java.net.*;
@@ -105,6 +107,7 @@
DatanodeProtocol namenode;
FSDataset data;
DatanodeRegistration dnRegistration;
+ private String networkLoc;
boolean shouldRun = true;
Vector receivedBlockList = new Vector();
int xmitsInProgress = 0;
@@ -168,9 +171,15 @@
* 'dataDirs' is where the blocks are stored.
*/
DataNode(Configuration conf, String[] dataDirs) throws IOException {
- this(InetAddress.getLocalHost().getHostName(),
+ this(conf, NetworkTopology.DEFAULT_RACK, dataDirs );
+ }
+
+ DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {
+ this(InetAddress.getLocalHost().getHostName(),
+ networkLoc,
dataDirs,
createSocketAddr(conf.get("fs.default.name", "local")), conf);
+ // register datanode
int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
@@ -197,7 +206,8 @@
*
* @see DataStorage
*/
- private DataNode(String machineName,
+ private DataNode(String machineName,
+ String networkLoc,
String[] dataDirs,
InetSocketAddress nameNodeAddr,
Configuration conf ) throws IOException {
@@ -247,6 +257,7 @@
storage.getStorageID(),
-1,
"" );
+ this.networkLoc = networkLoc;
// initialize data node internal structure
this.data = new FSDataset(volumes, conf);
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
@@ -292,7 +303,7 @@
private void register() throws IOException {
while( true ) {
try {
- dnRegistration = namenode.register( dnRegistration );
+ dnRegistration = namenode.register( dnRegistration, networkLoc );
break;
} catch( SocketTimeoutException e ) { // namenode is busy
LOG.info("Problem connecting to server: " + getNameNodeAddr());
@@ -1045,9 +1056,9 @@
/** Start datanode daemon.
*/
- public static void run(Configuration conf) throws IOException {
+ public static void run(Configuration conf, String networkLoc) throws IOException {
String[] dataDirs = conf.getStrings("dfs.data.dir");
- DataNode dn = makeInstance(dataDirs, conf);
+ DataNode dn = makeInstance(networkLoc, dataDirs, conf);
dataNodeList.add(dn);
if (dn != null) {
Thread t = new Thread(dn, "DataNode: [" +
@@ -1075,8 +1086,9 @@
/** Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
- private static void runAndWait(Configuration conf) throws IOException {
- run(conf);
+ private static void runAndWait(Configuration conf, String networkLoc)
+ throws IOException {
+ run(conf, networkLoc);
if (dataNodeThreadList.size() > 0) {
Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
try {
@@ -1101,8 +1113,13 @@
* no directory from this directory list can be created.
* @throws IOException
*/
- static DataNode makeInstance(String[] dataDirs, Configuration conf)
+ static DataNode makeInstance( String[] dataDirs, Configuration conf)
throws IOException {
+ return makeInstance(NetworkTopology.DEFAULT_RACK, dataDirs, conf );
+ }
+
+ static DataNode makeInstance(String networkLoc, String[] dataDirs, Configuration conf)
+ throws IOException {
ArrayList dirs = new ArrayList();
for (int i = 0; i < dataDirs.length; i++) {
File data = new File(dataDirs[i]);
@@ -1113,7 +1130,7 @@
LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
}
}
- return ((dirs.size() > 0) ? new DataNode(conf, dirs.toArray(new String[dirs.size()])) : null);
+ return ((dirs.size() > 0) ? new DataNode(conf, networkLoc, dirs.toArray(new String[dirs.size()])) : null);
}
public String toString() {
@@ -1124,13 +1141,118 @@
", xmitsInProgress=" + xmitsInProgress +
"}";
}
+
+ /* Get the network location by running a script configured in conf */
+ private static String getNetworkLoc( Configuration conf )
+ throws IOException {
+ String locScript = conf.get("dfs.network.script" );
+ if( locScript == null ) return null;
+ LOG.info( "Starting to run script to get datanode network location");
+ Process p = Runtime.getRuntime().exec( locScript );
+ StringBuffer networkLoc = new StringBuffer();
+ final BufferedReader inR = new BufferedReader(
+ new InputStreamReader(p.getInputStream() ) );
+ final BufferedReader errR = new BufferedReader(
+ new InputStreamReader( p.getErrorStream() ) );
+
+ // read & log any error messages from the running script
+ Thread errThread = new Thread() {
+ public void start() {
+ try {
+ String errLine = errR.readLine();
+ while(errLine != null) {
+ LOG.warn("Network script error: "+errLine);
+ errLine = errR.readLine();
+ }
+ } catch( IOException e) {
+
+ }
+ }
+ };
+ try {
+ errThread.start();
+
+ // fetch output from the process
+ String line = inR.readLine();
+ while( line != null ) {
+ networkLoc.append( line );
+ line = inR.readLine();
+ }
+ try {
+ // wait for the process to finish
+ int returnVal = p.waitFor();
+ // check the exit code
+ if( returnVal != 0 ) {
+ throw new IOException("Process exits with nonzero status: "+locScript);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException( e.getMessage() );
+ } finally {
+ try {
+ // make sure that the error thread exits
+ errThread.join();
+ } catch (InterruptedException je) {
+ LOG.warn( StringUtils.stringifyException(je));
+ }
+ }
+ } finally {
+ // close in & error streams
+ try {
+ inR.close();
+ } catch ( IOException ine ) {
+ throw ine;
+ } finally {
+ errR.close();
+ }
+ }
+
+ return networkLoc.toString();
+ }
+
+
+ /* Get the network location from the command line */
+ private static String getNetworkLoc(String args[]) {
+ for( int i=0; i< args.length; i++ ) {
+ if ("-r".equals(args[i])||"--rack".equals(args[i]) ) {
+ if( i==args.length-1 ) {
+ printUsage();
+ } else {
+ return args[++i];
+ }
+ }
+ }
+ return null;
+ }
+
+ /* Return the datanode's network location
+ * either from the command line, from script, or a default value
+ */
+ private static String getNetworkLoc(String args[], Configuration conf)
+ throws IOException {
+ String networkLoc = getNetworkLoc( args );
+ if( networkLoc == null ) {
+ networkLoc = getNetworkLoc( conf );
+ }
+ if( networkLoc == null ) {
+ return NetworkTopology.DEFAULT_RACK;
+ } else {
+ return NodeBase.normalize( networkLoc );
+ }
+ }
+
+ private static void printUsage() {
+ System.err.println(
+ "Usage: java DataNode [-r, --rack ]");
+ }
+
+
/**
*/
public static void main(String args[]) throws IOException {
try {
Configuration conf = new Configuration();
- runAndWait(conf);
+ runAndWait(conf, getNetworkLoc(args, conf));
} catch ( Throwable e ) {
LOG.error( StringUtils.stringifyException( e ) );
System.exit(-1);
Index: src/java/org/apache/hadoop/dfs/NameNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/NameNode.java (revision 502789)
+++ src/java/org/apache/hadoop/dfs/NameNode.java (working copy)
@@ -539,10 +539,11 @@
////////////////////////////////////////////////////////////////
/**
*/
- public DatanodeRegistration register( DatanodeRegistration nodeReg
+ public DatanodeRegistration register( DatanodeRegistration nodeReg,
+ String networkLocation
) throws IOException {
verifyVersion( nodeReg.getVersion() );
- namesystem.registerDatanode( nodeReg );
+ namesystem.registerDatanode( nodeReg, networkLocation );
return nodeReg;
}
Index: src/java/org/apache/hadoop/dfs/DatanodeInfo.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeInfo.java (revision 502789)
+++ src/java/org/apache/hadoop/dfs/DatanodeInfo.java (working copy)
@@ -23,11 +23,15 @@
import java.util.Date;
import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
/**
* DatanodeInfo represents the status of a DataNode.
@@ -37,11 +41,12 @@
* @author Mike Cafarella
* @author Konstantin Shvachko
*/
-public class DatanodeInfo extends DatanodeID {
+public class DatanodeInfo extends DatanodeID implements Node {
protected long capacity;
protected long remaining;
protected long lastUpdate;
protected int xceiverCount;
+ private String location = NetworkTopology.DEFAULT_RACK;
// administrative states of a datanode
public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
@@ -59,18 +64,24 @@
this.remaining = from.getRemaining();
this.lastUpdate = from.getLastUpdate();
this.xceiverCount = from.getXceiverCount();
+ this.location = from.getNetworkLocation();
this.adminState = from.adminState;
}
DatanodeInfo( DatanodeID nodeID ) {
- super( nodeID );
- this.capacity = 0L;
- this.remaining = 0L;
- this.lastUpdate = 0L;
- this.xceiverCount = 0;
- this.adminState = null;
+ super( nodeID );
+ this.capacity = 0L;
+ this.remaining = 0L;
+ this.lastUpdate = 0L;
+ this.xceiverCount = 0;
+ this.adminState = null;
}
+ DatanodeInfo( DatanodeID nodeID, String location ) {
+ this(nodeID);
+ this.location = location;
+ }
+
/** The raw capacity. */
public long getCapacity() { return capacity; }
@@ -103,6 +114,18 @@
this.xceiverCount = xceiverCount;
}
+ /** rack name **/
+ public String getNetworkLocation() {return location;}
+
+ /** Sets the rack name */
+ void setNetworkLocation(String location) {
+ this.location = NodeBase.normalize(location);
+ }
+
+ public String getPath() {
+ return location+NodeBase.PATH_SEPARATOR_STR+name;
+ }
+
/** A formatted string for reporting the status of the DataNode. */
public String getDatanodeReport() {
StringBuffer buffer = new StringBuffer();
@@ -110,6 +133,9 @@
long r = getRemaining();
long u = c - r;
buffer.append("Name: "+name+"\n");
+ if(!NetworkTopology.DEFAULT_RACK.equals(location)) {
+ buffer.append("Rack: "+location+"\n");
+ }
if (isDecommissioned()) {
buffer.append("State : Decommissioned\n");
} else if (isDecommissionInProgress()) {
@@ -209,6 +235,7 @@
out.writeLong(remaining);
out.writeLong(lastUpdate);
out.writeInt(xceiverCount);
+ Text.writeString( out, location );
WritableUtils.writeEnum(out, getAdminState());
}
@@ -220,6 +247,7 @@
this.remaining = in.readLong();
this.lastUpdate = in.readLong();
this.xceiverCount = in.readInt();
+ this.location = Text.readString( in );
AdminStates newState = (AdminStates) WritableUtils.readEnum(in,
AdminStates.class);
setAdminState(newState);
Index: src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (revision 502789)
+++ src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (working copy)
@@ -31,8 +31,7 @@
* @author Michael Cafarella
**********************************************************************/
interface DatanodeProtocol extends VersionedProtocol {
- public static final long versionID = 4L; // BlockCommand.action:
- // replace DNA_REPORT by DNA_REGISTER
+ public static final long versionID = 5L; // register takes a new parameter
// error code
final static int DISK_ERROR = 1;
@@ -52,13 +51,14 @@
* Register Datanode.
*
* @see org.apache.hadoop.dfs.DataNode#register()
- * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration)
+ * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration, String)
*
* @return updated {@link org.apache.hadoop.dfs.DatanodeRegistration}, which contains
* new storageID if the datanode did not have one and
* registration ID for further communication.
*/
- public DatanodeRegistration register( DatanodeRegistration registration
+ public DatanodeRegistration register( DatanodeRegistration registration,
+ String networkLocation
) throws IOException;
/**
* sendHeartbeat() tells the NameNode that the DataNode is still
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java (revision 502789)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java (working copy)
@@ -23,11 +23,13 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.fs.Path;
import java.io.*;
-import java.net.InetSocketAddress;
import java.util.*;
+
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -189,6 +191,11 @@
private String localMachine;
private int port;
private SafeModeInfo safeMode; // safe mode information
+
+ // datanode networktoplogy
+ NetworkTopology clusterMap = new NetworkTopology();
+ // for block replicas placement
+ Replicator replicator = new Replicator();
/**
* dirs is a list oif directories where the filesystem directory state
@@ -497,10 +504,12 @@
machineSets[i] = new DatanodeDescriptor[0];
} else {
machineSets[i] = new DatanodeDescriptor[containingNodes.size()];
- int j = 0;
- for (Iterator it = containingNodes.iterator(); it.hasNext(); j++) {
- machineSets[i][j] = it.next();
- }
+ ArrayList containingNodesList =
+ new ArrayList(containingNodes.size());
+ containingNodesList.addAll(containingNodes);
+
+ machineSets[i] = replicator.sortByDistance(
+ getDatanodeByHost(clientMachine), containingNodesList);
}
}
@@ -671,9 +680,9 @@
}
}
- // Get the array of replication targets
- DatanodeDescriptor targets[] = chooseTargets(replication, null,
- clientMachine, blockSize);
+ // Get the array of replication targets
+ DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+ getDatanodeByHost(clientMachine.toString()), null, blockSize);
if (targets.length < this.minReplication) {
throw new IOException("failed to create file "+src
+" on client " + clientMachine
@@ -754,9 +763,13 @@
throw new NotReplicatedYetException("Not replicated yet");
}
- // Get the array of replication targets
- DatanodeDescriptor targets[] = chooseTargets(pendingFile.getReplication(),
- null, pendingFile.getClientMachine(), pendingFile.getBlockSize());
+ // Get the array of replication targets
+ String clientHost = pendingFile.getClientMachine().toString();
+ DatanodeDescriptor targets[] = replicator.chooseTarget(
+ (int)(pendingFile.getReplication()),
+ getDatanodeByHost(clientHost),
+ null,
+ pendingFile.getBlockSize());
if (targets.length < this.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
@@ -1422,7 +1435,8 @@
* @see DataNode#register()
* @author Konstantin Shvachko
*/
- public synchronized void registerDatanode( DatanodeRegistration nodeReg
+ public synchronized void registerDatanode( DatanodeRegistration nodeReg,
+ String networkLocation
) throws IOException {
NameNode.stateChangeLog.info(
"BLOCK* NameSystem.registerDatanode: "
@@ -1434,6 +1448,8 @@
DatanodeDescriptor nodeN = getDatanodeByName( nodeReg.getName() );
if( nodeN != null && nodeN != nodeS ) {
+ NameNode.LOG.info( "BLOCK* NameSystem.registerDatanode: "
+ + "node from name: " + nodeN.getName() );
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
removeDatanode( nodeN );
@@ -1457,11 +1473,15 @@
// data storage, which from now on will be served by a new node.
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
- + "node " + nodeS.name
+ + "node " + nodeS.getName()
+ " is replaced by " + nodeReg.getName() + "." );
}
getEditLog().logRemoveDatanode( nodeS );
+ // update cluster map
+ clusterMap.remove( nodeS );
nodeS.updateRegInfo( nodeReg );
+ nodeS.setNetworkLocation( networkLocation );
+ clusterMap.add( nodeS );
getEditLog().logAddDatanode( nodeS );
// also treat the registration message as a heartbeat
@@ -1484,7 +1504,8 @@
+ "new storageID " + nodeReg.getStorageID() + " assigned." );
}
// register new datanode
- DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );
+ DatanodeDescriptor nodeDescr
+ = new DatanodeDescriptor( nodeReg, networkLocation );
unprotectedAddDatanode( nodeDescr );
getEditLog().logAddDatanode( nodeDescr );
@@ -1636,6 +1657,7 @@
removeStoredBlock(it.next(), nodeInfo);
}
unprotectedRemoveDatanode(nodeInfo);
+ clusterMap.remove(nodeInfo);
}
void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {
@@ -1649,6 +1671,7 @@
void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {
datanodeMap.put( nodeDescr.getStorageID(), nodeDescr );
+ clusterMap.add(nodeDescr);
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.unprotectedAddDatanode: "
+ "node " + nodeDescr.getName() + " is added to datanodeMap." );
@@ -1661,7 +1684,8 @@
* @param nodeID node
*/
void wipeDatanode( DatanodeID nodeID ) {
- datanodeMap.remove(nodeID.getStorageID());
+ String key = nodeID.getStorageID();
+ datanodeMap.remove(key);
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.wipeDatanode: "
+ nodeID.getName() + " storage " + nodeID.getStorageID()
@@ -2008,6 +2032,7 @@
* Total raw bytes.
*/
public long totalCapacity() {
+
synchronized (heartbeats) {
return totalCapacity;
}
@@ -2295,6 +2320,23 @@
}
/*
+ * Filter nodes that are marked for decommison in the given list.
+ * Return a list of non-decommissioned nodes
+ */
+ private List filterDecommissionedNodes(
+ Collection nodelist) {
+ List nonCommissionedNodeList =
+ new ArrayList();
+ for (Iterator it = nodelist.iterator();
+ it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+ nonCommissionedNodeList.add(node);
+ }
+ }
+ return nonCommissionedNodeList;
+ }
+ /*
* Return true if there are any blocks in neededReplication that
* reside on the specified node. Otherwise returns false.
*/
@@ -2395,14 +2437,15 @@
// not be scheduled for removal on that node
if (containingNodes != null && containingNodes.contains(srcNode)
&& (excessBlocks == null || ! excessBlocks.contains(block))) {
-
// filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes(containingNodes);
-
- DatanodeDescriptor targets[] = chooseTargets(
+ List nodes =
+ filterDecommissionedNodes(containingNodes);
+ int numCurrentReplica = nodes.size();
+ DatanodeDescriptor targets[] = replicator.chooseTarget(
Math.min( fileINode.getReplication() - numCurrentReplica,
- this.maxReplicationStreams - xmitsInProgress),
- containingNodes, null, blockSize);
+ this.maxReplicationStreams - xmitsInProgress),
+ datanodeMap.get(srcNode.getStorageID()),
+ nodes, null, blockSize);
if (targets.length > 0) {
// Build items to return
replicateBlocks.add(block);
@@ -2471,111 +2514,471 @@
return results;
}
}
-
- /**
- * Get a certain number of targets, if possible.
- * If not, return as many as we can.
- * Only live nodes contained in {@link #heartbeats} are
- * targeted for replication.
- *
- * @param desiredReplicates
- * number of duplicates wanted.
- * @param forbiddenNodes
- * of DatanodeDescriptor instances that should not be considered targets.
- * @return array of DatanodeDescriptor instances uses as targets.
+
+ /** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine,
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * which is on the same rack as the first replca.
+ * @author hairong
+ *
*/
- DatanodeDescriptor[] chooseTargets(
- int desiredReplicates,
- Collection forbiddenNodes,
- UTF8 clientMachine,
- long blockSize) {
- Collection targets = new ArrayList();
+ class Replicator {
+ private class NotEnoughReplicasException extends Exception {
+ NotEnoughReplicasException( String msg ) {
+ super( msg );
+ }
+ }
+
+ /**
+ * choose numOfReplicas data nodes for writer to replicate
+ * a block with size blocksize
+ * If not, return as many as we can.
+ *
+ * @param numOfReplicas: number of replicas wanted.
+ * @param writer: the writer's machine, null if not in the cluster.
+ * @param excludedNodes: datanodesthat should not be considered targets.
+ * @param blocksize: size of the data to be written.
+ * @return array of DatanodeDescriptor instances chosen as targets
+ * and sorted as a pipeline.
+ */
+ DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer,
+ List excludedNodes,
+ long blocksize ) {
+ if( excludedNodes == null) {
+ excludedNodes = new ArrayList();
+ }
- if (desiredReplicates > heartbeats.size()) {
- LOG.warn("Replication requested of "+desiredReplicates
- +" is larger than cluster size ("+heartbeats.size()
- +"). Using cluster size.");
- desiredReplicates = heartbeats.size();
- if (desiredReplicates == 0) {
- LOG.warn("While choosing target, totalMachines is " + desiredReplicates);
- }
+ return chooseTarget(numOfReplicas, writer,
+ new ArrayList(), excludedNodes, blocksize);
+ }
+
+ /*
+ * re-replicate numOfReplicas
+ /**
+ * choose numOfReplicas data nodes for writer
+ * to re-replicate a block with size blocksize
+ * If not, return as many as we can.
+ *
+ * @param numOfReplicas: additional number of replicas wanted.
+ * @param writer: the writer's machine, null if not in the cluster.
+ * @param choosenNodes: datanodes that have been choosen as targets.
+ * @param excludedNodes: datanodesthat should not be considered targets.
+ * @param blocksize: size of the data to be written.
+ * @return array of DatanodeDescriptor instances chosen as target
+ * and sorted as a pipeline.
+ */
+ DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer,
+ List choosenNodes,
+ List excludedNodes,
+ long blocksize ) {
+ if( numOfReplicas == 0 )
+ return new DatanodeDescriptor[0];
+
+ if( excludedNodes == null) {
+ excludedNodes = new ArrayList();
}
- double avgLoad = 0.0;
- if (heartbeats.size() != 0) {
- avgLoad = (double) totalLoad() / heartbeats.size();
+ int clusterSize = clusterMap.getNumOfLeaves();
+ int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
+ if( totalNumOfReplicas > clusterSize) {
+ numOfReplicas -= (totalNumOfReplicas-clusterSize);
+ totalNumOfReplicas = clusterSize;
}
- // choose local replica first
- if (desiredReplicates != 0) {
- // make sure that the client machine is not forbidden
- if (clientMachine != null && clientMachine.getLength() > 0) {
- for (Iterator it = heartbeats.iterator();
- it.hasNext();) {
- DatanodeDescriptor node = it.next();
- if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
- clientMachine.toString().equals(node.getHost()) &&
- !node.isDecommissionInProgress() && !node.isDecommissioned()) {
- if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
- (node.getXceiverCount() <= (2.0 * avgLoad))) {
- targets.add(node);
- desiredReplicates--;
- break;
- }
- }
+
+ int maxNodesPerRack =
+ (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+
+ List results =
+ new ArrayList(choosenNodes);
+ excludedNodes.addAll(choosenNodes);
+
+ if(!clusterMap.contains(writer))
+ writer=null;
+
+ DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+ clusterMap.getLeaves(NodeBase.ROOT),
+ excludedNodes, blocksize, maxNodesPerRack, results );
+
+ results.removeAll(choosenNodes);
+
+ // sorting nodes to form a pipeline
+ return getPipeline((writer==null)?localNode:writer, results);
+ }
+
+ /* choose numOfReplicas from clusterNodes */
+ private DatanodeDescriptor chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer,
+ DatanodeDescriptor[] clusterNodes,
+ List excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List results) {
+
+ if( numOfReplicas == 0 ) return writer;
+
+ int numOfResults = results.size();
+ if(writer == null && (numOfResults==1 || numOfResults==2) ) {
+ writer = results.get(0);
+ }
+
+ try {
+ switch( numOfResults ) {
+ case 0:
+ writer = chooseLocalNode(writer, clusterNodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ if(--numOfReplicas == 0) break;
+ case 1:
+ chooseRemoteRack(1, writer, clusterNodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ if(--numOfReplicas == 0) break;
+ case 2:
+ if(clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+ chooseRemoteRack(1, writer, clusterNodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ } else {
+ chooseLocalRack(writer, clusterNodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
}
+ if(--numOfReplicas == 0) break;
+ default:
+ chooseRandom(numOfReplicas, clusterNodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
}
+ } catch (NotEnoughReplicasException e) {
+ LOG.warn("Not be able to place enough replicas, still in need of "
+ + numOfReplicas );
}
-
- for (int i = 0; i < desiredReplicates; i++) {
- DatanodeDescriptor target = null;
- //
- // Otherwise, choose node according to target capacity
- //
- int nNodes = heartbeats.size();
- int idx = r.nextInt(nNodes);
- int rejected = 0;
- while (target == null && rejected < nNodes ) {
- DatanodeDescriptor node = heartbeats.get(idx);
- if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
- !targets.contains(node) &&
- !node.isDecommissionInProgress() && !node.isDecommissioned() &&
- (node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
- (node.getXceiverCount() <= (2.0 * avgLoad))) {
- target = node;
+ return writer;
+ }
+
+ /* choose localMachine as the target.
+ * if localMachine is not availabe,
+ * choose a node on the same rack
+ * @return the choosen node
+ */
+ private DatanodeDescriptor chooseLocalNode(
+ DatanodeDescriptor localMachine,
+ DatanodeDescriptor[] nodes,
+ List excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List results)
+ throws NotEnoughReplicasException {
+ // if no local machine, randomly choose one node
+ if(localMachine == null)
+ return chooseRandom(nodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+
+ // otherwise try local machine first
+ if(!excludedNodes.contains(localMachine)) {
+ excludedNodes.add(localMachine);
+ if( isGoodTarget(localMachine, blocksize, maxNodesPerRack, results)) {
+ results.add(localMachine);
+ return localMachine;
+ }
+ }
+
+ // try a node on local rack
+ return chooseLocalRack(localMachine, nodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+
+ /* choose one node from the rack that localMachine is on.
+ * if no such node is availabe, choose one node from the rack where
+ * a second replica is on.
+ * if still no such node is available, choose a random node
+ * in the cluster nodes.
+ * @return the choosen node
+ */
+ private DatanodeDescriptor chooseLocalRack(
+ DatanodeDescriptor localMachine,
+ DatanodeDescriptor[] nodes,
+ List excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List results)
+ throws NotEnoughReplicasException {
+ // no local machine, so choose a random machine
+ if( localMachine == null ) {
+ return chooseRandom(nodes, excludedNodes,
+ blocksize, maxNodesPerRack, results );
+ }
+
+ // choose one from the local rack
+ try {
+ return chooseRandom(
+ clusterMap.getLeaves( localMachine.getNetworkLocation() ),
+ excludedNodes, blocksize, maxNodesPerRack, results);
+ } catch (NotEnoughReplicasException e1) {
+ // find the second replica
+ DatanodeDescriptor newLocal=null;
+ for(Iterator iter=results.iterator();
+ iter.hasNext();) {
+ DatanodeDescriptor nextNode = iter.next();
+ if(nextNode != localMachine) {
+ newLocal = nextNode;
break;
- } else {
- idx = (idx+1) % nNodes;
- rejected++;
}
}
- if (target == null) {
- idx = r.nextInt(nNodes);
- rejected = 0;
- while (target == null && rejected < nNodes ) {
- DatanodeDescriptor node = heartbeats.get(idx);
- if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
- !targets.contains(node) &&
- !node.isDecommissionInProgress() && !node.isDecommissioned() &&
- node.getRemaining() >= blockSize) {
- target = node;
- break;
- } else {
- idx = (idx + 1) % nNodes;
- rejected++;
- }
+ if( newLocal != null ) {
+ try {
+ return chooseRandom(
+ clusterMap.getLeaves( newLocal.getNetworkLocation() ),
+ excludedNodes, blocksize, maxNodesPerRack, results);
+ } catch( NotEnoughReplicasException e2 ) {
+ //otherwise randomly choose one from the network
+ return chooseRandom(nodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
}
+ } else {
+ //otherwise randomly choose one from the network
+ return chooseRandom(nodes, excludedNodes,
+ blocksize, maxNodesPerRack, results);
}
-
- if (target == null) {
- LOG.warn("Could not find any nodes with sufficient capacity");
- break; // making one more pass over heartbeats would not help
+ }
+ }
+
+ /* choose numOfReplicas nodes from the racks
+ * that localMachine is NOT on.
+ * if not enough nodes are availabe, choose the remaining ones
+ * from the local rack
+ */
+
+ private void chooseRemoteRack( int numOfReplicas,
+ DatanodeDescriptor localMachine,
+ DatanodeDescriptor[] nodes,
+ List excludedNodes,
+ long blocksize,
+ int maxReplicasPerRack,
+ List results)
+ throws NotEnoughReplicasException {
+ // get all the nodes on the local rack
+ DatanodeDescriptor[] nodesOnRack = clusterMap.getLeaves(
+ localMachine.getNetworkLocation() );
+
+ // can we speed up this??? using hashing sets?
+ DatanodeDescriptor[] nodesOnRemoteRack
+ = new DatanodeDescriptor[nodes.length-nodesOnRack.length];
+ HashSet set1 = new HashSet(nodes.length);
+ HashSet set2 = new HashSet(nodesOnRack.length);
+ for(int i=0; inodes.
+ * @return the choosen node
+ */
+ private DatanodeDescriptor chooseRandom(
+ DatanodeDescriptor[] nodes,
+ List excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List results)
+ throws NotEnoughReplicasException {
+ DatanodeDescriptor result;
+ do {
+ DatanodeDescriptor[] selectedNodes =
+ chooseRandom(1, nodes, excludedNodes);
+ if(selectedNodes.length == 0 ) {
+ throw new NotEnoughReplicasException(
+ "Not able to place enough replicas" );
}
- targets.add(target);
+ result = (DatanodeDescriptor)(selectedNodes[0]);
+ } while( !isGoodTarget( result, blocksize, maxNodesPerRack, results));
+ results.add(result);
+ return result;
+ }
+
+ /* Randomly choose numOfReplicas targets from nodes.
+ */
+ private void chooseRandom(int numOfReplicas,
+ DatanodeDescriptor[] nodes,
+ List excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List results)
+ throws NotEnoughReplicasException {
+ boolean toContinue = true;
+ do {
+ DatanodeDescriptor[] selectedNodes =
+ chooseRandom(numOfReplicas, nodes, excludedNodes);
+ if(selectedNodes.length < numOfReplicas) {
+ toContinue = false;
+ }
+ for(int i=0; i0 && toContinue );
+
+ if(numOfReplicas>0) {
+ throw new NotEnoughReplicasException(
+ "Not able to place enough replicas");
}
+ }
+
+ /* Randomly choose one node from nodes.
+ * @return the choosen node
+ */
+ private DatanodeDescriptor[] chooseRandom(int numOfReplicas,
+ DatanodeDescriptor[] nodes,
+ List excludedNodes) {
+ List results =
+ new ArrayList();
+ int numOfAvailableNodes = 0;
+ for(int i=0; i 0 ) {
+ DatanodeDescriptor choosenNode = nodes[r.nextInt(nodes.length)];
+ if(!excludedNodes.contains(choosenNode) &&
+ !choosenNode.isDecommissionInProgress() &&
+ !choosenNode.isDecommissioned()) {
+ results.add( choosenNode );
+ excludedNodes.add(choosenNode);
+ numOfReplicas--;
+ }
+ }
+ return (DatanodeDescriptor[])results.toArray(
+ new DatanodeDescriptor[results.size()]);
+ }
+
+ /* judge if a node is a good target.
+ * return true if node has enough space,
+ * does not have too much load, and the rack does not have too many nodes
+ */
+ private boolean isGoodTarget( DatanodeDescriptor node,
+ long blockSize, int maxTargetPerLoc,
+ List results) {
- return (DatanodeDescriptor[]) targets.toArray(new DatanodeDescriptor[targets.size()]);
- }
+ // check if the node is (being) decommissed
+ if(node.isDecommissionInProgress() || node.isDecommissioned()) {
+ return false;
+ }
+ // check the remaining capacity of the target machine
+ if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) {
+ return false;
+ }
+
+ // check the communication traffic of the target machine
+ double avgLoad = 0;
+ int size = clusterMap.getNumOfLeaves();
+ if( size != 0 ) {
+ avgLoad = (double)totalLoad()/size;
+ }
+ if(node.getXceiverCount() > (2.0 * avgLoad)) {
+ return false;
+ }
+
+ // check if the target rack has chosen too many nodes
+ String rackname = node.getNetworkLocation();
+ int counter=1;
+ for( Iterator iter = results.iterator();
+ iter.hasNext(); ) {
+ DatanodeDescriptor result = iter.next();
+ if(rackname.equals(result.getNetworkLocation())) {
+ counter++;
+ }
+ }
+ if(counter>maxTargetPerLoc) {
+ return false;
+ }
+ return true;
+ }
+
+ /* Return a pipeline of nodes.
+ * The pipeline is formed finding a shortest path that
+ * starts from the writer and tranverses all nodes
+ * This is basically a traveling salesman problem.
+ */
+ private DatanodeDescriptor[] getPipeline(
+ DatanodeDescriptor writer,
+ List nodes ) {
+ int numOfNodes = nodes.size();
+ DatanodeDescriptor[] results = new DatanodeDescriptor[numOfNodes];
+ if( numOfNodes==0 ) return results;
+
+ synchronized( clusterMap ) {
+ int index=0;
+ if(writer == null || !clusterMap.contains(writer)) {
+ writer = nodes.get(0);
+ }
+ for( ;indexcurrentDistance ) {
+ shortestDistance = currentDistance;
+ shortestNode = currentNode;
+ shortestIndex = i;
+ }
+ }
+ //switch position index & shortestIndex
+ if( index != shortestIndex ) {
+ nodes.set(shortestIndex, nodes.get(index));
+ nodes.set(index, shortestNode);
+ }
+ writer = shortestNode;
+ }
+ }
+ return nodes.toArray( results );
+ }
+
+ /** Return datanodes that sorted by their distances to reader
+ */
+ DatanodeDescriptor[] sortByDistance(
+ final DatanodeDescriptor reader,
+ List nodes ) {
+ synchronized(clusterMap) {
+ if(reader != null && clusterMap.contains(reader)) {
+ java.util.Collections.sort(nodes, new Comparator() {
+ public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
+ return clusterMap.getDistance(reader, n1)
+ -clusterMap.getDistance(reader, n2);
+ }
+ });
+ }
+ }
+ return (DatanodeDescriptor[])nodes.toArray(
+ new DatanodeDescriptor[nodes.size()]);
+ }
+
+ } //end of Replicator
+
+
/**
* Information about the file while it is being written to.
* Note that at that time the file is not visible to the outside.
@@ -2664,6 +3067,18 @@
}
return null;
}
+
+ /* Find data node by its host name. */
+ private DatanodeDescriptor getDatanodeByHost( String name ) {
+ for (Iterator it = datanodeMap.values().iterator();
+ it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+ if( node.getHost().equals(name) )
+ return node;
+ }
+ return null;
+ }
+
/** Stop at and return the datanode at index (used for content browsing)*/
private DatanodeInfo getDatanodeByIndex( int index ) {
int i = 0;
@@ -2802,6 +3217,9 @@
"STATE* SafeModeInfo.leave: " + "Safe mode is OFF." );
reached = -1;
safeMode = null;
+ NameNode.stateChangeLog.info("STATE* Network topology has "
+ +clusterMap.getNumOfRacks()+" racks and "
+ +clusterMap.getNumOfLeaves()+ " datanodes");
}
/**
Index: src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (revision 502789)
+++ src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (working copy)
@@ -19,6 +19,9 @@
import java.util.*;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+
/**************************************************
* DatanodeDescriptor tracks stats on a given DataNode,
* such as available storage capacity, last update time, etc.,
@@ -32,25 +35,42 @@
* @author Mike Cafarella
* @author Konstantin Shvachko
**************************************************/
-class DatanodeDescriptor extends DatanodeInfo {
+public class DatanodeDescriptor extends DatanodeInfo {
private volatile Collection blocks = new TreeSet();
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
protected boolean isAlive = false;
- DatanodeDescriptor() {
+ /** Default constructor */
+ public DatanodeDescriptor() {
super();
}
- DatanodeDescriptor( DatanodeID nodeID ) {
+ /** DatanodeDescriptor constructor
+ * @param nodeID id of the data node
+ */
+ public DatanodeDescriptor( DatanodeID nodeID ) {
this( nodeID, 0L, 0L, 0 );
}
+
+ /** DatanodeDescriptor constructor
+ *
+ * @param nodeID id of the data node
+ * @param networkLocation location of the data node in network
+ */
+ public DatanodeDescriptor( DatanodeID nodeID, String networkLocation ) {
+ this( nodeID, networkLocation, 0L, 0L, 0 );
+ }
- /**
- * Create DatanodeDescriptor.
+ /** DatanodeDescriptor constructor
+ *
+ * @param nodeID id of the data node
+ * @param capacity capacity of the data node
+ * @param remaining remaing capacity of the data node
+ * @param xceiverCount # of data transfers at the data node
*/
- DatanodeDescriptor( DatanodeID nodeID,
+ public DatanodeDescriptor( DatanodeID nodeID,
long capacity,
long remaining,
int xceiverCount ) {
@@ -58,6 +78,23 @@
updateHeartbeat(capacity, remaining, xceiverCount);
}
+ /** DatanodeDescriptor constructor
+ *
+ * @param nodeID id of the data node
+ * @param networkLocation location of the data node in network
+ * @param capacity capacity of the data node
+ * @param remaining remaing capacity of the data node
+ * @param xceiverCount # of data transfers at the data node
+ */
+ public DatanodeDescriptor( DatanodeID nodeID,
+ String networkLocation,
+ long capacity,
+ long remaining,
+ int xceiverCount ) {
+ super( nodeID, networkLocation );
+ updateHeartbeat( capacity, remaining, xceiverCount);
+ }
+
/**
*/
void updateBlocks(Block newBlocks[]) {