Uploaded image for project: 'Hadoop Common'
  1. Hadoop Common
  2. HADOOP-1073

DFS Scalability: high CPU usage in choosing replication targets and file open



    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 0.12.1
    • 0.13.0
    • None
    • None


      I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage.

      The positives first:
      1. Datanodes continue to heartbeat and there are no cascading failures.
      2. chooseRandom() does not use much CPU and is very lightweight.

      An analysis of the namenode shows the following:

      1. High CPU usage in FSNamesystem.getPipeline().
      2. Moderate CPU usage in FSNamesystem.sortByDistance().

      The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.

      I have two proposals to address this problem. Please comment.

      Proposal 1: Optimize getDistance()
      In the current implementation, each datanode has a network path associated with it. For example "/default-rack/". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.

      Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent. Thus the distance between the two nodes is 2x where x is the distance to the common parent. If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.

      Also, the calls to check checkArgument() from getDistance() may be removed.
      Also, the call to getPipeline() may be done outside the global FSNamesystem lock.

      Proposal 2: Distribute the workload to the DFSClient
      The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.

      If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.


        1. getDistance.patch
          14 kB
          Hairong Kuang
        2. getDistance2.patch
          14 kB
          Hairong Kuang

        Issue Links



              hairong Hairong Kuang
              dhruba Dhruba Borthakur
              0 Vote for this issue
              0 Start watching this issue