Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml	(revision 651126)
+++ conf/hadoop-default.xml	(working copy)
@@ -262,6 +262,12 @@
 </property>
 
 <property>
+  <name>dfs.datanode.handler.count</name>
+  <value>3</value>
+  <description>The number of server threads for the datanode.</description>
+</property>
+
+<property>
   <name>dfs.http.address</name>
   <value>0.0.0.0:50070</value>
   <description>
Index: src/test/org/apache/hadoop/net/TestNetworkTopology.java
===================================================================
--- src/test/org/apache/hadoop/net/TestNetworkTopology.java	(revision 651126)
+++ src/test/org/apache/hadoop/net/TestNetworkTopology.java	(working copy)
@@ -30,16 +30,16 @@
 public class TestNetworkTopology extends TestCase {
   private final static NetworkTopology cluster = new NetworkTopology();
   private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
-    new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3"),
-    new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r3")
+    new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3"),
+    new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r3")
   };
   private final static DatanodeDescriptor NODE = 
-    new DatanodeDescriptor(new DatanodeID("h8:5020", "0", -1), "/d2/r4");
+    new DatanodeDescriptor(new DatanodeID("h8:5020"), "/d2/r4");
   
   static {
     for(int i=0; i<dataNodes.length; i++) {
Index: src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(revision 0)
+++ src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(revision 0)
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * This tests InterDataNodeProtocol for block handling. 
+ */
+public class TestInterDatanodeProtocol extends junit.framework.TestCase {
+  public void testGetBlockMetaDataInfo() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filepath = "/foo";
+      DFSTestUtil.createFile(dfs, new Path(filepath), 1024L, (short)3, 0L);
+      assertTrue(dfs.dfs.exists(filepath));
+
+      //get block info
+      NameNode namenode = cluster.getNameNode();
+      LocatedBlocks locations = namenode.getBlockLocations(
+          filepath, 0, Long.MAX_VALUE);
+      List<LocatedBlock> blocks = locations.getLocatedBlocks();
+      assertTrue(blocks.size() > 0);
+
+      LocatedBlock locatedblock = blocks.get(0);
+      DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+      assertTrue(datanodeinfo.length > 0);
+
+      //connect to a data node
+      InetSocketAddress addr = NetUtils.createSocketAddr(
+          datanodeinfo[0].getHost() + ":" + datanodeinfo[0].getIpcPort());
+      InterDatanodeProtocol.LOG.info("addr=" + addr);
+      InterDatanodeProtocol idp = (InterDatanodeProtocol)RPC.waitForProxy(
+          InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
+          addr, conf);
+
+      DataNode datanode = null;
+      for(DataNode dn : cluster.getDataNodes()) {
+        if (dn.ipcServer.getListenerAddress().getPort()
+            == datanodeinfo[0].getIpcPort()) {
+          datanode = dn;
+        }
+      }
+      assertTrue(datanode != null);
+      
+      //stop block scanner, so we could compare lastScanTime
+      datanode.blockScannerThread.interrupt();
+
+      //verify BlockMetaDataInfo
+      Block b = locatedblock.getBlock();
+      BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b.blkid);
+      assertEquals(b.getBlockId(), metainfo.getBlockId());
+      assertEquals(b.getNumBytes(), metainfo.getNumBytes());
+      assertEquals(datanode.blockScanner.getLastScanTime(b),
+          metainfo.getLastScanTime());
+
+      //TODO: verify GenerationStamp
+      InterDatanodeProtocol.LOG.info("idp.updateGenerationStamp="
+          + idp.updateGenerationStamp(b.blkid, new GenerationStamp(456789L)));
+    }
+    finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+    
+  }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java	(revision 651126)
+++ src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java	(working copy)
@@ -23,14 +23,14 @@
 public class TestHost2NodesMap extends TestCase {
   static private Host2NodesMap map = new Host2NodesMap();
   private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
-    new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h3:5030", "0", -1), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h3:5030"), "/d1/r2"),
   };
   private final static DatanodeDescriptor NULL_NODE = null; 
   private final static DatanodeDescriptor NODE = 
