Index: conf/hadoop-default.xml =================================================================== --- conf/hadoop-default.xml (revision 502789) +++ conf/hadoop-default.xml (working copy) @@ -346,6 +346,14 @@ + + dfs.network.script + + + Specifies a script name that print the network location path + of the current machine. + + Index: src/test/org/apache/hadoop/net/TestNetworkTopology.java =================================================================== --- src/test/org/apache/hadoop/net/TestNetworkTopology.java (revision 0) +++ src/test/org/apache/hadoop/net/TestNetworkTopology.java (revision 0) @@ -0,0 +1,78 @@ +package org.apache.hadoop.net; + +import java.util.HashSet; +import org.apache.hadoop.dfs.DatanodeDescriptor; +import org.apache.hadoop.dfs.DatanodeID; +import junit.framework.TestCase; + +public class TestNetworkTopology extends TestCase { + private NetworkTopology cluster = new NetworkTopology(); + private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] { + new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"), + new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"), + new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"), + new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"), + new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d1/r2"), + new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3"), + new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r3") + }; + private final static DatanodeDescriptor NODE = + new DatanodeDescriptor(new DatanodeID("h8:5020", "0", -1), "/d2/r4"); + + public TestNetworkTopology() { + for(int i=0; i set1 = + new HashSet(leaves.length); + HashSet set2 = + new HashSet(dataNodes.length); + for(int i=0; inumOfReplicas is 2, the 1st is on + * dataNodes[0] and the 2nd is on a different rack. 
+ * @throws Exception + */ + public void testChooseTarget1() throws Exception { + DatanodeDescriptor[] targets; + targets = replicator.chooseTarget( + 0, dataNodes[0], null, BLOCK_SIZE); + assertEquals(0, targets.length); + + targets = replicator.chooseTarget( + 1, dataNodes[0], null, BLOCK_SIZE); + assertEquals(1, targets.length); + assertEquals(dataNodes[0], targets[0]); + + targets = replicator.chooseTarget( + 2, dataNodes[0], null, BLOCK_SIZE); + assertEquals(2, targets.length); + assertEquals(dataNodes[0], targets[0]); + assertFalse(cluster.isOnSameRack(targets[0], targets[1])); + + targets = replicator.chooseTarget( + 3, dataNodes[0], null, BLOCK_SIZE); + assertEquals(3, targets.length); + assertEquals(dataNodes[0], targets[0]); + assertTrue(cluster.isOnSameRack(targets[0], targets[1])); + assertFalse(cluster.isOnSameRack(targets[0], targets[2])); + + targets = replicator.chooseTarget( + 4, dataNodes[0], null, BLOCK_SIZE); + assertEquals(4, targets.length); + assertEquals(dataNodes[0], targets[0]); + assertTrue(cluster.isOnSameRack(targets[0], targets[1])); + assertFalse(cluster.isOnSameRack(targets[0], targets[2])); + assertFalse(cluster.isOnSameRack(targets[0], targets[3])); + } + + /** + * In this testcase, client is dataNodes[0], but the dataNodes[1] is + * not allowed to be chosen. So the 1st replica should be + * placed on dataNodes[0], the 2nd replica should be placed on a different + * rack, the 3rd should be on the same rack as the 2nd replica, and the rest + * should be placed on a third rack. 
+ * @throws Exception + */ + public void testChooseTarget2() throws Exception { + List excludedNodes; + DatanodeDescriptor[] targets; + + excludedNodes = new ArrayList(); + excludedNodes.add(dataNodes[1]); + targets = replicator.chooseTarget( + 0, dataNodes[0], excludedNodes, BLOCK_SIZE); + assertEquals(0, targets.length); + + excludedNodes = new ArrayList(); + excludedNodes.add(dataNodes[1]); + targets = replicator.chooseTarget( + 1, dataNodes[0], excludedNodes, BLOCK_SIZE); + assertEquals(1, targets.length); + assertEquals(dataNodes[0], targets[0]); + + excludedNodes = new ArrayList(); + excludedNodes.add(dataNodes[1]); + targets = replicator.chooseTarget( + 2, dataNodes[0], excludedNodes, BLOCK_SIZE); + assertEquals(2, targets.length); + assertEquals(dataNodes[0], targets[0]); + assertFalse(cluster.isOnSameRack(targets[0], targets[1])); + + excludedNodes = new ArrayList(); + excludedNodes.add(dataNodes[1]); + targets = replicator.chooseTarget( + 3, dataNodes[0], excludedNodes, BLOCK_SIZE); + assertEquals(3, targets.length); + assertEquals(dataNodes[0], targets[0]); + assertFalse(cluster.isOnSameRack(targets[0], targets[1])); + assertTrue(cluster.isOnSameRack(targets[1], targets[2])); + + excludedNodes = new ArrayList(); + excludedNodes.add(dataNodes[1]); + targets = replicator.chooseTarget( + 4, dataNodes[0], excludedNodes, BLOCK_SIZE); + assertEquals(4, targets.length); + assertEquals(dataNodes[0], targets[0]); + for(int i=1; i<4; i++) { + assertFalse(cluster.isOnSameRack(targets[0], targets[i])); + } + assertTrue(cluster.isOnSameRack(targets[1], targets[2]) || + cluster.isOnSameRack(targets[2], targets[3])); + assertFalse(cluster.isOnSameRack(targets[1], targets[3])); + } + + /** + * In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified + * to be chosen. 
So the 1st replica should be placed on dataNodes[1], + * the 2nd replica should be placed on a different rack, + * the 3rd replica should be placed on the same rack as the 2nd replica, + * and the rest should be placed on the third rack. + * @throws Exception + */ + public void testChooseTarget3() throws Exception { + // make data node 0 to be not qualified to choose + dataNodes[0].updateHeartbeat( + 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, + FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded + + DatanodeDescriptor[] targets; + targets = replicator.chooseTarget( + 0, dataNodes[0], null, BLOCK_SIZE); + assertEquals(0, targets.length); + + targets = replicator.chooseTarget( + 1, dataNodes[0], null, BLOCK_SIZE); + assertEquals(1, targets.length); + assertEquals(dataNodes[1], targets[0]); + + targets = replicator.chooseTarget( + 2, dataNodes[0], null, BLOCK_SIZE); + assertEquals(2, targets.length); + assertEquals(dataNodes[1], targets[0]); + assertFalse(cluster.isOnSameRack(targets[0], targets[1])); + + targets = replicator.chooseTarget( + 3, dataNodes[0], null, BLOCK_SIZE); + assertEquals(3, targets.length); + assertEquals(dataNodes[1], targets[0]); + assertTrue(cluster.isOnSameRack(targets[1], targets[2])); + assertFalse(cluster.isOnSameRack(targets[0], targets[1])); + + targets = replicator.chooseTarget( + 4, dataNodes[0], null, BLOCK_SIZE); + assertEquals(4, targets.length); + assertEquals(dataNodes[1], targets[0]); + for(int i=1; i<4; i++) { + assertFalse(cluster.isOnSameRack(targets[0], targets[i])); + } + assertTrue(cluster.isOnSameRack(targets[1], targets[2]) || + cluster.isOnSameRack(targets[2], targets[3])); + assertFalse(cluster.isOnSameRack(targets[1], targets[3])); + + dataNodes[0].updateHeartbeat( + 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, + FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0); + } + + /** + * In this testcase, client is dataNodes[0], but none of the nodes on rack 1 + * is qualified to be chosen. 
So the 1st replica should be placed on either + * rack 2 or rack 3. + * the 2nd replica should be placed on a different rack, + * the 3rd replica should be placed on a different rack than the 1st replica, + * @throws Exception + */ + public void testChoooseTarget4() throws Exception { + // make data node 0 & 1 to be not qualified to choose: not enough disk space + for(int i=0; i<2; i++) { + dataNodes[i].updateHeartbeat( + 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, + (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); + } + + DatanodeDescriptor[] targets; + targets = replicator.chooseTarget( + 0, dataNodes[0], null, BLOCK_SIZE); + assertEquals(0, targets.length); + + targets = replicator.chooseTarget( + 1, dataNodes[0], null, BLOCK_SIZE); + assertEquals(1, targets.length); + assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); + + targets = replicator.chooseTarget( + 2, dataNodes[0], null, BLOCK_SIZE); + assertEquals(2, targets.length); + assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); + assertFalse(cluster.isOnSameRack(targets[0], targets[1])); + + targets = replicator.chooseTarget( + 3, dataNodes[0], null, BLOCK_SIZE); + assertEquals(3, targets.length); + for(int i=0; i<3; i++) { + assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0])); + } + assertTrue(cluster.isOnSameRack(targets[0], targets[1]) || + cluster.isOnSameRack(targets[1], targets[2])); + assertFalse(cluster.isOnSameRack(targets[0], targets[2])); + + for(int i=0; i<2; i++) { + dataNodes[i].updateHeartbeat( + 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, + FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0); + } + } + /** + * In this testcase, client is a node outside of file system. + * So the 1st replica can be placed on any node. 
+ * the 2nd replica should be placed on a different rack, + * the 3rd replica should be placed on a different rack than the 1st replica, + * @throws Exception + */ + public void testChooseTarget5() throws Exception { + DatanodeDescriptor[] targets; + targets = replicator.chooseTarget( + 0, NODE, null, BLOCK_SIZE); + assertEquals(0, targets.length); + + targets = replicator.chooseTarget( + 1, NODE, null, BLOCK_SIZE); + assertEquals(1, targets.length); + + targets = replicator.chooseTarget( + 2, NODE, null, BLOCK_SIZE); + assertEquals(2, targets.length); + assertFalse(cluster.isOnSameRack(targets[0], targets[1])); + + targets = replicator.chooseTarget( + 3, NODE, null, BLOCK_SIZE); + assertEquals(3, targets.length); + assertTrue(cluster.isOnSameRack(targets[0], targets[1])); + assertFalse(cluster.isOnSameRack(targets[0], targets[2])); + } + + /** + * This testcase tests re-replication, when dataNodes[0] is already chosen. + * So the 1st replica can be placed on rack 1. + * the 2nd replica should be placed on a different rack, + * the 3rd replica can be placed randomly, + * @throws Exception + */ + public void testRereplicate1() throws Exception { + List choosenNodes = new ArrayList(); + choosenNodes.add(dataNodes[0]); + DatanodeDescriptor[] targets; + + targets = replicator.chooseTarget( + 0, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(0, targets.length); + + targets = replicator.chooseTarget( + 1, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(1, targets.length); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); + + targets = replicator.chooseTarget( + 2, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(2, targets.length); + assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1])); + + targets = replicator.chooseTarget( + 3, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(3, targets.length); + 
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1])); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[2])); + } + + /** + * This testcase tests re-replication, + * when dataNodes[0] and dataNodes[1] are already chosen. + * So the 1st replica should be placed on a different rack than rack 1. + * the remaining replicas can be placed randomly, + * @throws Exception + */ + public void testRereplicate2() throws Exception { + List choosenNodes = new ArrayList(); + choosenNodes.add(dataNodes[0]); + choosenNodes.add(dataNodes[1]); + + DatanodeDescriptor[] targets; + targets = replicator.chooseTarget( + 0, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(0, targets.length); + + targets = replicator.chooseTarget( + 1, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(1, targets.length); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); + + targets = replicator.chooseTarget( + 2, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(2, targets.length); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1])); + } + + /** + * This testcase tests re-replication, + * when dataNodes[0] and dataNodes[2] are already chosen. + * So the 1st replica should be placed on rack 1. 
+ * the remaining replicas can be placed randomly, + * @throws Exception + */ + public void testRereplicate3() throws Exception { + List choosenNodes = new ArrayList(); + choosenNodes.add(dataNodes[0]); + choosenNodes.add(dataNodes[2]); + + DatanodeDescriptor[] targets; + targets = replicator.chooseTarget( + 0, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(0, targets.length); + + targets = replicator.chooseTarget( + 1, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(1, targets.length); + assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); + + targets = replicator.chooseTarget( + 2, dataNodes[0], choosenNodes, null, BLOCK_SIZE); + assertEquals(2, targets.length); + assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); + assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1])); + } +} Index: src/test/org/apache/hadoop/dfs/MiniDFSCluster.java =================================================================== --- src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (revision 502789) +++ src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (working copy) @@ -20,6 +20,7 @@ import java.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; +import org.apache.hadoop.net.NetworkTopology; /** * This class creates a single-process DFS cluster for junit testing. @@ -95,9 +96,14 @@ this.conf.set("dfs.data.dir", new File(dataDir, "data"+(2*index+1)).getPath()+","+ new File(dataDir, "data"+(2*index+2)).getPath()); + } + public DataNodeRunner(Configuration conf, File dataDir, + String networkLoc, int index) { + this(conf, dataDir, index); + this.conf.set("dfs.datanode.rack", networkLoc); } - + /** * Create and run the data node. 
*/ @@ -115,7 +121,8 @@ } } } - node = new DataNode(conf, dirs); + node = new DataNode(conf, conf.get("dfs.datanode.rack", + NetworkTopology.DEFAULT_RACK), dirs); node.run(); } catch (Throwable e) { node = null; @@ -144,7 +151,7 @@ public MiniDFSCluster(int namenodePort, Configuration conf, boolean dataNodeFirst) throws IOException { - this(namenodePort, conf, 1, dataNodeFirst, true); + this(namenodePort, conf, 1, dataNodeFirst, true, null); } /** @@ -159,7 +166,7 @@ Configuration conf, int nDatanodes, boolean dataNodeFirst) throws IOException { - this(namenodePort, conf, nDatanodes, dataNodeFirst, true); + this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null); } /** @@ -175,7 +182,26 @@ Configuration conf, int nDatanodes, boolean dataNodeFirst, - boolean formatNamenode) throws IOException { + boolean formatNamenode ) throws IOException { + this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null); + } + + /** + * Create the config and start up the servers. If either the rpc or info port is already + * in use, we will try new ports. + * @param namenodePort suggestion for which rpc port to use. caller should use + * getNameNodePort() to get the actual port used. + * @param nDatanodes Number of datanodes + * @param dataNodeFirst should the datanode be brought up before the namenode? + * @param formatNamenode should the namenode be formatted before starting up ? 
+ * @param racks array of strings indicating racks that each datanode is on + */ + public MiniDFSCluster(int namenodePort, + Configuration conf, + int nDatanodes, + boolean dataNodeFirst, + boolean formatNamenode, + String[] racks) throws IOException { this.conf = conf; @@ -208,7 +234,11 @@ dataNodes = new DataNodeRunner[nDatanodes]; dataNodeThreads = new Thread[nDatanodes]; for (int idx = 0; idx < nDatanodes; idx++) { - dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx); + if( racks == null || idx >= racks.length) { + dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx); + } else { + dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx); + } dataNodeThreads[idx] = new Thread(dataNodes[idx]); } if (dataNodeFirst) { Index: src/java/org/apache/hadoop/net/NodeBase.java =================================================================== --- src/java/org/apache/hadoop/net/NodeBase.java (revision 0) +++ src/java/org/apache/hadoop/net/NodeBase.java (revision 0) @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.net; + +/** A base class that implements interface Node + * + * @author hairong + * + */ + +public class NodeBase implements Node { + public final static char PATH_SEPARATOR = '/'; + public final static String PATH_SEPARATOR_STR = "/"; + public final static String ROOT = ""; // string representation of root + + protected String name; //host:port# + protected String location; //string representation of this node's location + + /** Default constructor */ + public NodeBase( ) { + } + + /** Construct a node from its path + * @param path + * a concatenation of this node's location, the path separator, and its name + */ + public NodeBase( String path ) { + path = normalize(path); + int index = path.lastIndexOf( PATH_SEPARATOR ); + if( index== -1 ) { + set( ROOT, path ); + } else { + set( path.substring(index+1), path.substring(0, index) ); + } + } + + /** Construct a node from its name and its location + * @param name this node's name + * @param location this node's location + */ + public NodeBase( String name, String location ) { + set(name, normalize(location)); + } + + /* set this node's name and location; a null name is stored as "" and a name containing the separator is rejected */ + private void set( String name, String location ) { + if(name != null && name.contains(PATH_SEPARATOR_STR)) + throw new IllegalArgumentException( + "Network location name contains /: "+name); + this.name = (name==null)?"":name; + this.location = location; + } + + /** Return this node's name */ + public String getName() { return name; } + + /** Return this node's network location */ + public String getNetworkLocation() { return location; } + + /** Return this node's path: its location, the path separator, and its name */ + public String getPath() { + return location+PATH_SEPARATOR_STR+name; + } + + /** Return this node's string representation */ + public String toString() { + return getPath(); + } + + /** Normalize a path: null or empty yields ROOT, one trailing separator is dropped, and a path not starting with the separator is rejected */ + static public String normalize(String path) { + if( path == null || path.length() == 0 ) return ROOT; + + if( path.charAt(0) != PATH_SEPARATOR ) { + throw new 
IllegalArgumentException( + "Network Location path does not start with " + +PATH_SEPARATOR_STR+ ": "+path); + } + + int len = path.length(); + if(path.charAt(len-1) == PATH_SEPARATOR) { + return path.substring(0, len-1); + } + return path; + } +} Index: src/java/org/apache/hadoop/net/Node.java =================================================================== --- src/java/org/apache/hadoop/net/Node.java (revision 0) +++ src/java/org/apache/hadoop/net/Node.java (revision 0) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +/** The interface defines a node in a network topology. + * A node may be a leaf representing a data node or an inner + * node representing a datacenter or rack. + * Each node has a name and its location in the network is + * decided by a string with syntax similar to a file name. 
+ * For example, a data node's name is hostname:port# and if it's located at + * rack "orange" in datacenter "dog", the string representation of its + * network location is /dog/orange + * @author hairong + * + */ + +public interface Node { + /** Return the string representation of this node's network location */ + public String getNetworkLocation(); + /** Return this node's name */ + public String getName(); +} Index: src/java/org/apache/hadoop/net/NetworkTopology.java =================================================================== --- src/java/org/apache/hadoop/net/NetworkTopology.java (revision 0) +++ src/java/org/apache/hadoop/net/NetworkTopology.java (revision 0) @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.dfs.DatanodeDescriptor; + +/** The class represents a cluster of computers with a tree hierarchical + * network topology. + * For example, a cluster may consist of many data centers filled + * with racks of computers. 
+ * In a network topology, leaves represent data nodes (computers) and inner + * nodes represent switches/routers that manage traffic in/out of data centers + * or racks. + * + * @author hairong + * + */ +public class NetworkTopology { + public final static String DEFAULT_RACK = "/default-rack"; + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.net.NetworkTopology"); + + /* Inner Node represents a switch/router of a data center or rack. + * Different from a leaf node, it has non-null children. + */ + private class InnerNode extends NodeBase { + private HashMap children = + new HashMap(); // maps a name to a node + + /** Construct an InnerNode from a path-like string */ + InnerNode( String path ) { + super( path ); + } + + /** Construct an InnerNode from its name and its network location */ + InnerNode( String name, String location ) { + super( name, location ); + } + + /** Get its children */ + HashMap getChildren() {return children;} + + /** Return the number of children this node has */ + int getNumOfChildren() { + return children.size(); + } + + /** Judge if this node represents a rack + * Return true if it has no child or its children are not InnerNodes + */ + boolean isRack() { + if(children.isEmpty()) { + return true; + } + + Node firstChild = children.values().iterator().next(); + if(firstChild instanceof InnerNode) { + return false; + } + + return true; + } + + /** Judge if this node is an ancestor of node n + * n's location must equal this node's path or continue it at a path separator, so /d1/r1 is not an ancestor of a node located at /d1/r10 + * @param n: a node + * @return true if this node is an ancestor of n + */ + boolean isAncestor(Node n) { + return n.getNetworkLocation().startsWith(getPath()) && (getPath().endsWith(PATH_SEPARATOR_STR) || n.getNetworkLocation().length() == getPath().length() || n.getNetworkLocation().charAt(getPath().length()) == PATH_SEPARATOR); + } + + /** Judge if this node is the parent of node n + * + * @param n: a node + * @return true if this node is the parent of n + */ + boolean isParent( Node n ) { + return n.getNetworkLocation().equals( getPath() ); + } + + /* Return a child name of this node who is an ancestor of node n */ + private String getNextAncestorName( Node n ) { + if( !isAncestor(n)) { + 
throw new IllegalArgumentException( + this + " is not an ancestor of " + n); + } + String name = n.getNetworkLocation().substring(getPath().length()); + if(name.charAt(0) == PATH_SEPARATOR) { + name = name.substring(1); + } + int index=name.indexOf(PATH_SEPARATOR); + if( index !=-1 ) + name = name.substring(0, index); + return name; + } + + /** Add node n to the subtree of this node + * @param n node to be added + * @return true if no child with the same name was already present at n's parent; false otherwise + */ + boolean add( Node n ) { + String parent = n.getNetworkLocation(); + String currentPath = getPath(); + if( !isAncestor( n ) ) + throw new IllegalArgumentException( n.getName()+", which is located at " + +parent+", is not a decendent of "+currentPath); + if( isParent( n ) ) { + // this node is the parent of n; add n directly + return (null == children.put(n.getName(), n) ); + } else { + // find the next ancestor node + String parentName = getNextAncestorName( n ); + InnerNode parentNode = (InnerNode)children.get(parentName); + if( parentNode == null ) { + // create a new InnerNode + parentNode = new InnerNode( parentName, currentPath ); + children.put(parentName, parentNode); + } + // add n to the subtree of the next ancestor node + return parentNode.add(n); + } + } + + /** Remove node n from the subtree of this node + * @param n node to be deleted + * @return true if the node is deleted; false otherwise + */ + boolean remove( Node n ) { + String parent = n.getNetworkLocation(); + String currentPath = getPath(); + if(!isAncestor(n)) + throw new IllegalArgumentException( n.getName()+", which is located at " + +parent+", is not a decendent of "+currentPath); + if( isParent(n) ) { + // this node is the parent of n; remove n directly + return (n == children.remove(n.getName())); + } else { + // find the next ancestor node: the parent node + String parentName = getNextAncestorName( n ); + InnerNode parentNode = (InnerNode)children.get(parentName); + if(parentNode==null) { + throw new 
IllegalArgumentException( n.getName() + + ", which is located at " + + parent+", is not a decendent of " + currentPath); + } + // remove n from the parent node + boolean isRemoved = parentNode.remove( n ); + // if the parent node has no children, remove the parent node too + if(parentNode.getNumOfChildren() == 0 ) { + children.remove(parentName); + } + return isRemoved; + } + } // end of remove + + /** Given a node's string representation relative to this node (null or empty means this node itself), return a reference to the node, or null if there is none */ + Node getLoc( String loc ) { + if( loc == null || loc.length() == 0 ) return this; + String[] path = loc.split(PATH_SEPARATOR_STR, 2); + Node childnode = children.get( path[0] ); + if(childnode == null ) return null; // non-existing node + if( path.length == 1 ) return childnode; + if( childnode instanceof InnerNode ) { + return ((InnerNode)childnode).getLoc(path[1]); + } else { + return null; + } + } + + /** Get all the data nodes belonging to the subtree of this node */ + void getLeaves( Collection results ) { + for( Iterator iter = children.values().iterator(); + iter.hasNext(); ) { + Node childNode = iter.next(); + if( childNode instanceof InnerNode ) { + ((InnerNode)childNode).getLeaves(results); + } else { + results.add( (DatanodeDescriptor)childNode ); + } + } + } + } // end of InnerNode + + InnerNode clusterMap = new InnerNode( InnerNode.ROOT ); //the root of the tree + private int numOfLeaves = 0; // data nodes counter + private int numOfRacks = 0; // rack counter + + public NetworkTopology() { + } + + /** Add a data node + * Update data node counter & rack counter if necessary + * @param node + * data node to be added + * @exception IllegalArgumentException if add a data node to an existing leaf + */ + public synchronized void add( DatanodeDescriptor node ) { + if( node==null ) return; + LOG.info("Adding a new node: "+node.getPath()); + Node rack = getNode(node.getNetworkLocation()); + if(rack != null && !(rack instanceof InnerNode) ) { + throw new IllegalArgumentException( 
"Unexpected data node " + + node.toString() + + " at an illegal network location"); + } + if( clusterMap.add( node) ) { + numOfLeaves++; + if( rack == null ) { + numOfRacks++; + } + } + LOG.debug("NetworkTopology became:\n" + this.toString()); + } + + /** Remove a data node + * Update data node counter & rack counter if necessary + * @param node + * data node to be removed + */ + public synchronized void remove( DatanodeDescriptor node ) { + if( node==null ) return; + LOG.info("Removing a node: "+node.getPath()); + if( clusterMap.remove( node ) ) { + numOfLeaves--; + InnerNode rack = (InnerNode)getNode(node.getNetworkLocation()); + if(rack == null) { + numOfRacks--; + } + } + LOG.debug("NetworkTopology became:\n" + this.toString()); + } + + /** Check if the tree contains data node node + * + * @param node + * a data node + * @return true if node is already in the tree; false otherwise + */ + public boolean contains( DatanodeDescriptor node ) { + if( node == null ) return false; + Node rNode = getNode(node.getPath()); + return (rNode == node); + } + + /** Given a string representation of a node, return the reference to the node + * + * @param loc + * a path-like string representation of a node + * @return a reference to the node; null if the node is not in the tree + */ + public synchronized Node getNode( String loc ) { + loc = NodeBase.normalize(loc); + if(!NodeBase.ROOT.equals(loc)) + loc = loc.substring(1); + return clusterMap.getLoc( loc ); + } + + /* Add all the data nodes that belong to + * the subtree of the node loc to results; a loc that is not in the tree adds nothing */ + private synchronized void getLeaves( String loc, + Collection results ) { + Node node = getNode(loc); + if( node instanceof InnerNode ) + ((InnerNode)node).getLeaves(results); + else if( node != null ) { + results.add((DatanodeDescriptor)node); + } + } + + /** Return all the data nodes that belong to the subtree of loc + * @param loc + * a path-like string representation of a node + * @return an array of data nodes that belong to the subtree of loc 
+ */ + public synchronized DatanodeDescriptor[] getLeaves( String loc ) { + Collection results = new ArrayList(); + getLeaves(loc, results); + return results.toArray(new DatanodeDescriptor[results.size()]); + } + + /** Return all the data nodes that belong to the subtrees of locs + * @param locs + * a collection of strings representing nodes + * @return an array of data nodes that belong to subtrees of locs; empty if locs is null + */ + public synchronized DatanodeDescriptor[] getLeaves( + Collection locs ) { + Collection nodes = new ArrayList(); + if( locs != null ) { + Iterator iter = locs.iterator(); + while(iter.hasNext()) { + getLeaves( iter.next(), nodes ); + } + } + return nodes.toArray(new DatanodeDescriptor[nodes.size()]); + } + + /** Return the total number of racks */ + public int getNumOfRacks( ) { + return numOfRacks; + } + + /** Return the total number of data nodes */ + public int getNumOfLeaves() { + return numOfLeaves; + } + /* Throw IllegalArgumentException if node is null or is not contained in this topology */ + private void checkArgument( DatanodeDescriptor node ) { + if( node == null ) { + throw new IllegalArgumentException( + "Unexpected null pointer argument" ); + } + if( !contains(node) ) { + String path = node.getPath(); + LOG.warn("The cluster does not contain data node: " + path); + throw new IllegalArgumentException( + "Unexpected non-existing data node: " +path); + } + } + + /** Return the distance between two data nodes. + * It is assumed that the distance from one node to its parent is 1. + * The distance between two nodes is calculated by summing up their distances + * to their closest common ancestor. 
+ * @param node1 one data node + * @param node2 another data node + * @return the distance between node1 and node2 + * @exception IllegalArgumentException when either node1 or node2 is null, or + * node1 or node2 do not belong to the cluster + */ + public int getDistance(DatanodeDescriptor node1, DatanodeDescriptor node2 ) { + checkArgument( node1 ); + checkArgument( node2 ); + /* + if( !contains(node1) || !contains(node2) ) { + return Integer.MAX_VALUE; + } + */ + if( node1 == node2 || node1.equals(node2)) { + return 0; + } + String[] path1 = node1.getNetworkLocation().split("/"); + String[] path2 = node2.getNetworkLocation().split("/"); + + int i; + for(i=0; inodes are already in the priority order */ private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException { - if ((nodes == null) || - (nodes.length - deadNodes.size() < 1)) { - throw new IOException("No live nodes contain current block"); - } - DatanodeInfo chosenNode = null; + if (nodes != null) { for (int i = 0; i < nodes.length; i++) { - if (deadNodes.contains(nodes[i])) { - continue; - } - String nodename = nodes[i].getHost(); - if (localName.equals(nodename)) { - chosenNode = nodes[i]; - break; - } + if (!deadNodes.contains(nodes[i])) { + return nodes[i]; + } } - if (chosenNode == null) { - do { - chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length]; - } while (deadNodes.contains(chosenNode)); - } - return chosenNode; + } + throw new IOException("No live nodes contain current block"); } /*************************************************************** Index: src/java/org/apache/hadoop/dfs/DataNode.java =================================================================== --- src/java/org/apache/hadoop/dfs/DataNode.java (revision 502789) +++ src/java/org/apache/hadoop/dfs/DataNode.java (working copy) @@ -24,10 +24,12 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.metrics.Metrics; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.net.NodeBase; import 
org.apache.hadoop.util.*; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.mapred.StatusHttpServer; +import org.apache.hadoop.net.NetworkTopology; import java.io.*; import java.net.*; @@ -105,6 +107,7 @@ DatanodeProtocol namenode; FSDataset data; DatanodeRegistration dnRegistration; + private String networkLoc; boolean shouldRun = true; Vector receivedBlockList = new Vector(); int xmitsInProgress = 0; @@ -168,9 +171,15 @@ * 'dataDirs' is where the blocks are stored. */ DataNode(Configuration conf, String[] dataDirs) throws IOException { - this(InetAddress.getLocalHost().getHostName(), + this(conf, NetworkTopology.DEFAULT_RACK, dataDirs ); + } + + DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException { + this(InetAddress.getLocalHost().getHostName(), + networkLoc, dataDirs, createSocketAddr(conf.get("fs.default.name", "local")), conf); + // register datanode int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075); String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0"); this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true); @@ -197,7 +206,8 @@ * * @see DataStorage */ - private DataNode(String machineName, + private DataNode(String machineName, + String networkLoc, String[] dataDirs, InetSocketAddress nameNodeAddr, Configuration conf ) throws IOException { @@ -247,6 +257,7 @@ storage.getStorageID(), -1, "" ); + this.networkLoc = networkLoc; // initialize data node internal structure this.data = new FSDataset(volumes, conf); this.dataXceiveServer = new Daemon(new DataXceiveServer(ss)); @@ -292,7 +303,7 @@ private void register() throws IOException { while( true ) { try { - dnRegistration = namenode.register( dnRegistration ); + dnRegistration = namenode.register( dnRegistration, networkLoc ); break; } catch( SocketTimeoutException e ) { // namenode is 
busy LOG.info("Problem connecting to server: " + getNameNodeAddr()); @@ -1045,9 +1056,9 @@ /** Start datanode daemon. */ - public static void run(Configuration conf) throws IOException { + public static void run(Configuration conf, String networkLoc) throws IOException { String[] dataDirs = conf.getStrings("dfs.data.dir"); - DataNode dn = makeInstance(dataDirs, conf); + DataNode dn = makeInstance(networkLoc, dataDirs, conf); dataNodeList.add(dn); if (dn != null) { Thread t = new Thread(dn, "DataNode: [" + @@ -1075,8 +1086,9 @@ /** Start a single datanode daemon and wait for it to finish. * If this thread is specifically interrupted, it will stop waiting. */ - private static void runAndWait(Configuration conf) throws IOException { - run(conf); + private static void runAndWait(Configuration conf, String networkLoc) + throws IOException { + run(conf, networkLoc); if (dataNodeThreadList.size() > 0) { Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1); try { @@ -1101,8 +1113,13 @@ * no directory from this directory list can be created. * @throws IOException */ - static DataNode makeInstance(String[] dataDirs, Configuration conf) + static DataNode makeInstance( String[] dataDirs, Configuration conf) throws IOException { + return makeInstance(NetworkTopology.DEFAULT_RACK, dataDirs, conf ); + } + + static DataNode makeInstance(String networkLoc, String[] dataDirs, Configuration conf) + throws IOException { ArrayList dirs = new ArrayList(); for (int i = 0; i < dataDirs.length; i++) { File data = new File(dataDirs[i]); @@ -1113,7 +1130,7 @@ LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() ); } } - return ((dirs.size() > 0) ? new DataNode(conf, dirs.toArray(new String[dirs.size()])) : null); + return ((dirs.size() > 0) ? 
new DataNode(conf, networkLoc, dirs.toArray(new String[dirs.size()])) : null); } public String toString() { @@ -1124,13 +1141,118 @@ ", xmitsInProgress=" + xmitsInProgress + "}"; } + + /* Get the network location by running the script configured as dfs.network.script; returns null when no script is configured and throws IOException when the script exits with a nonzero status */ + private static String getNetworkLoc( Configuration conf ) + throws IOException { + String locScript = conf.get("dfs.network.script" ); + if( locScript == null ) return null; + LOG.info( "Starting to run script to get datanode network location"); + Process p = Runtime.getRuntime().exec( locScript ); + StringBuffer networkLoc = new StringBuffer(); + final BufferedReader inR = new BufferedReader( + new InputStreamReader(p.getInputStream() ) ); + final BufferedReader errR = new BufferedReader( + new InputStreamReader( p.getErrorStream() ) ); + + // read & log any error messages from the running script on a separate thread, so a full stderr pipe cannot stall the stdout reader below + Thread errThread = new Thread() { + public void run() { + try { + String errLine = errR.readLine(); + while(errLine != null) { + LOG.warn("Network script error: "+errLine); + errLine = errR.readLine(); + } + } catch( IOException e) { + LOG.warn("Failed to read network script error output: "+StringUtils.stringifyException(e)); + } + } + }; + try { + errThread.start(); + + // fetch output from the process + String line = inR.readLine(); + while( line != null ) { + networkLoc.append( line ); + line = inR.readLine(); + } + try { + // wait for the process to finish + int returnVal = p.waitFor(); + // check the exit code + if( returnVal != 0 ) { + throw new IOException("Process exits with nonzero status: "+locScript); + } + } catch (InterruptedException e) { + throw new IOException( e.getMessage() ); + } finally { + try { + // make sure that the error thread exits + errThread.join(); + } catch (InterruptedException je) { + LOG.warn( StringUtils.stringifyException(je)); + } + } + } finally { + // close in & error streams + try { + inR.close(); + } catch ( IOException ine ) { + throw ine; + } finally { + errR.close(); + } + } + + return networkLoc.toString(); + } + + + /* Get the network location from the command 
line */ + private static String getNetworkLoc(String args[]) { + for( int i=0; i< args.length; i++ ) { + if ("-r".equals(args[i])||"--rack".equals(args[i]) ) { + if( i==args.length-1 ) { + printUsage(); + } else { + return args[++i]; + } + } + } + return null; + } + + /* Return the datanode's network location + * either from the command line, from script, or a default value + */ + private static String getNetworkLoc(String args[], Configuration conf) + throws IOException { + String networkLoc = getNetworkLoc( args ); + if( networkLoc == null ) { + networkLoc = getNetworkLoc( conf ); + } + if( networkLoc == null ) { + return NetworkTopology.DEFAULT_RACK; + } else { + return NodeBase.normalize( networkLoc ); + } + } + + private static void printUsage() { + System.err.println( + "Usage: java DataNode [-r, --rack ]"); + } + + /** */ public static void main(String args[]) throws IOException { try { Configuration conf = new Configuration(); - runAndWait(conf); + runAndWait(conf, getNetworkLoc(args, conf)); } catch ( Throwable e ) { LOG.error( StringUtils.stringifyException( e ) ); System.exit(-1); Index: src/java/org/apache/hadoop/dfs/NameNode.java =================================================================== --- src/java/org/apache/hadoop/dfs/NameNode.java (revision 502789) +++ src/java/org/apache/hadoop/dfs/NameNode.java (working copy) @@ -539,10 +539,11 @@ //////////////////////////////////////////////////////////////// /** */ - public DatanodeRegistration register( DatanodeRegistration nodeReg + public DatanodeRegistration register( DatanodeRegistration nodeReg, + String networkLocation ) throws IOException { verifyVersion( nodeReg.getVersion() ); - namesystem.registerDatanode( nodeReg ); + namesystem.registerDatanode( nodeReg, networkLocation ); return nodeReg; } Index: src/java/org/apache/hadoop/dfs/DatanodeInfo.java =================================================================== --- src/java/org/apache/hadoop/dfs/DatanodeInfo.java (revision 502789) +++ 
src/java/org/apache/hadoop/dfs/DatanodeInfo.java (working copy) @@ -23,11 +23,15 @@ import java.util.Date; import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableFactory; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; /** * DatanodeInfo represents the status of a DataNode. @@ -37,11 +41,12 @@ * @author Mike Cafarella * @author Konstantin Shvachko */ -public class DatanodeInfo extends DatanodeID { +public class DatanodeInfo extends DatanodeID implements Node { protected long capacity; protected long remaining; protected long lastUpdate; protected int xceiverCount; + private String location = NetworkTopology.DEFAULT_RACK; // administrative states of a datanode public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; } @@ -59,18 +64,24 @@ this.remaining = from.getRemaining(); this.lastUpdate = from.getLastUpdate(); this.xceiverCount = from.getXceiverCount(); + this.location = from.getNetworkLocation(); this.adminState = from.adminState; } DatanodeInfo( DatanodeID nodeID ) { - super( nodeID ); - this.capacity = 0L; - this.remaining = 0L; - this.lastUpdate = 0L; - this.xceiverCount = 0; - this.adminState = null; + super( nodeID ); + this.capacity = 0L; + this.remaining = 0L; + this.lastUpdate = 0L; + this.xceiverCount = 0; + this.adminState = null; } + DatanodeInfo( DatanodeID nodeID, String location ) { + this(nodeID); + this.location = location; + } + /** The raw capacity. 
*/ public long getCapacity() { return capacity; } @@ -103,6 +114,18 @@ this.xceiverCount = xceiverCount; } + /** rack name **/ + public String getNetworkLocation() {return location;} + + /** Sets the rack name */ + void setNetworkLocation(String location) { + this.location = NodeBase.normalize(location); + } + + public String getPath() { + return location+NodeBase.PATH_SEPARATOR_STR+name; + } + /** A formatted string for reporting the status of the DataNode. */ public String getDatanodeReport() { StringBuffer buffer = new StringBuffer(); @@ -110,6 +133,9 @@ long r = getRemaining(); long u = c - r; buffer.append("Name: "+name+"\n"); + if(!NetworkTopology.DEFAULT_RACK.equals(location)) { + buffer.append("Rack: "+location+"\n"); + } if (isDecommissioned()) { buffer.append("State : Decommissioned\n"); } else if (isDecommissionInProgress()) { @@ -209,6 +235,7 @@ out.writeLong(remaining); out.writeLong(lastUpdate); out.writeInt(xceiverCount); + Text.writeString( out, location ); WritableUtils.writeEnum(out, getAdminState()); } @@ -220,6 +247,7 @@ this.remaining = in.readLong(); this.lastUpdate = in.readLong(); this.xceiverCount = in.readInt(); + this.location = Text.readString( in ); AdminStates newState = (AdminStates) WritableUtils.readEnum(in, AdminStates.class); setAdminState(newState); Index: src/java/org/apache/hadoop/dfs/DatanodeProtocol.java =================================================================== --- src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (revision 502789) +++ src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (working copy) @@ -31,8 +31,7 @@ * @author Michael Cafarella **********************************************************************/ interface DatanodeProtocol extends VersionedProtocol { - public static final long versionID = 4L; // BlockCommand.action: - // replace DNA_REPORT by DNA_REGISTER + public static final long versionID = 5L; // register takes a new parameter // error code final static int DISK_ERROR = 1; @@ -52,13 
+51,14 @@ * Register Datanode. * * @see org.apache.hadoop.dfs.DataNode#register() - * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration) + * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration, String) * * @return updated {@link org.apache.hadoop.dfs.DatanodeRegistration}, which contains * new storageID if the datanode did not have one and * registration ID for further communication. */ - public DatanodeRegistration register( DatanodeRegistration registration + public DatanodeRegistration register( DatanodeRegistration registration, + String networkLocation ) throws IOException; /** * sendHeartbeat() tells the NameNode that the DataNode is still Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java =================================================================== --- src/java/org/apache/hadoop/dfs/FSNamesystem.java (revision 502789) +++ src/java/org/apache/hadoop/dfs/FSNamesystem.java (working copy) @@ -23,11 +23,13 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.util.*; import org.apache.hadoop.mapred.StatusHttpServer; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.fs.Path; import java.io.*; -import java.net.InetSocketAddress; import java.util.*; + import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -189,6 +191,11 @@ private String localMachine; private int port; private SafeModeInfo safeMode; // safe mode information + + // datanode networktoplogy + NetworkTopology clusterMap = new NetworkTopology(); + // for block replicas placement + Replicator replicator = new Replicator(); /** * dirs is a list oif directories where the filesystem directory state @@ -497,10 +504,12 @@ machineSets[i] = new DatanodeDescriptor[0]; } else { machineSets[i] = new DatanodeDescriptor[containingNodes.size()]; - int j = 0; - for (Iterator it = containingNodes.iterator(); it.hasNext(); 
j++) { - machineSets[i][j] = it.next(); - } + ArrayList containingNodesList = + new ArrayList(containingNodes.size()); + containingNodesList.addAll(containingNodes); + + machineSets[i] = replicator.sortByDistance( + getDatanodeByHost(clientMachine), containingNodesList); } } @@ -671,9 +680,9 @@ } } - // Get the array of replication targets - DatanodeDescriptor targets[] = chooseTargets(replication, null, - clientMachine, blockSize); + // Get the array of replication targets + DatanodeDescriptor targets[] = replicator.chooseTarget(replication, + getDatanodeByHost(clientMachine.toString()), null, blockSize); if (targets.length < this.minReplication) { throw new IOException("failed to create file "+src +" on client " + clientMachine @@ -754,9 +763,13 @@ throw new NotReplicatedYetException("Not replicated yet"); } - // Get the array of replication targets - DatanodeDescriptor targets[] = chooseTargets(pendingFile.getReplication(), - null, pendingFile.getClientMachine(), pendingFile.getBlockSize()); + // Get the array of replication targets + String clientHost = pendingFile.getClientMachine().toString(); + DatanodeDescriptor targets[] = replicator.chooseTarget( + (int)(pendingFile.getReplication()), + getDatanodeByHost(clientHost), + null, + pendingFile.getBlockSize()); if (targets.length < this.minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes, instead of " + @@ -1422,7 +1435,8 @@ * @see DataNode#register() * @author Konstantin Shvachko */ - public synchronized void registerDatanode( DatanodeRegistration nodeReg + public synchronized void registerDatanode( DatanodeRegistration nodeReg, + String networkLocation ) throws IOException { NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: " @@ -1434,6 +1448,8 @@ DatanodeDescriptor nodeN = getDatanodeByName( nodeReg.getName() ); if( nodeN != null && nodeN != nodeS ) { + NameNode.LOG.info( "BLOCK* NameSystem.registerDatanode: " + + "node from 
name: " + nodeN.getName() ); // nodeN previously served a different data storage, // which is not served by anybody anymore. removeDatanode( nodeN ); @@ -1457,11 +1473,15 @@ // data storage, which from now on will be served by a new node. NameNode.stateChangeLog.debug( "BLOCK* NameSystem.registerDatanode: " - + "node " + nodeS.name + + "node " + nodeS.getName() + " is replaced by " + nodeReg.getName() + "." ); } getEditLog().logRemoveDatanode( nodeS ); + // update cluster map + clusterMap.remove( nodeS ); nodeS.updateRegInfo( nodeReg ); + nodeS.setNetworkLocation( networkLocation ); + clusterMap.add( nodeS ); getEditLog().logAddDatanode( nodeS ); // also treat the registration message as a heartbeat @@ -1484,7 +1504,8 @@ + "new storageID " + nodeReg.getStorageID() + " assigned." ); } // register new datanode - DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg ); + DatanodeDescriptor nodeDescr + = new DatanodeDescriptor( nodeReg, networkLocation ); unprotectedAddDatanode( nodeDescr ); getEditLog().logAddDatanode( nodeDescr ); @@ -1636,6 +1657,7 @@ removeStoredBlock(it.next(), nodeInfo); } unprotectedRemoveDatanode(nodeInfo); + clusterMap.remove(nodeInfo); } void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) { @@ -1649,6 +1671,7 @@ void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) { datanodeMap.put( nodeDescr.getStorageID(), nodeDescr ); + clusterMap.add(nodeDescr); NameNode.stateChangeLog.debug( "BLOCK* NameSystem.unprotectedAddDatanode: " + "node " + nodeDescr.getName() + " is added to datanodeMap." ); @@ -1661,7 +1684,8 @@ * @param nodeID node */ void wipeDatanode( DatanodeID nodeID ) { - datanodeMap.remove(nodeID.getStorageID()); + String key = nodeID.getStorageID(); + datanodeMap.remove(key); NameNode.stateChangeLog.debug( "BLOCK* NameSystem.wipeDatanode: " + nodeID.getName() + " storage " + nodeID.getStorageID() @@ -2008,6 +2032,7 @@ * Total raw bytes. 
*/ public long totalCapacity() { + synchronized (heartbeats) { return totalCapacity; } @@ -2295,6 +2320,23 @@ } /* + * Filter nodes that are marked for decommison in the given list. + * Return a list of non-decommissioned nodes + */ + private List filterDecommissionedNodes( + Collection nodelist) { + List nonCommissionedNodeList = + new ArrayList(); + for (Iterator it = nodelist.iterator(); + it.hasNext(); ) { + DatanodeDescriptor node = it.next(); + if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { + nonCommissionedNodeList.add(node); + } + } + return nonCommissionedNodeList; + } + /* * Return true if there are any blocks in neededReplication that * reside on the specified node. Otherwise returns false. */ @@ -2395,14 +2437,15 @@ // not be scheduled for removal on that node if (containingNodes != null && containingNodes.contains(srcNode) && (excessBlocks == null || ! excessBlocks.contains(block))) { - // filter out containingNodes that are marked for decommission. - int numCurrentReplica = countContainingNodes(containingNodes); - - DatanodeDescriptor targets[] = chooseTargets( + List nodes = + filterDecommissionedNodes(containingNodes); + int numCurrentReplica = nodes.size(); + DatanodeDescriptor targets[] = replicator.chooseTarget( Math.min( fileINode.getReplication() - numCurrentReplica, - this.maxReplicationStreams - xmitsInProgress), - containingNodes, null, blockSize); + this.maxReplicationStreams - xmitsInProgress), + datanodeMap.get(srcNode.getStorageID()), + nodes, null, blockSize); if (targets.length > 0) { // Build items to return replicateBlocks.add(block); @@ -2471,111 +2514,471 @@ return results; } } - - /** - * Get a certain number of targets, if possible. - * If not, return as many as we can. - * Only live nodes contained in {@link #heartbeats} are - * targeted for replication. - * - * @param desiredReplicates - * number of duplicates wanted. 
- * @param forbiddenNodes - * of DatanodeDescriptor instances that should not be considered targets. - * @return array of DatanodeDescriptor instances uses as targets. + + /** The class is responsible for choosing the desired number of targets + * for placing block replicas. + * The replica placement strategy is that if the writer is on a datanode, + * the 1st replica is placed on the local machine, + * otherwise a random datanode. The 2nd replica is placed on a datanode + * that is on a different rack. The 3rd replica is placed on a datanode + * which is on the same rack as the first replca. + * @author hairong + * */ - DatanodeDescriptor[] chooseTargets( - int desiredReplicates, - Collection forbiddenNodes, - UTF8 clientMachine, - long blockSize) { - Collection targets = new ArrayList(); + class Replicator { + private class NotEnoughReplicasException extends Exception { + NotEnoughReplicasException( String msg ) { + super( msg ); + } + } + + /** + * choose numOfReplicas data nodes for writer to replicate + * a block with size blocksize + * If not, return as many as we can. + * + * @param numOfReplicas: number of replicas wanted. + * @param writer: the writer's machine, null if not in the cluster. + * @param excludedNodes: datanodesthat should not be considered targets. + * @param blocksize: size of the data to be written. + * @return array of DatanodeDescriptor instances chosen as targets + * and sorted as a pipeline. + */ + DatanodeDescriptor[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, + List excludedNodes, + long blocksize ) { + if( excludedNodes == null) { + excludedNodes = new ArrayList(); + } - if (desiredReplicates > heartbeats.size()) { - LOG.warn("Replication requested of "+desiredReplicates - +" is larger than cluster size ("+heartbeats.size() - +"). 
Using cluster size."); - desiredReplicates = heartbeats.size(); - if (desiredReplicates == 0) { - LOG.warn("While choosing target, totalMachines is " + desiredReplicates); - } + return chooseTarget(numOfReplicas, writer, + new ArrayList(), excludedNodes, blocksize); + } + + /* + * re-replicate numOfReplicas + /** + * choose numOfReplicas data nodes for writer + * to re-replicate a block with size blocksize + * If not, return as many as we can; an empty array is returned when the cluster map has no datanodes. + * + * @param numOfReplicas: additional number of replicas wanted. + * @param writer: the writer's machine, null if not in the cluster. + * @param choosenNodes: datanodes that have been chosen as targets. + * @param excludedNodes: datanodes that should not be considered targets. + * @param blocksize: size of the data to be written. + * @return array of DatanodeDescriptor instances chosen as target + * and sorted as a pipeline. + */ + DatanodeDescriptor[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, + List choosenNodes, + List excludedNodes, + long blocksize ) { + if( numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0 ) + return new DatanodeDescriptor[0]; + + if( excludedNodes == null) { + excludedNodes = new ArrayList(); } - double avgLoad = 0.0; - if (heartbeats.size() != 0) { - avgLoad = (double) totalLoad() / heartbeats.size(); + int clusterSize = clusterMap.getNumOfLeaves(); + int totalNumOfReplicas = choosenNodes.size()+numOfReplicas; + if( totalNumOfReplicas > clusterSize) { + numOfReplicas -= (totalNumOfReplicas-clusterSize); + totalNumOfReplicas = clusterSize; } - // choose local replica first - if (desiredReplicates != 0) { - // make sure that the client machine is not forbidden - if (clientMachine != null && clientMachine.getLength() > 0) { - for (Iterator it = heartbeats.iterator(); - it.hasNext();) { - DatanodeDescriptor node = it.next(); - if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) && - clientMachine.toString().equals(node.getHost()) && - !node.isDecommissionInProgress() && !node.isDecommissioned()) { - if 
((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) && - (node.getXceiverCount() <= (2.0 * avgLoad))) { - targets.add(node); - desiredReplicates--; - break; - } - } + + int maxNodesPerRack = + (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; + + List results = + new ArrayList(choosenNodes); + excludedNodes.addAll(choosenNodes); + + if(!clusterMap.contains(writer)) + writer=null; + + DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, + clusterMap.getLeaves(NodeBase.ROOT), + excludedNodes, blocksize, maxNodesPerRack, results ); + + results.removeAll(choosenNodes); + + // sorting nodes to form a pipeline + return getPipeline((writer==null)?localNode:writer, results); + } + + /* choose numOfReplicas from clusterNodes */ + private DatanodeDescriptor chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, + DatanodeDescriptor[] clusterNodes, + List excludedNodes, + long blocksize, + int maxNodesPerRack, + List results) { + + if( numOfReplicas == 0 ) return writer; + + int numOfResults = results.size(); + if(writer == null && (numOfResults==1 || numOfResults==2) ) { + writer = results.get(0); + } + + try { + switch( numOfResults ) { + case 0: + writer = chooseLocalNode(writer, clusterNodes, excludedNodes, + blocksize, maxNodesPerRack, results); + if(--numOfReplicas == 0) break; + case 1: + chooseRemoteRack(1, writer, clusterNodes, excludedNodes, + blocksize, maxNodesPerRack, results); + if(--numOfReplicas == 0) break; + case 2: + if(clusterMap.isOnSameRack(results.get(0), results.get(1))) { + chooseRemoteRack(1, writer, clusterNodes, excludedNodes, + blocksize, maxNodesPerRack, results); + } else { + chooseLocalRack(writer, clusterNodes, excludedNodes, + blocksize, maxNodesPerRack, results); } + if(--numOfReplicas == 0) break; + default: + chooseRandom(numOfReplicas, clusterNodes, excludedNodes, + blocksize, maxNodesPerRack, results); } + } catch (NotEnoughReplicasException e) { + LOG.warn("Not be able to place enough replicas, still in 
need of " + + numOfReplicas ); } - - for (int i = 0; i < desiredReplicates; i++) { - DatanodeDescriptor target = null; - // - // Otherwise, choose node according to target capacity - // - int nNodes = heartbeats.size(); - int idx = r.nextInt(nNodes); - int rejected = 0; - while (target == null && rejected < nNodes ) { - DatanodeDescriptor node = heartbeats.get(idx); - if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) && - !targets.contains(node) && - !node.isDecommissionInProgress() && !node.isDecommissioned() && - (node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) && - (node.getXceiverCount() <= (2.0 * avgLoad))) { - target = node; + return writer; + } + + /* choose localMachine as the target. + * if localMachine is not availabe, + * choose a node on the same rack + * @return the choosen node + */ + private DatanodeDescriptor chooseLocalNode( + DatanodeDescriptor localMachine, + DatanodeDescriptor[] nodes, + List excludedNodes, + long blocksize, + int maxNodesPerRack, + List results) + throws NotEnoughReplicasException { + // if no local machine, randomly choose one node + if(localMachine == null) + return chooseRandom(nodes, excludedNodes, + blocksize, maxNodesPerRack, results); + + // otherwise try local machine first + if(!excludedNodes.contains(localMachine)) { + excludedNodes.add(localMachine); + if( isGoodTarget(localMachine, blocksize, maxNodesPerRack, results)) { + results.add(localMachine); + return localMachine; + } + } + + // try a node on local rack + return chooseLocalRack(localMachine, nodes, excludedNodes, + blocksize, maxNodesPerRack, results); + } + + /* choose one node from the rack that localMachine is on. + * if no such node is availabe, choose one node from the rack where + * a second replica is on. + * if still no such node is available, choose a random node + * in the cluster nodes. 
+ * @return the choosen node + */ + private DatanodeDescriptor chooseLocalRack( + DatanodeDescriptor localMachine, + DatanodeDescriptor[] nodes, + List excludedNodes, + long blocksize, + int maxNodesPerRack, + List results) + throws NotEnoughReplicasException { + // no local machine, so choose a random machine + if( localMachine == null ) { + return chooseRandom(nodes, excludedNodes, + blocksize, maxNodesPerRack, results ); + } + + // choose one from the local rack + try { + return chooseRandom( + clusterMap.getLeaves( localMachine.getNetworkLocation() ), + excludedNodes, blocksize, maxNodesPerRack, results); + } catch (NotEnoughReplicasException e1) { + // find the second replica + DatanodeDescriptor newLocal=null; + for(Iterator iter=results.iterator(); + iter.hasNext();) { + DatanodeDescriptor nextNode = iter.next(); + if(nextNode != localMachine) { + newLocal = nextNode; break; - } else { - idx = (idx+1) % nNodes; - rejected++; } } - if (target == null) { - idx = r.nextInt(nNodes); - rejected = 0; - while (target == null && rejected < nNodes ) { - DatanodeDescriptor node = heartbeats.get(idx); - if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) && - !targets.contains(node) && - !node.isDecommissionInProgress() && !node.isDecommissioned() && - node.getRemaining() >= blockSize) { - target = node; - break; - } else { - idx = (idx + 1) % nNodes; - rejected++; - } + if( newLocal != null ) { + try { + return chooseRandom( + clusterMap.getLeaves( newLocal.getNetworkLocation() ), + excludedNodes, blocksize, maxNodesPerRack, results); + } catch( NotEnoughReplicasException e2 ) { + //otherwise randomly choose one from the network + return chooseRandom(nodes, excludedNodes, + blocksize, maxNodesPerRack, results); } + } else { + //otherwise randomly choose one from the network + return chooseRandom(nodes, excludedNodes, + blocksize, maxNodesPerRack, results); } - - if (target == null) { - LOG.warn("Could not find any nodes with sufficient capacity"); - break; 
// making one more pass over heartbeats would not help + } + } + + /* choose numOfReplicas nodes from the racks + * that localMachine is NOT on. + * if not enough nodes are available, choose the remaining ones + * from the local rack + */ + + private void chooseRemoteRack( int numOfReplicas, + DatanodeDescriptor localMachine, + DatanodeDescriptor[] nodes, + List excludedNodes, + long blocksize, + int maxReplicasPerRack, + List results) + throws NotEnoughReplicasException { + // get all the nodes on the local rack + DatanodeDescriptor[] nodesOnRack = clusterMap.getLeaves( + localMachine.getNetworkLocation() ); + + // can we speed up this??? using hashing sets? + DatanodeDescriptor[] nodesOnRemoteRack + = new DatanodeDescriptor[nodes.length-nodesOnRack.length]; + HashSet set1 = new HashSet(nodes.length); + HashSet set2 = new HashSet(nodesOnRack.length); + for(int i=0; inodes. + * @return the chosen node + */ + private DatanodeDescriptor chooseRandom( + DatanodeDescriptor[] nodes, + List excludedNodes, + long blocksize, + int maxNodesPerRack, + List results) + throws NotEnoughReplicasException { + DatanodeDescriptor result; + do { + DatanodeDescriptor[] selectedNodes = + chooseRandom(1, nodes, excludedNodes); + if(selectedNodes.length == 0 ) { + throw new NotEnoughReplicasException( + "Not able to place enough replicas" ); } - targets.add(target); + result = (DatanodeDescriptor)(selectedNodes[0]); + } while( !isGoodTarget( result, blocksize, maxNodesPerRack, results)); + results.add(result); + return result; + } + + /* Randomly choose numOfReplicas targets from nodes. 
+ */ + private void chooseRandom(int numOfReplicas, + DatanodeDescriptor[] nodes, + List excludedNodes, + long blocksize, + int maxNodesPerRack, + List results) + throws NotEnoughReplicasException { + boolean toContinue = true; + do { + DatanodeDescriptor[] selectedNodes = + chooseRandom(numOfReplicas, nodes, excludedNodes); + if(selectedNodes.length < numOfReplicas) { + toContinue = false; + } + for(int i=0; i0 && toContinue ); + + if(numOfReplicas>0) { + throw new NotEnoughReplicasException( + "Not able to place enough replicas"); } + } + + /* Randomly choose numOfReplicas nodes from nodes. + * @return the chosen nodes + */ + private DatanodeDescriptor[] chooseRandom(int numOfReplicas, + DatanodeDescriptor[] nodes, + List excludedNodes) { + List results = + new ArrayList(); + int numOfAvailableNodes = 0; + for(int i=0; i 0 ) { + DatanodeDescriptor choosenNode = nodes[r.nextInt(nodes.length)]; + if(!excludedNodes.contains(choosenNode) && + !choosenNode.isDecommissionInProgress() && + !choosenNode.isDecommissioned()) { + results.add( choosenNode ); + excludedNodes.add(choosenNode); + numOfReplicas--; + } + } + return (DatanodeDescriptor[])results.toArray( + new DatanodeDescriptor[results.size()]); + } + + /* judge if a node is a good target. 
+ * return true if node has enough space, + * does not have too much load, and the rack does not have too many nodes + */ + private boolean isGoodTarget( DatanodeDescriptor node, + long blockSize, int maxTargetPerLoc, + List results) { - return (DatanodeDescriptor[]) targets.toArray(new DatanodeDescriptor[targets.size()]); - } + // check if the node is (being) decommissioned + if(node.isDecommissionInProgress() || node.isDecommissioned()) { + return false; + } + // check the remaining capacity of the target machine + if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) { + return false; + } + + // check the communication traffic of the target machine + double avgLoad = 0; + int size = clusterMap.getNumOfLeaves(); + if( size != 0 ) { + avgLoad = (double)totalLoad()/size; + } + if(node.getXceiverCount() > (2.0 * avgLoad)) { + return false; + } + + // check if the target rack has chosen too many nodes + String rackname = node.getNetworkLocation(); + int counter=1; + for( Iterator iter = results.iterator(); + iter.hasNext(); ) { + DatanodeDescriptor result = iter.next(); + if(rackname.equals(result.getNetworkLocation())) { + counter++; + } + } + if(counter>maxTargetPerLoc) { + return false; + } + return true; + } + + /* Return a pipeline of nodes. + * The pipeline is formed by finding a shortest path that + * starts from the writer and traverses all nodes + * This is basically a traveling salesman problem. 
+ */ + private DatanodeDescriptor[] getPipeline( + DatanodeDescriptor writer, + List nodes ) { + int numOfNodes = nodes.size(); + DatanodeDescriptor[] results = new DatanodeDescriptor[numOfNodes]; + if( numOfNodes==0 ) return results; + + synchronized( clusterMap ) { + int index=0; + if(writer == null || !clusterMap.contains(writer)) { + writer = nodes.get(0); + } + for( ;indexcurrentDistance ) { + shortestDistance = currentDistance; + shortestNode = currentNode; + shortestIndex = i; + } + } + //switch position index & shortestIndex + if( index != shortestIndex ) { + nodes.set(shortestIndex, nodes.get(index)); + nodes.set(index, shortestNode); + } + writer = shortestNode; + } + } + return nodes.toArray( results ); + } + + /** Return datanodes sorted by their distance to the reader + */ + DatanodeDescriptor[] sortByDistance( + final DatanodeDescriptor reader, + List nodes ) { + synchronized(clusterMap) { + if(reader != null && clusterMap.contains(reader)) { + java.util.Collections.sort(nodes, new Comparator() { + public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) { + return clusterMap.getDistance(reader, n1) + -clusterMap.getDistance(reader, n2); + } + }); + } + } + return (DatanodeDescriptor[])nodes.toArray( + new DatanodeDescriptor[nodes.size()]); + } + + } //end of Replicator + + /** * Information about the file while it is being written to. * Note that at that time the file is not visible to the outside. @@ -2664,6 +3067,18 @@ } return null; } + + /* Find data node by its host name. */ + private DatanodeDescriptor getDatanodeByHost( String name ) { + for (Iterator it = datanodeMap.values().iterator(); + it.hasNext(); ) { + DatanodeDescriptor node = it.next(); + if( node.getHost().equals(name) ) + return node; + } + return null; + } + /** Stop at and return the datanode at index (used for content browsing)*/ private DatanodeInfo getDatanodeByIndex( int index ) { int i = 0; @@ -2802,6 +3217,9 @@ "STATE* SafeModeInfo.leave: " + "Safe mode is OFF." 
); reached = -1; safeMode = null; + NameNode.stateChangeLog.info("STATE* Network topology has " + +clusterMap.getNumOfRacks()+" racks and " + +clusterMap.getNumOfLeaves()+ " datanodes"); } /** Index: src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java =================================================================== --- src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (revision 502789) +++ src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (working copy) @@ -19,6 +19,9 @@ import java.util.*; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; + /************************************************** * DatanodeDescriptor tracks stats on a given DataNode, * such as available storage capacity, last update time, etc., @@ -32,25 +35,42 @@ * @author Mike Cafarella * @author Konstantin Shvachko **************************************************/ -class DatanodeDescriptor extends DatanodeInfo { +public class DatanodeDescriptor extends DatanodeInfo { private volatile Collection blocks = new TreeSet(); // isAlive == heartbeats.contains(this) // This is an optimization, because contains takes O(n) time on Arraylist protected boolean isAlive = false; - DatanodeDescriptor() { + /** Default constructor */ + public DatanodeDescriptor() { super(); } - DatanodeDescriptor( DatanodeID nodeID ) { + /** DatanodeDescriptor constructor + * @param nodeID id of the data node + */ + public DatanodeDescriptor( DatanodeID nodeID ) { this( nodeID, 0L, 0L, 0 ); } + + /** DatanodeDescriptor constructor + * + * @param nodeID id of the data node + * @param networkLocation location of the data node in network + */ + public DatanodeDescriptor( DatanodeID nodeID, String networkLocation ) { + this( nodeID, networkLocation, 0L, 0L, 0 ); + } - /** - * Create DatanodeDescriptor. 
+ /** DatanodeDescriptor constructor + * + * @param nodeID id of the data node + * @param capacity capacity of the data node + * @param remaining remaining capacity of the data node + * @param xceiverCount # of data transfers at the data node */ - DatanodeDescriptor( DatanodeID nodeID, + public DatanodeDescriptor( DatanodeID nodeID, long capacity, long remaining, int xceiverCount ) { @@ -58,6 +78,23 @@ updateHeartbeat(capacity, remaining, xceiverCount); } + /** DatanodeDescriptor constructor + * + * @param nodeID id of the data node + * @param networkLocation location of the data node in network + * @param capacity capacity of the data node + * @param remaining remaining capacity of the data node + * @param xceiverCount # of data transfers at the data node + */ + public DatanodeDescriptor( DatanodeID nodeID, + String networkLocation, + long capacity, + long remaining, + int xceiverCount ) { + super( nodeID, networkLocation ); + updateHeartbeat( capacity, remaining, xceiverCount); + } + /** */ void updateBlocks(Block newBlocks[]) {