-    new DatanodeDescriptor(new DatanodeID("h3:5040", "0", -1), "/d1/r4");
+    new DatanodeDescriptor(new DatanodeID("h3:5040"), "/d1/r4");
 
   static {
     for(DatanodeDescriptor node:dataNodes) {
Index: src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java	(revision 651126)
+++ src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java	(working copy)
@@ -38,16 +38,16 @@
   private static ReplicationTargetChooser replicator;
   private static DatanodeDescriptor dataNodes[] = 
     new DatanodeDescriptor[] {
-      new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
-      new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
-      new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d2/r3"),
-      new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3")
+      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d2/r3"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3")
     };
    
   private final static DatanodeDescriptor NODE = 
-    new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r4");
+    new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r4");
   
   static {
     try {
Index: src/java/org/apache/hadoop/dfs/DataBlockScanner.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataBlockScanner.java	(revision 651126)
+++ src/java/org/apache/hadoop/dfs/DataBlockScanner.java	(working copy)
@@ -252,7 +252,13 @@
       delBlockInfo(info);
     }
   }
-  
+
+  /** @return the last scan time */
+  synchronized long getLastScanTime(Block block) {
+    BlockScanInfo info = blockMap.get(block);
+    return info == null? 0: info.lastScanTime;
+  }
+
   /** Deletes blocks from internal structures */
   void deleteBlocks(Block[] blocks) {
     for ( Block b : blocks ) {
Index: src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java	(revision 0)
+++ src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java	(revision 0)
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+
+import org.apache.hadoop.io.*;
+
+/**
+ * Meta data information for a block
+ */
+class BlockMetaDataInfo extends Block {
+  static final WritableFactory FACTORY = new WritableFactory() {
+    public Writable newInstance() { return new BlockMetaDataInfo(); }
+  };
+  static {                                      // register a ctor
+    WritableFactories.setFactory(BlockMetaDataInfo.class, FACTORY);
+  }
+
+  /** get BlockMetaDataInfo from the data set and the block scanner */
+  static BlockMetaDataInfo getBlockMetaDataInfo(Block b,
+      FSDatasetInterface dataset, DataBlockScanner blockscanner
+      ) throws IOException {
+    BlockMetaDataInfo info = new BlockMetaDataInfo();
+    info.blkid = b.getBlockId();
+    info.lastScanTime = blockscanner.getLastScanTime(b);
+    info.len = dataset.getLength(b);
+    //TODO: get generation stamp here
+    return info;
+  }
+
+  //TODO: remove generationStamp if it is defined in Block
+  private long generationStamp;
+  private long lastScanTime;
+
+  public BlockMetaDataInfo() {}
+
+  long getLastScanTime() {return lastScanTime;}
+
+  long getGenerationStamp() {return generationStamp;}
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeLong(generationStamp);
+    out.writeLong(lastScanTime);
+  }
+
+  /** {@inheritDoc} */
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    generationStamp = in.readLong();
+    lastScanTime = in.readLong();
+  }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeRegistration.java	(revision 651126)
+++ src/java/org/apache/hadoop/dfs/DatanodeRegistration.java	(working copy)
@@ -47,15 +47,14 @@
    * Default constructor.
    */
   public DatanodeRegistration() {
-    super(null, null, -1);
-    this.storageInfo = new StorageInfo();
+    this("");
   }
   
   /**
    * Create DatanodeRegistration
    */
   public DatanodeRegistration(String nodeName) {
-    super(nodeName, "", -1);
+    super(nodeName);
     this.storageInfo = new StorageInfo();
   }
   
@@ -63,6 +62,10 @@
     this.infoPort = infoPort;
   }
   
+  void setIpcPort(int ipcPort) {
+    this.ipcPort = ipcPort;
+  }
+
   void setStorageInfo(DataStorage storage) {
     this.storageInfo = new StorageInfo(storage);
     this.storageID = storage.getStorageID();
@@ -84,6 +87,13 @@
     return Storage.getRegistrationID(storageInfo);
   }
 
+  public String toString() {
+    return getClass().getSimpleName()
+      + "(" + name
+      + ", storageID=" + storageID
+      + ", infoPort=" + infoPort
+      + ", ipcPort=" + ipcPort;
+  }
   /////////////////////////////////////////////////
   // Writable
   /////////////////////////////////////////////////
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java	(revision 651126)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java	(working copy)
@@ -2035,7 +2035,8 @@
     // update the datanode's name with ip:port
     DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
                                       nodeReg.getStorageID(),
-                                      nodeReg.getInfoPort());
+                                      nodeReg.getInfoPort(),
+                                      nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
       
     NameNode.stateChangeLog.info(
@@ -3252,7 +3253,7 @@
     if (listDeadNodes) {
       for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
         DatanodeDescriptor dn = 
-            new DatanodeDescriptor(new DatanodeID(it.next(), "", 0));
+            new DatanodeDescriptor(new DatanodeID(it.next(), "", 0, 0));
         dn.setLastUpdate(0);
         nodes.add(dn);
       }
Index: src/java/org/apache/hadoop/dfs/DatanodeID.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeID.java	(revision 651126)
+++ src/java/org/apache/hadoop/dfs/DatanodeID.java	(working copy)
@@ -36,12 +36,13 @@
   protected String name;      /// hostname:portNumber
   protected String storageID; /// unique per cluster storageID
   protected int infoPort;     /// the port where the infoserver is running
+  protected int ipcPort;     /// the port where the ipc server is running
 
   /**
    * DatanodeID default constructor
    */
   public DatanodeID() {
-    this("", "", -1);
+    this("");
   }
 
   /**
@@ -50,19 +51,26 @@
    * @param from
    */
   public DatanodeID(DatanodeID from) {
-    this(from.getName(), from.getStorageID(), from.getInfoPort());
+    this(from.getName(), from.getStorageID(),
+        from.getInfoPort(), from.getIpcPort());
   }
   
+  public DatanodeID(String nodeName) {
+    this(nodeName, "", -1, -1);
+  }
+
   /**
    * Create DatanodeID
    * 
    * @param nodeName (hostname:portNumber) 
    * @param storageID data storage ID
    */
-  public DatanodeID(String nodeName, String storageID, int infoPort) {
+  public DatanodeID(String nodeName, String storageID,
+      int infoPort, int ipcPort) {
     this.name = nodeName;
     this.storageID = storageID;
     this.infoPort = infoPort;
+    this.ipcPort = ipcPort;
   }
   
   /**
@@ -87,6 +95,13 @@
   }
 
   /**
+   * @return ipcPort (the port at which the IPC server bound to)
+   */
+  public int getIpcPort() {
+    return ipcPort;
+  }
+
+  /**
    * @sets data storage ID.
    */
   void setStorageID(String storageID) {
@@ -160,6 +175,7 @@
     UTF8.writeString(out, name);
     UTF8.writeString(out, storageID);
     out.writeShort(infoPort);
+    out.writeShort(ipcPort);
   }
 
   /**
@@ -172,5 +188,6 @@
     // So chop off the first two bytes (and hence the signed bits) before 
     // setting the field.
     this.infoPort = in.readShort() & 0x0000ffff;
+    this.ipcPort = in.readShort() & 0x0000ffff;
   }
 }
Index: src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(revision 0)
+++ src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(revision 0)
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/** An inter-datanode protocol for updating generation stamp
+ */
+interface InterDatanodeProtocol extends VersionedProtocol {
+  public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
+
+  /**
+   * 1: added getBlockMetaDataInfo and updateGenerationStamp
+   */
+  public static final long versionID = 1L;
+
+  /** @return the BlockMetaDataInfo of a block */
+  BlockMetaDataInfo getBlockMetaDataInfo(long blockID) throws IOException;
+
+  /**
+   * Update the GenerationStamp of a block
+   * @return true iff update was required and done successfully 
+   */
+  boolean updateGenerationStamp(long blockID, GenerationStamp generationstamp
+      ) throws IOException;
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java	(revision 651126)
+++ src/java/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -77,7 +77,7 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode implements FSConstants, Runnable {
+public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
 
   /**
@@ -121,8 +121,8 @@
   private int socketTimeout;
   private int socketWriteTimeout = 0;  
   
-  private DataBlockScanner blockScanner;
-  private Daemon blockScannerThread;
+  DataBlockScanner blockScanner;
+  Daemon blockScannerThread;
   
   private static final Random R = new Random();
 
@@ -142,6 +142,9 @@
   long balanceBandwidth;
   private Throttler balancingThrottler;
 
+  // For InterDataNodeProtocol
+  Server ipcServer;
+  
   // Record all sockets opend for data transfer
   Map<Socket, Socket> childSockets = Collections.synchronizedMap(
                                        new HashMap<Socket, Socket>());
@@ -268,7 +271,7 @@
     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                      tmpPort);
     this.dnRegistration.setName(machineName + ":" + tmpPort);
-    LOG.info("Opened server at " + tmpPort);
+    LOG.info("Opened info server at " + tmpPort);
       
     this.threadGroup = new ThreadGroup("dataXceiveServer");
     this.dataXceiveServer = new Daemon(threadGroup, new DataXceiveServer(ss));
@@ -334,6 +337,14 @@
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
+    
+    //init ipc server
+    ipcServer = RPC.getServer(this, "0.0.0.0", 0, 
+        conf.getInt("dfs.datanode.handler.count", 3), false, conf);
+    ipcServer.start();
+    dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
+
+    LOG.info("dnRegistration = " + dnRegistration);
   }
 
   /**
@@ -491,6 +502,9 @@
       } catch (Exception e) {
       }
     }
+    if (ipcServer != null) {
+      ipcServer.stop();
+    }
     this.shouldRun = false;
     if (dataXceiveServer != null) {
       ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
@@ -2855,4 +2869,34 @@
       System.exit(-1);
     }
   }
+
+  // InterDataNodeProtocol implementation
+  /** {@inheritDoc} */
+  public BlockMetaDataInfo getBlockMetaDataInfo(long blockID
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("blockID=" + blockID);
+    }
+    return BlockMetaDataInfo.getBlockMetaDataInfo(
+        new Block(blockID, -1), data, blockScanner);
+  }
+
+  /** {@inheritDoc} */
+  public boolean updateGenerationStamp(long blockID, GenerationStamp generationstamp) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("blockID=" + blockID + ", generationstamp=" + generationstamp);
+    }
+    //TODO: update generation stamp here
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public long getProtocolVersion(String protocol, long clientVersion
+      ) throws IOException {
+    if (protocol.equals(InterDatanodeProtocol.class.getName())) {
+      return InterDatanodeProtocol.versionID; 
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
+    }
+  }
 }
Index: src/java/org/apache/hadoop/dfs/FSImage.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSImage.java	(revision 651126)
+++ src/java/org/apache/hadoop/dfs/FSImage.java	(working copy)
@@ -1253,9 +1253,7 @@
      * Datanode to be stored in the fsImage.
      */
     public void write(DataOutput out) throws IOException {
-      DatanodeID id = new DatanodeID(node.getName(), node.getStorageID(),
-                                     node.getInfoPort());
-      id.write(out);
+      new DatanodeID(node).write(out);
       out.writeLong(node.getCapacity());
       out.writeLong(node.getRemaining());
       out.writeLong(node.getLastUpdate());
