Index: src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(revision 659757)
+++ src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(working copy)
@@ -21,13 +21,40 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 
 /**
  * This tests InterDataNodeProtocol for block handling. 
  */
 public class TestInterDatanodeProtocol extends junit.framework.TestCase {
-  public void testGetBlockMetaDataInfo() throws IOException {
+  /** Assert that the meta info idp reports for b matches b (and the scanner). */
+  static void checkMetaInfo(Block b, InterDatanodeProtocol idp,
+      DataBlockScanner scanner) throws IOException {
+    final BlockMetaDataInfo reported = idp.getBlockMetaDataInfo(b);
+    assertEquals(b.getBlockId(), reported.getBlockId());
+    assertEquals(b.getNumBytes(), reported.getNumBytes());
+    if (scanner != null) {
+      assertEquals(scanner.getLastScanTime(b), reported.getLastScanTime());
+    }
+  }
+
+  /** Ask the namenode for the blocks of src and return the last one. */
+  static LocatedBlock getLastLocatedBlock(ClientProtocol namenode, String src
+      ) throws IOException {
+    List<LocatedBlock> blocks =
+        namenode.getBlockLocations(src, 0, Long.MAX_VALUE).getLocatedBlocks();
+    final int n = blocks.size();
+    DataNode.LOG.info("blocks.size()=" + n);
+    assertTrue(n > 0);
+    return blocks.get(n - 1);
+  }
+
+  /**
+   * The following test first creates a file.
+   * It verifies the block information from a datanode.
+   * Then, it updates the block with new information and verifies again. 
+   */
+  public void testBlockMetaDataInfo() throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = null;
 
@@ -37,18 +64,13 @@
 
       //create a file
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-      String filepath = "/foo";
-      DFSTestUtil.createFile(dfs, new Path(filepath), 1024L, (short)3, 0L);
-      assertTrue(dfs.dfs.exists(filepath));
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+      assertTrue(dfs.dfs.exists(filestr));
 
       //get block info
-      ClientProtocol namenode = dfs.dfs.namenode;
-      LocatedBlocks locations = namenode.getBlockLocations(
-          filepath, 0, Long.MAX_VALUE);
-      List<LocatedBlock> blocks = locations.getLocatedBlocks();
-      assertTrue(blocks.size() > 0);
-
-      LocatedBlock locatedblock = blocks.get(0);
+      LocatedBlock locatedblock = getLastLocatedBlock(dfs.dfs.namenode, filestr);
       DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
       assertTrue(datanodeinfo.length > 0);
 
@@ -64,15 +86,13 @@
       //verify BlockMetaDataInfo
       Block b = locatedblock.getBlock();
       InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
-      BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
-      assertEquals(b.getBlockId(), metainfo.getBlockId());
-      assertEquals(b.getNumBytes(), metainfo.getNumBytes());
-      assertEquals(datanode.blockScanner.getLastScanTime(b),
-          metainfo.getLastScanTime());
+      checkMetaInfo(b, idp, datanode.blockScanner);
 
-      //TODO: verify GenerationStamp
-      InterDatanodeProtocol.LOG.info("idp.updateGenerationStamp="
-          + idp.updateGenerationStamp(b, new GenerationStamp(456789L)));
+      //verify updateBlock
+      Block newblock = new Block(
+          b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
+      idp.updateBlock(b, newblock);
+      checkMetaInfo(newblock, idp, datanode.blockScanner);
     }
     finally {
       if (cluster != null) {cluster.shutdown();}
Index: src/test/org/apache/hadoop/dfs/TestFileCreation.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestFileCreation.java	(revision 659757)
+++ src/test/org/apache/hadoop/dfs/TestFileCreation.java	(working copy)
@@ -22,6 +22,9 @@
 import java.net.*;
 import java.util.Random;
 
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,6 +39,14 @@
  * data can be read by another client.
  */
 public class TestFileCreation extends TestCase {
+  static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";
+
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int numBlocks = 2;
@@ -46,10 +57,8 @@
   // entire file is written, the first two blocks definitely get flushed to
   // the datanodes.
 
-  //
   // creates a file but does not close it
-  //
-  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
     throws IOException {
     FSDataOutputStream stm = fileSys.create(name, true,
                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
@@ -151,7 +160,6 @@
       //
       Path path = new Path("/");
       System.out.println("Path : \"" + path.toString() + "\"");
-      System.out.println(fs.isDirectory(path));
       System.out.println(fs.getFileStatus(path).isDir()); 
       assertTrue("/ should be a directory", 
                  fs.getFileStatus(path).isDir() == true);
@@ -230,6 +238,7 @@
       writeFile(stm1);
       writeFile(stm3);
       stm1.close();
+      stm2.close();
       stm3.close();
 
       // set delete on exit flag on files.
@@ -257,12 +266,6 @@
       System.out.println("DeleteOnExit successful.");
 
     } finally {
-      if (fs != null) {
-        fs.close();
-      }
-      if (localfs != null) {
-        localfs.close();
-      }
       cluster.shutdown();
     }
   }
@@ -358,18 +361,15 @@
     }
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fs = cluster.getFileSystem();
-    cluster.waitActive();
-    InetSocketAddress addr = new InetSocketAddress("localhost",
-                                                   cluster.getNameNodePort());
-    DFSClient client = new DFSClient(addr, conf);
-
     try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      DFSClient client = dfs.dfs;
 
       // create a new file.
       //
       Path file1 = new Path("/filestatus.dat");
-      createFile(fs, file1, 1);
+      createFile(dfs, file1, 1);
       System.out.println("testFileCreationError2: "
                          + "Created file filestatus.dat with one "
                          + " replicas.");
@@ -395,7 +395,7 @@
 
       // wait for the lease to expire
       try {
-        Thread.sleep(5 * leasePeriod);
+        Thread.sleep(10 * leasePeriod);
       } catch (InterruptedException e) {
       }
 
@@ -403,16 +403,11 @@
       locations = client.namenode.getBlockLocations(file1.toString(), 
                                                     0, Long.MAX_VALUE);
       System.out.println("locations = " + locations.locatedBlockCount());
-      assertTrue("Error blocks were not cleaned up",
-                 locations.locatedBlockCount() == 0);
+      assertEquals("Error blocks were not cleaned up",
+                 0, locations.locatedBlockCount());
       System.out.println("testFileCreationError2 successful");
     } finally {
-      try {
-        fs.close();
-      } catch (Exception e) {
-      }
       cluster.shutdown();
-      client.close();
     }
   }
 
@@ -430,18 +425,15 @@
     if (simulatedStorage) {
       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
     }
+
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fs = cluster.getFileSystem();
-    cluster.waitActive();
-    int nnport = cluster.getNameNodePort();
-    InetSocketAddress addr = new InetSocketAddress("localhost", nnport);
-
-    DFSClient client = null;
     try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      final int nnport = cluster.getNameNodePort();
 
       // create a new file.
-      //
       Path file1 = new Path("/filestatus.dat");
       FSDataOutputStream stm = createFile(fs, file1, 1);
       System.out.println("testFileCreationNamenodeRestart: "
@@ -510,6 +502,7 @@
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
                                    null, null, null);
       cluster.waitActive();
+      fs = cluster.getFileSystem();
 
       // instruct the dfsclient to use a new filename when it requests
       // new blocks for files that were renamed.
@@ -534,7 +527,7 @@
       stm4.close();
 
       // verify that new block is associated with this file
-      client = new DFSClient(addr, conf);
+      DFSClient client = ((DistributedFileSystem)fs).dfs;
       LocatedBlocks locations = client.namenode.getBlockLocations(
                                   file1.toString(), 0, Long.MAX_VALUE);
       System.out.println("locations = " + locations.locatedBlockCount());
@@ -548,9 +541,7 @@
       assertTrue("Error blocks were not cleaned up for file " + file2,
                  locations.locatedBlockCount() == 1);
     } finally {
-      fs.close();
       cluster.shutdown();
-      if (client != null)  client.close();
     }
   }
 
@@ -609,4 +600,82 @@
     simulatedStorage = false;
   }
 
+  /**
+   * Test creating two files at the same time.
+   */
+  public void testConcurrentFileCreation() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      Path[] p = {new Path("/foo"), new Path("/bar")};
+      
+      //write 2 files at the same time; /foo is closed while /bar is still open
+      FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
+      int i = 0;
+      for(; i < 100; i++) {
+        out[0].write(i);
+        out[1].write(i);
+      }
+      out[0].close();
+      for(; i < 200; i++) {out[1].write(i);}
+      out[1].close();
+
+      //verify the contents, then release the input streams
+      FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};
+      for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
+      for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
+      for(FSDataInputStream s : in) {s.close();}
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
+  /**
+   * Test lease expiry (the expiry steps are commented out; only write/read runs).
+   */
+  public void testLeaseExpireHardLimit() throws Exception {
+    System.out.println("testLeaseExpire start");
+    final long leasePeriod = 1000;
+
+    Configuration conf = new Configuration();
+    conf.setInt("heartbeat.recheck.interval", 1000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+
+    // create cluster
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      final String f = DIR + "foo";
+      final Path fpath = new Path(f);
+      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, 3);
+      out.write("something".getBytes());
+      ((DFSClient.DFSOutputStream)out.getWrappedStream()).fsync();
+      System.out.println("testLeaseExpire: Created " + f + " with 1 replica.");
+      out.close();
+/*
+      // set the soft and hard limit to be 1 second so that the
+      // namenode triggers lease recovery
+      cluster.setLeasePeriod(leasePeriod, leasePeriod);
+      // wait for the lease to expire
+      try {Thread.sleep(30 * leasePeriod);} catch (InterruptedException e) {}
+*/
+      //TODO: what should we check here?
+      //try open
+      System.out.println("open file.");
+      FSDataInputStream in = dfs.open(fpath);
+      byte[] buffer = new byte[1024];
+      int n = in.read(buffer);
+      assertEquals("something", new String(buffer, 0, n));
+      in.close();
+    } finally {
+      cluster.shutdown();
+    }
+
+    System.out.println("testLeaseExpire successful");
+  }
 }
Index: src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
===================================================================
--- src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java	(revision 659757)
+++ src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java	(working copy)
@@ -97,6 +97,15 @@
         oStream = null;
       }
     }
+
+    synchronized long getGenerationStamp() {
+      return theBlock.getGenerationStamp();
+    }
+
+    synchronized void updateBlock(Block b) {
+      theBlock.generationStamp = b.generationStamp;
+      setlength(b.len);
+    }
     
     synchronized long getlength() {
       if (!finalized) {
@@ -292,6 +301,24 @@
     return binfo.getlength();
   }
 
+  /** {@inheritDoc} */
+  public long getStoredGenerationStamp(Block b) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("BInfo not found, b=" + b);
+    }
+    return binfo.getGenerationStamp();
+  }
+
+  /** {@inheritDoc} */
+  public void updateBlock(Block oldblock, Block newblock) throws IOException {
+    BInfo binfo = blockMap.get(newblock);
+    if (binfo == null) {
+      throw new IOException("BInfo not found, b=" + newblock);
+    }
+    binfo.updateBlock(newblock);
+  }
+
   public synchronized void invalidate(Block[] invalidBlks) throws IOException {
     boolean error = false;
     if (invalidBlks == null) {
@@ -586,5 +613,4 @@
   public String getStorageInfo() {
     return "Simulated FSDataset-" + storageId;
   }
-
 }
Index: src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java	(revision 0)
+++ src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java	(revision 0)
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class TestLeaseRecovery extends junit.framework.TestCase {
+  static final int BLOCK_SIZE = 1024;
+  static final short REPLICATION_NUM = (short)3;
+  static final Random RANDOM = new Random();
+
+  static void checkMetaInfo(Block b, InterDatanodeProtocol idp
+      ) throws IOException {
+    TestInterDatanodeProtocol.checkMetaInfo(b, idp, null);
+  }
+  
+  static int min(Integer... x) {
+    int m = x[0];
+    for(int i = 1; i < x.length; i++) {
+      if (x[i] < m) {
+        m = x[i];
+      }
+    }
+    return m;
+  }
+
+  /**
+   * The following test first creates a file with a few blocks.
+   * It randomly truncates the replica of the last block stored in each datanode.
+   * Finally, it triggers block synchronization to synchronize all stored replicas.
+   */
+  public void testBlockSynchronization() throws Exception {
+    final int ORG_FILE_SIZE = 3000; 
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 5, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, ORG_FILE_SIZE, REPLICATION_NUM, 0L);
+      assertTrue(dfs.dfs.exists(filestr));
+
+      //get block info for the last block
+      LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock(
+          dfs.dfs.namenode, filestr);
+      DatanodeInfo[] datanodeinfos = locatedblock.getLocations();
+      assertEquals(REPLICATION_NUM, datanodeinfos.length);
+
+      //connect to data nodes
+      InterDatanodeProtocol[] idps = new InterDatanodeProtocol[REPLICATION_NUM];
+      DataNode[] datanodes = new DataNode[REPLICATION_NUM];
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        idps[i] = DataNode.createInterDataNodeProtocolProxy(datanodeinfos[i], conf);
+        datanodes[i] = cluster.getDataNode(datanodeinfos[i].getIpcPort());
+        assertTrue(datanodes[i] != null);
+      }
+      
+      //verify BlockMetaDataInfo
+      Block lastblock = locatedblock.getBlock();
+      DataNode.LOG.info("newblocks=" + lastblock);
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        checkMetaInfo(lastblock, idps[i]);
+      }
+
+      //setup random block sizes 
+      int lastblocksize = ORG_FILE_SIZE % BLOCK_SIZE;
+      Integer[] newblocksizes = new Integer[REPLICATION_NUM];
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        newblocksizes[i] = RANDOM.nextInt(lastblocksize);
+      }
+      DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes)); 
+
+      //update blocks with random block sizes
+      Block[] newblocks = new Block[REPLICATION_NUM];
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
+            lastblock.getGenerationStamp());
+        idps[i].updateBlock(lastblock, newblocks[i]);
+        checkMetaInfo(newblocks[i], idps[i]);
+      }
+
+      DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
+      dfs.dfs.namenode.append(filestr, dfs.dfs.clientName);
+
+      //block synchronization
+      final int primarydatanodeindex = RANDOM.nextInt(datanodes.length);
+      DataNode.LOG.info("primarydatanodeindex  =" + primarydatanodeindex);
+      DataNode primary = datanodes[primarydatanodeindex];
+      DataNode.LOG.info("primary.dnRegistration=" + primary.dnRegistration);
+      primary.recoverBlocks(new Block[]{lastblock}, new DatanodeInfo[][]{datanodeinfos}).join();
+
+      BlockMetaDataInfo[] updatedmetainfo = new BlockMetaDataInfo[REPLICATION_NUM];
+      int minsize = min(newblocksizes);
+      long currentGS = cluster.getNameNode().namesystem.getGenerationStamp();
+      lastblock.generationStamp = currentGS;
+      for(int i = 0; i < REPLICATION_NUM; i++) {
+        updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
+        assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
+        assertEquals(minsize, updatedmetainfo[i].getNumBytes());
+        assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
+      }
+    }
+    finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java	(working copy)
@@ -124,4 +124,9 @@
     setDetached();
     return true;
   }
+  
+  public String toString() {
+    return getClass().getSimpleName() + "(volume=" + volume
+        + ", file=" + file + ", detached=" + detached + ")";
+  }
 }
Index: src/java/org/apache/hadoop/dfs/DFSClient.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DFSClient.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/DFSClient.java	(working copy)
@@ -128,6 +128,18 @@
     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
         rpcNamenode, methodNameToPolicyMap);
   }
+
+  /** Create a ClientDatanodeProtocol RPC proxy to the datanode's IPC address. */
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
+      DatanodeID datanodeid, Configuration conf) throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+    }
+    return (ClientDatanodeProtocol)RPC.waitForProxy(ClientDatanodeProtocol.class,
+        ClientDatanodeProtocol.versionID, addr, conf);
+  }
         
   /** 
    * Create a new DFSClient connected to the default namenode.
@@ -2122,9 +2134,25 @@
         for (int i = errorIndex; i < (nodes.length-1); i++) {
           newnodes[i] = nodes[i+1];
         }
-        nodes = newnodes;
 
+        // Tell the primary datanode to do error recovery 
+        // by stamping appropriate generation stamps. XXX close datanodeRPC?
+        boolean status = false;
+        try {
+          ClientDatanodeProtocol datanodeRPC = 
+            createClientDatanodeProtocolProxy(newnodes[0], conf);
+          status = datanodeRPC.recoverBlock(block, newnodes);
+        } catch (IOException e) {
+          status = false;
+        }
+        if (!status) {
+          LOG.warn("Error Recovery for block " + block + " failed " +
+                   " because recovery from primary datanode " +
+                   newnodes[0] + " failed.");
+        }
+
         // setup new pipeline
+        nodes = newnodes;
         hasError = false;
         errorIndex = 0;
         success = createBlockOutputStream(nodes, src, true);
Index: src/java/org/apache/hadoop/dfs/NameNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/NameNode.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/NameNode.java	(working copy)
@@ -288,6 +288,20 @@
     myMetrics.numFilesCreated.inc();
   }
 
+  /** {@inheritDoc} */
+  public void append(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.append: file "
+          +src+" for "+clientName+" at "+clientMachine);
+    }
+    //TODO: add namesystem.appendFile(...), which calls appendFileInternal(...)
+    namesystem.appendFileInternal(src, clientName, clientMachine);
+
+    //TODO: inc myMetrics;
+  }
+
+  /** {@inheritDoc} */
   public boolean setReplication(String src, 
                                 short replication
                                 ) throws IOException {
@@ -358,6 +372,20 @@
     }
   }
 
+  /** {@inheritDoc} */
+  public long nextGenerationStamp() {
+    return namesystem.nextGenerationStamp();
+  }
+
+  /** {@inheritDoc} */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength, boolean closeFile,
+      DatanodeID[] newtargets
+      ) throws IOException {
+    namesystem.commitBlockSynchronization(block,
+        newgenerationstamp, newlength, closeFile, newtargets);
+  }
+  
   public long getPreferredBlockSize(String filename) throws IOException {
     return namesystem.getPreferredBlockSize(filename);
   }
Index: src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDatasetInterface.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/FSDatasetInterface.java	(working copy)
@@ -89,8 +89,13 @@
    * @throws IOException
    */
   public long getLength(Block b) throws IOException;
-     
+
   /**
+   * @return the generation stamp stored with the block.
+   */
+  public long getStoredGenerationStamp(Block b) throws IOException;
+
+  /**
    * Returns an input stream to read the contents of the specified block
    * @param b
    * @return an input stream to read the contents of the specified block
@@ -136,6 +141,11 @@
   public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
 
   /**
+   * Update the block to the new generation stamp and length.  
+   */
+  public void updateBlock(Block oldblock, Block newblock) throws IOException;
+  
+  /**
    * Finalizes the block previously opened for writing using writeToBlock.
    * The block size is what is in the parameter b and it must match the amount
    *  of data written
Index: src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java	(revision 0)
+++ src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java	(revision 0)
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/** A protocol used by clients to ask a datanode to recover a block
+ */
+interface ClientDatanodeProtocol extends VersionedProtocol {
+  public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
+
+  /**
+   * 1: start of time
+   */
+  public static final long versionID = 1L;
+
+  /** Start generation-stamp recovery for specified block
+   * @param block the specified block
+   * @param targets the list of possible locations of specified block
+   * @return true if recovery successful, otherwise return false
+   * @throws IOException
+   */
+  boolean recoverBlock(Block block, DatanodeInfo[] targets) throws IOException;
+}
Index: src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeProtocol.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/DatanodeProtocol.java	(working copy)
@@ -31,9 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 14: add corrupt field to LocatedBlock
+   * 15: added DNA_RECOVERBLOCK, nextGenerationStamp and commitBlockSynchronization
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   // error code
   final static int NOTIFY = 0;
@@ -51,6 +51,7 @@
   final static int DNA_REGISTER = 4;   // re-register
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
   final static int DNA_BLOCKREPORT = 6;   // request a block report
+  final static int DNA_RECOVERBLOCK = 7;  // request a block recovery
 
   /** 
    * Register Datanode.
@@ -132,4 +133,17 @@
    * same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[] blocks)}
    */
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
+  
+  /**
+   * @return the next GenerationStamp
+   */
+  public long nextGenerationStamp() throws IOException;
+
+  /**
+   * Commit block synchronization in lease recovery
+   */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength, boolean closeFile,
+      DatanodeID[] newtargets
+      ) throws IOException;
 }
Index: src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java	(working copy)
@@ -38,9 +38,10 @@
       ) throws IOException {
     BlockMetaDataInfo info = new BlockMetaDataInfo();
     info.blkid = b.getBlockId();
-    info.lastScanTime = blockscanner.getLastScanTime(b);
+    info.generationStamp = dataset.getStoredGenerationStamp(b);
+    b.generationStamp = info.generationStamp; 
     info.len = dataset.getLength(b);
-    //TODO: get generation stamp here
+    info.lastScanTime = blockscanner.getLastScanTime(b);
     return info;
   }
 
@@ -50,8 +51,6 @@
 
   long getLastScanTime() {return lastScanTime;}
 
-  long getGenerationStamp() {return generationStamp;}
-
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     super.write(out);
Index: src/java/org/apache/hadoop/dfs/BlocksMap.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlocksMap.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/BlocksMap.java	(working copy)
@@ -138,6 +138,23 @@
       return 0;
     }
 
+    /** Replace the datanodes with newtargets; set the new stamp and length. */
+    void update(long newgenerationstamp, long newlength,
+        DatanodeDescriptor[] newtargets) {
+      //remove all nodes, highest index first; stop before reaching index -1
+      for(int n = numNodes(); n > 0; ) {
+        removeNode(--n);
+      }
+
+      //add all targets  
+      for(DatanodeDescriptor d : newtargets) {
+        addNode(d);
+      }
+
+      generationStamp = newgenerationstamp;
+      len = newlength;
+    }
+
     /**
      * Add data-node this block belongs to.
      */
@@ -156,7 +173,11 @@
      * Remove data-node from the block.
      */
     boolean removeNode(DatanodeDescriptor node) {
-      int dnIndex = this.findDatanode(node);
+      return removeNode(findDatanode(node));
+    }
+
+    /** Remove the indexed datanode from the block. */
+    boolean removeNode(int dnIndex) {
       if(dnIndex < 0) // the node is not found
         return false;
       assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java	(working copy)
@@ -96,9 +96,8 @@
   FSDirectory dir;
 
   //
-  // Stores the block-->datanode(s) map.  Updated only in response
-  // to client-sent information.
   // Mapping: Block -> { INode, datanodes, self ref } 
+  // Updated only in response to client-sent information.
   //
   BlocksMap blocksMap = new BlocksMap();
 
@@ -238,9 +237,10 @@
 
   /**
    * The global generation stamp for this file system. 
-   * Valid values start from 1000.
+   * Valid values start from FIRST_VALID_GENERATION_STAMP.
    */
-  private GenerationStamp generationStamp = new GenerationStamp(1000);
+  private final GenerationStamp generationStamp = new GenerationStamp(
+      FIRST_VALID_GENERATION_STAMP);
 
   // Ask Datanode only up to this many blocks to delete.
   private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
@@ -931,7 +931,7 @@
         // If the file is under construction , then it must be in our
         // leases. Find the appropriate lease record.
         //
-        Lease lease = leaseManager.getLease(holder);
+        Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
         //
         // We found the lease for this file. And surprisingly the original
         // holder is trying to recreate this file. This should never occur.
@@ -945,7 +945,7 @@
         //
         // Find the original holder.
         //
-        lease = leaseManager.getLease(pendingFile.getClientName());
+        lease = leaseManager.getLease(pendingFile.clientName);
         if (lease == null) {
           throw new AlreadyBeingCreatedException(
                                                  "failed to create file " + src + " for " + holder +
@@ -958,8 +958,9 @@
         // to proceed. Otherwise, prevent this request from creating file.
         //
         if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: Removing lease " + lease);
-          leaseManager.removeExpiredLease(lease);
+          LOG.info("startFile: recover lease " + lease + ", src=" + src);
+          internalReleaseLease(lease, src);
+          leaseManager.renewLease(lease);
         } else {
           throw new AlreadyBeingCreatedException(
                                                  "failed to create file " + src + " for " + holder +
@@ -988,8 +989,6 @@
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      leaseManager.addLease(src, holder);
-
       //
       // Now we can add the name to the filesystem. This file has no
       // blocks associated with it.
@@ -997,10 +996,11 @@
       checkFsObjectLimit();
 
       // increment global generation stamp
-      long genstamp = generationStamp.nextStamp();
+      long genstamp = nextGenerationStamp();
+      INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
+          replication, blockSize, holder, clientMachine, clientNode, genstamp);
+      if (newNode != null) { leaseManager.addLease(newNode.clientName, src); } //else: throws below
 
-      INode newNode = dir.addFile(src, permissions,
-          replication, blockSize, holder, clientMachine, clientNode, genstamp);
       if (newNode == null) {
         throw new IOException("DIR* NameSystem.startFile: " +
                               "Unable to add file to namespace.");
@@ -1011,10 +1011,52 @@
       throw ie;
     }
 
-    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                   +"add "+src+" to namespace for "+holder);
+    }
   }
 
+  /** append is not yet ready.  This method is for testing. */
+  void appendFileInternal(String src, String holder, String clientMachine
+      ) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
+          +src+" for "+holder+" at "+clientMachine);
+    }
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot append file" + src, safeMode);
+    if (!isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+
+    try {
+      INodeFile f = dir.getFileINode(src);
+      //assume f != null && !f.isUnderConstruction() && lease does not exist
+      //TODO: remove the assumption 
+
+      DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
+          clientMachine);
+      INodeFileUnderConstruction newnode = f.toINodeFileUnderConstruction(
+          holder, clientMachine, clientNode);
+
+      dir.replaceNode(src, f, newnode);
+      leaseManager.addLease(newnode.clientName, src);
+
+    } catch (IOException ie) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.appendFile: ", ie);
+      throw ie;
+    }
+
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: "
+          +"add "+src+" to namespace for "+holder);
+    }
+  }
+
   /**
    * The client would like to obtain an additional block for the indicated
    * filename (which is being written-to).  Return an array that consists
@@ -1079,6 +1121,7 @@
 
       // allocate new block record block locations in INode.
       newBlock = allocateBlock(src, pendingFile);
+      pendingFile.targets = targets;
     }
         
     // Create next block
@@ -1108,7 +1151,7 @@
       ) throws IOException {
     INode file = dir.getFileINode(src);
     if (file == null) {
-      Lease lease = leaseManager.getLease(holder);
+      Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
       throw new LeaseExpiredException("No lease on " + src +
                                       " File does not exist. " +
                                       (lease != null ? lease.toString() :
@@ -1116,7 +1159,7 @@
                                        " does not have any open files."));
     }
     if (!file.isUnderConstruction()) {
-      Lease lease = leaseManager.getLease(holder);
+      Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
       throw new LeaseExpiredException("No lease on " + src + 
                                       " File is not open for writing. " +
                                       (lease != null ? lease.toString() :
@@ -1167,30 +1210,14 @@
     } else if (!checkFileProgress(pendingFile, true)) {
       return STILL_WAITING;
     }
-        
-    // The file is no longer pending.
-    // Create permanent INode, update blockmap
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
 
-    // close file and persist block allocations for this file
-    dir.closeFile(src, newFile);
+    leaseManager.removeLease(pendingFile.clientName, src);
+    finalizeINodeFileUnderConstruction(src, pendingFile);
 
-    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
                                   + " blocklist persisted");
-
-    leaseManager.removeLease(src, holder);
-
-    //
-    // REMIND - mjc - this should be done only after we wait a few secs.
-    // The namenode isn't giving datanodes enough time to report the
-    // replicated blocks that are automatically done as part of a client
-    // write.
-    //
-
-    // Now that the file is real, we need to be sure to replicate
-    // the blocks.
-    checkReplicationFactor(newFile);
+    }
     return COMPLETE_SUCCESS;
   }
 
@@ -1291,27 +1318,15 @@
    * dumps the contents of recentInvalidateSets
    */
   private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
-    Collection<Collection<Block>> values = recentInvalidateSets.values();
-    Iterator<Map.Entry<String,Collection<Block>>> it = 
-      recentInvalidateSets.entrySet().iterator();
-    if (values.size() == 0) {
-      out.println("Metasave: Blocks waiting deletion: 0");
+    int size = recentInvalidateSets.values().size();
+    out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
+    if (size == 0) {
       return;
     }
-    out.println("Metasave: Blocks waiting deletion from " +
-                values.size() + " datanodes.");
-    while (it.hasNext()) {
-      Map.Entry<String,Collection<Block>> entry = it.next();
-      String storageId = entry.getKey();
-      DatanodeDescriptor node = datanodeMap.get(storageId);
-      Collection<Block> blklist = entry.getValue();
-      if (blklist.size() > 0) {
-        out.print(node.getName());
-        for (Iterator jt = blklist.iterator(); jt.hasNext();) {
-          Block block = (Block) jt.next();
-          out.print(" " + block); 
-        }
-        out.println("");
+    for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
+      Collection<Block> blocks = entry.getValue();
+      if (blocks.size() > 0) {
+        out.println(datanodeMap.get(entry.getKey()).getName() + blocks);
       }
     }
   }
@@ -1478,7 +1493,7 @@
     }
     if (old.isUnderConstruction()) {
       INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
-      leaseManager.removeLease(src, cons.getClientName());
+      leaseManager.removeLease(cons.clientName, src);
     }
     return true;
   }
@@ -1602,7 +1617,11 @@
    * @param src The filename
    * @param holder The datanode that was creating the file
    */
-  void internalReleaseCreate(String src, String holder) throws IOException {
+  void internalReleaseLease(Lease lease, String src) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("lease=" + lease + ", src=" + src);
+    }
+
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
@@ -1616,31 +1635,41 @@
                                    + src + " but file is already closed.");
       return;
     }
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: "
+          + src + " does not being written in " + lease);
+    }
+
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
 
+    // Initialize lease recovery for pendingFile
+    pendingFile.assignPrimaryDatanode();
+  }
+
+  //TODO: remove the following method in HADOOP-3113
+  private void removeLastBlockWithZeroSize(INodeFileUnderConstruction cons
+      ) throws IOException {
     // The last block that was allocated migth not have been used by the
     // client. In this case, the size of the last block would be 0. A fsck
     // will report this block as a missing block because no datanodes have it.
     // Delete this block.
-    Block[] blocks = pendingFile.getBlocks();
+    Block[] blocks = cons.getBlocks();
     if (blocks != null && blocks.length > 0) {
       Block last = blocks[blocks.length - 1];
       if (last.getNumBytes() == 0) {
-          pendingFile.removeBlock(last);
+          cons.removeBlock(last);
           blocksMap.removeINode(last);
           for (Iterator<DatanodeDescriptor> it = 
                blocksMap.nodeIterator(last); it.hasNext();) {
             DatanodeDescriptor node = it.next();
             addToInvalidates(last, node);
           }
-          /* What else do we need to do?
-           * removeStoredBlock()? we do different things when a block is 
-           * removed in different contexts. Mostly these should be
-           * same and/or should be in one place.
-           */
       }
     }
-
+  }
+  
+  private void finalizeINodeFileUnderConstruction(String src,
+      INodeFileUnderConstruction pendingFile) throws IOException {
     // The file is no longer pending.
     // Create permanent INode, update blockmap
     INodeFile newFile = pendingFile.convertToInodeFile();
@@ -1649,14 +1678,58 @@
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
 
-    // replicate blocks of this file.
     checkReplicationFactor(newFile);
-  
-    NameNode.stateChangeLog.info("DIR* NameSystem.internalReleaseCreate: " + 
-                                 src + " is no longer written to by " + 
-                                 holder);
   }
 
+  synchronized void commitBlockSynchronization(Block lastblock,
+      long newgenerationstamp, long newlength, boolean closeFile,
+      DatanodeID[] newtargets
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("commitBlockSynchronization(lastblock=" + lastblock
+          + ", newgenerationstamp=" + newgenerationstamp
+          + ", newlength=" + newlength
+          + ", newtargets=" + Arrays.asList(newtargets) + ")");
+    }
+    BlockInfo blockinfo = blocksMap.getStoredBlock(lastblock);
+    if (blockinfo == null) {
+      throw new IOException("Block (=" + lastblock + ") not found");
+    }
+    INodeFile iFile = blockinfo.getINode();
+    if (!iFile.isUnderConstruction()) {
+      throw new IOException("Unexpected block (=" + lastblock
+          + ") since the file (=" + iFile.getLocalName()
+          + ") is not under construction");
+    }
+
+    //update block info
+    if (newtargets.length > 0) {
+      DatanodeDescriptor[] descriptors = new DatanodeDescriptor[newtargets.length];
+      for(int i = 0; i < newtargets.length; i++) {
+        descriptors[i] = getDatanode(newtargets[i]);
+      }
+      blockinfo.update(newgenerationstamp, newlength, descriptors);
+    }
+
+    // If this commit does not want to close the file, just persist
+    // block locations and return
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+    if (!closeFile) {
+      dir.persistBlocks(leaseManager.findPath(pendingFile), pendingFile);
+      getEditLog().logSync();
+      return;
+    }
+    
+    //remove lease, close file
+    //TODO: remove the following line in HADOOP-3113
+    removeLastBlockWithZeroSize(pendingFile);
+
+    String src = leaseManager.removeLease(pendingFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile);
+    getEditLog().logSync();
+  }
+
+
   /**
    * Renew the lease(s) held by the given client
    */
@@ -1970,6 +2043,10 @@
         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
         updateStats(nodeinfo, true);
         
+        //check lease recovery
+        if (cmd == null) {
+          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        }
         //check pending replication
         if (cmd == null) {
           cmd = nodeinfo.getReplicationCommand(
@@ -4187,6 +4264,15 @@
     return generationStamp.getStamp();
   }
 
+  /**
+   * Increments, logs and then returns the stamp
+   */
+  long nextGenerationStamp() {
+    long gs = generationStamp.nextStamp();
+    getEditLog().logGenerationStamp(gs);
+    return gs;
+  }
+
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //
Index: src/java/org/apache/hadoop/dfs/FSEditLog.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSEditLog.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/FSEditLog.java	(working copy)
@@ -527,7 +527,7 @@
                                         clientMachine, 
                                         null);
               fsDir.replaceNode(path, node, cons);
-              fsNamesys.leaseManager.addLease(path, clientName);
+              fsNamesys.leaseManager.addLease(cons.clientName, path);
             } else if (opcode == OP_CLOSE) {
               //
               // Remove lease if it exists.
@@ -535,7 +535,7 @@
               if (old.isUnderConstruction()) {
                 INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
                                                      old;
-                fsNamesys.leaseManager.removeLease(path, cons.getClientName());
+                fsNamesys.leaseManager.removeLease(cons.clientName, path);
               }
             }
             break;
@@ -573,7 +573,7 @@
             old = fsDir.unprotectedDelete(path, timestamp, null);
             if (old != null && old.isUnderConstruction()) {
               INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
-              fsNamesys.leaseManager.removeLease(path, cons.getClientName());
+              fsNamesys.leaseManager.removeLease(cons.clientName, path);
             }
             break;
           }
Index: src/java/org/apache/hadoop/dfs/LeaseManager.java
===================================================================
--- src/java/org/apache/hadoop/dfs/LeaseManager.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/LeaseManager.java	(working copy)
@@ -21,11 +21,35 @@
 import java.util.*;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+/**
+ * LeaseManager does the lease housekeeping for writing on files.   
+ * This class also provides useful static methods for lease recovery.
+ * 
+ * Lease Recovery Algorithm
+ * 1) Namenode retrieves lease information
+ * 2) For each file f in the lease, consider the last block b of f
+ * 2.1) Get the datanodes which contain b
+ * 2.2) Assign one of the datanodes as the primary datanode p
+
+ * 2.3) p obtains a new generation stamp from the namenode
+ * 2.4) p gets the block info from each datanode
+ * 2.5) p computes the minimum block length
+ * 2.6) p updates the datanodes, which have a valid generation stamp,
+ *      with the new generation stamp and the minimum block length 
+ * 2.7) p reports the update results to the namenode
+
+ * 2.8) Namenode updates the BlockInfo
+ * 2.9) Namenode removes f from the lease
+ *      and removes the lease once all files have been removed
+ * 2.10) Namenode commits changes to edit log
+ */
 class LeaseManager {
-  static final Log LOG = FSNamesystem.LOG;
+  static final Log LOG = LogFactory.getLog(LeaseManager.class);
 
   private final FSNamesystem fsnamesystem;
 
@@ -48,8 +72,8 @@
 
   LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
 
-  Lease getLease(String holder) throws IOException {
-    return leases.get(new StringBytesWritable(holder));
+  Lease getLease(StringBytesWritable holder) throws IOException {
+    return leases.get(holder);
   }
   
   SortedSet<Lease> getSortedLeases() {return sortedLeases;}
@@ -72,42 +96,85 @@
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String src, String holder) throws IOException {
+  synchronized void addLease(StringBytesWritable holder, String src
+      ) throws IOException {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
-      leases.put(new StringBytesWritable(holder), lease);
+      leases.put(holder, lease);
       sortedLeases.add(lease);
     } else {
-      sortedLeases.remove(lease);
-      lease.renew();
-      sortedLeases.add(lease);
+      renewLease(lease);
     }
     sortedLeasesByPath.put(src, lease);
-    lease.startedCreate(src);
+    lease.paths.add(new StringBytesWritable(src));
   }
 
   /**
-   * deletes the lease for the specified file
+   * Remove the specified lease and src.
    */
-  synchronized void removeLease(String src, String holder) throws IOException {
-    Lease lease = getLease(holder);
+  synchronized void removeLease(Lease lease, String src) throws IOException {
+    sortedLeasesByPath.remove(src);
+    if (!lease.paths.remove(new StringBytesWritable(src))) {
+      LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
+    }
+
+    if (!lease.hasPath()) {
+      leases.remove(lease.holder);
+      if (!sortedLeases.remove(lease)) {
+        LOG.error(lease + " not found in sortedLeases");
+      }
+    }
+  }
+
+  /** Remove the lease for the specified holder and src, if there is one. */
+  synchronized void removeLease(StringBytesWritable holder, String src
+      ) throws IOException {
+    Lease lease = getLease(holder);
+    //a holder without a lease is ignored rather than dereferenced
+    if (lease != null) { removeLease(lease, src); }
+  }
+
+  /**
+   * Remove the lease for the specified pendingFile
+   */
+  synchronized String removeLease(INodeFileUnderConstruction pendingFile
+      ) throws IOException {
+    Lease lease = getLease(pendingFile.clientName);
     if (lease != null) {
-      lease.completedCreate(src);
-      sortedLeasesByPath.remove(src);
+      String src = lease.findPath(pendingFile);
+      if (src != null) {
+        removeLease(lease, src);
+        return src;
+      }
+    }
+    throw new IOException("pendingFile (=" + pendingFile + ") not found."
+        + "(lease=" + lease + ")");
+  }
 
-      if (!lease.hasPath()) {
-        leases.remove(new StringBytesWritable(holder));
-        sortedLeases.remove(lease);
+  /**
+   * Finds the pathname for the specified pendingFile
+   */
+  synchronized String findPath(INodeFileUnderConstruction pendingFile
+      ) throws IOException {
+    Lease lease = getLease(pendingFile.clientName);
+    if (lease != null) {
+      String src = lease.findPath(pendingFile);
+      if (src != null) {
+        return src;
       }
     }
+    throw new IOException("pendingFile (=" + pendingFile + ") not found."
+        + "(lease=" + lease + ")");
   }
 
   /**
    * Renew the lease(s) held by the given client
    */
   synchronized void renewLease(String holder) throws IOException {
-    Lease lease = getLease(holder);
+    renewLease(getLease(new StringBytesWritable(holder)));
+  }
+  synchronized void renewLease(Lease lease) {
     if (lease != null) {
       sortedLeases.remove(lease);
       lease.renew();
@@ -115,21 +182,6 @@
     }
   }
 
-  synchronized void removeExpiredLease(Lease lease) throws IOException {
-    String holder = lease.holder.getString();
-    for(StringBytesWritable sbw : lease.paths) {
-      String p = sbw.getString();
-      fsnamesystem.internalReleaseCreate(p, holder);
-      sortedLeasesByPath.remove(p);
-    }
-    lease.paths.clear();
-    
-    leases.remove(lease.holder);
-    if (!sortedLeases.remove(lease)) {
-      LOG.error("removeExpiredLease: " + lease + " not found in sortedLeases");
-    }
-  }
-
   /************************************************************
    * A Lease governs all the locks held by a single client.
    * For each client there's a corresponding lease, whose
@@ -142,11 +194,13 @@
     private long lastUpdate;
     private Collection<StringBytesWritable> paths = new TreeSet<StringBytesWritable>();
   
-    public Lease(String holder) throws IOException {
-      this.holder = new StringBytesWritable(holder);
+    /** Only LeaseManager object can create a lease */
+    private Lease(StringBytesWritable holder) throws IOException {
+      this.holder = holder;
       renew();
     }
-    public void renew() {
+    /** Only LeaseManager object can renew a lease */
+    private void renew() {
       this.lastUpdate = FSNamesystem.now();
     }
 
@@ -160,14 +214,20 @@
       return FSNamesystem.now() - lastUpdate > softLimit;
     }
 
-    void startedCreate(String src) throws IOException {
-      paths.add(new StringBytesWritable(src));
+    /**
+     * @return the path associated with the pendingFile and null if not found.
+     */
+    private String findPath(INodeFileUnderConstruction pendingFile) {
+      for(Iterator<StringBytesWritable> i = paths.iterator(); i.hasNext(); ) {
+        String src = i.next().toString();
+        if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
+          return src;
+        }
+      }
+      return null;
     }
 
-    boolean completedCreate(String src) throws IOException {
-      return paths.remove(new StringBytesWritable(src));
-    }
-
+    /** Does this lease contain any path? */
     boolean hasPath() {return !paths.isEmpty();}
 
     /** {@inheritDoc} */
@@ -309,8 +369,11 @@
                      ((top = sortedLeases.first()) != null)) {
                 if (top.expiredHardLimit()) {
                   LOG.info("Lease Monitor: Removing lease " + top
-                      + ", leases remaining: " + sortedLeases.size());
-                  removeExpiredLease(top);
+                      + ", sortedLeases.size()=: " + sortedLeases.size());
+                  for(StringBytesWritable s : top.paths) {
+                    fsnamesystem.internalReleaseLease(top, s.getString());
+                  }
+                  renewLease(top);
                 } else {
                   break;
                 }
@@ -330,4 +393,98 @@
       }
     }
   }
+
+  /*
+   * The following code provides useful static methods for lease recovery.
+   */
+  /** A convenient class used in lease recovery */
+  private static class BlockRecord { 
+    final DatanodeID id;
+    final InterDatanodeProtocol datanode;
+    final Block block;
+    
+    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+      this.id = id;
+      this.datanode = datanode;
+      this.block = block;
+    }
+  }
+
+  /**
+   * Recover a list of blocks.
+   * This method is invoked by the primary datanode.
+   */
+  static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
+      DatanodeProtocol namenode, Configuration conf) {
+    for(int i = 0; i < blocks.length; i++) {
+      try {
+        recoverBlock(blocks[i], targets[i], namenode, conf, true);
+      } catch (IOException e) {
+        LOG.warn("recoverBlocks, i=" + i, e);
+      }
+    }
+  }
+
+  /** Recover a block */
+  static boolean recoverBlock(Block block, DatanodeID[] datanodeids,
+      DatanodeProtocol namenode, Configuration conf,
+      boolean closeFile) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("block=" + block
+          + ", datanodeids=" + Arrays.asList(datanodeids));
+    }
+    List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+    long minlength = Long.MAX_VALUE;
+
+    //check generation stamps
+    for(DatanodeID id : datanodeids) {
+      try {
+        InterDatanodeProtocol datanode
+            = DataNode.createInterDataNodeProtocolProxy(id, conf);
+        BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+        if (info.getGenerationStamp() >= block.generationStamp) {
+          syncList.add(new BlockRecord(id, datanode, new Block(info)));
+          if (info.len < minlength) {
+            minlength = info.len;
+          }
+        }
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn(
+            "Failed to getBlockMetaDataInfo for block (=" + block 
+            + ") from datanode (=" + id + ")", e);
+      }
+    }
+    
+    return syncBlock(block, minlength, syncList, namenode, closeFile);
+  }
+
+  /** Block synchronization */
+  private static boolean syncBlock(Block block, long minlength,
+      List<BlockRecord> syncList, DatanodeProtocol namenode,
+      boolean closeFile
+      ) throws IOException {
+    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+
+    long generationstamp = namenode.nextGenerationStamp();
+    Block newblock = new Block(block.blkid, minlength, generationstamp);
+
+    for(BlockRecord r : syncList) {
+      try {
+        r.datanode.updateBlock(r.block, newblock);
+        successList.add(r.id);
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+            + newblock + ", datanode=" + r.id + ")", e);
+      }
+    }
+
+    //TODO: remove "true ||" in HADOOP-3113
+    if (true || !successList.isEmpty()) {
+      namenode.commitBlockSynchronization(block,
+          newblock.generationStamp, newblock.len, closeFile,
+          successList.toArray(new DatanodeID[successList.size()]));
+      return true; // success
+    }
+    return false; // failed
+  }
 }
Index: src/java/org/apache/hadoop/dfs/BlockCommand.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlockCommand.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/BlockCommand.java	(working copy)
@@ -18,6 +18,9 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import java.util.List;
+
+import org.apache.hadoop.dfs.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.io.*;
 
 abstract class DatanodeCommand implements Writable {
@@ -101,12 +104,17 @@
   /**
    * Create BlockCommand for transferring blocks to another datanode
    * @param blocks    blocks to be transferred 
-   * @param targets   nodes to transfer
    */
-  BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
-    super( DatanodeProtocol.DNA_TRANSFER);
-    this.blocks = blocks;
-    this.targets = targets;
+  BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
+    super(action);
+
+    blocks = new Block[blocktargetlist.size()]; 
+    targets = new DatanodeInfo[blocks.length][];
+    for(int i = 0; i < blocks.length; i++) {
+      BlockTargetPair p = blocktargetlist.get(i);
+      blocks[i] = p.block;
+      targets[i] = p.targets;
+    }
   }
 
   private static final DatanodeInfo[][] EMPTY_TARGET = {};
Index: src/java/org/apache/hadoop/dfs/INode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/INode.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/INode.java	(working copy)
@@ -713,27 +713,6 @@
   }
 
   /**
-   * remove a block from the block list. This block should be
-   * the last one on the list.
-   */
-  void removeBlock(Block oldblock) throws IOException {
-    if (this.blocks == null) {
-      throw new IOException("Trying to delete non-existant block " +
-                            oldblock);
-    }
-    int size = this.blocks.length;
-    if (!this.blocks[size-1].equals(oldblock)) {
-      throw new IOException("Trying to delete non-existant block " +
-                            oldblock);
-    }
-    BlockInfo[] newlist = new BlockInfo[size - 1];
-    for (int i = 0; i < size-1; i++) {
-        newlist[i] = this.blocks[i];
-    }
-    this.blocks = newlist;
-  }
-
-  /**
    * Set file block
    */
   void setBlock(int idx, BlockInfo blk) {
@@ -777,19 +756,31 @@
     }
     return blocks[blocks.length - 2];
   }
+
+  INodeFileUnderConstruction toINodeFileUnderConstruction(
+      String clientName, String clientMachine, DatanodeDescriptor clientNode
+      ) throws IOException {
+    if (isUnderConstruction()) {
+      return (INodeFileUnderConstruction)this;
+    }
+    return new INodeFileUnderConstruction(name,
+        blockReplication, modificationTime, preferredBlockSize,
+        blocks, getPermissionStatus(),
+        clientName, clientMachine, clientNode);
+  }
 }
 
 class INodeFileUnderConstruction extends INodeFile {
-  protected StringBytesWritable clientName;         // lease holder
-  protected StringBytesWritable clientMachine;
-  protected DatanodeDescriptor clientNode; // if client is a cluster node too.
+  StringBytesWritable clientName = null;         // lease holder
+  StringBytesWritable clientMachine = null;
+  DatanodeDescriptor clientNode = null; // if client is a cluster node too.
 
-  INodeFileUnderConstruction() {
-    clientName = null;
-    clientMachine = null;
-    clientNode = null;
-  }
+  private int primaryNodeIndex = -1; //the node working on lease recovery
+  DatanodeDescriptor[] targets = null;   //locations for last block
+  
+  INodeFileUnderConstruction() {}
 
+
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
                              long preferredBlockSize,
@@ -855,4 +846,49 @@
     return obj;
     
   }
+
+  /**
+   * Remove the last block of this file and clear the last-block locations.
+   * @throws IOException if there is no block or oldblock is not the last one
+   */
+  void removeBlock(Block oldblock) throws IOException {
+    //an empty list has no last block; report it like a missing list
+    if (blocks == null || blocks.length == 0) {
+      throw new IOException("Trying to delete non-existant block " + oldblock);
+    }
+    int size_1 = blocks.length - 1;
+    if (!blocks[size_1].equals(oldblock)) {
+      throw new IOException("Trying to delete non-last block " + oldblock);
+    }
+    //copy all but the last block to a new list
+    BlockInfo[] newlist = new BlockInfo[size_1];
+    System.arraycopy(blocks, 0, newlist, 0, size_1);
+    blocks = newlist;
+
+    // Remove the block locations for the last block.
+    targets = null;
+  }
+
+  /**
+   * Initialize lease recovery for this object: choose the next alive target
+   * after the previous primary and queue the last block on it for recovery.
+   */
+  void assignPrimaryDatanode() {
+    //targets may be null (its initial value, and after removeBlock(..))
+    if (targets == null || targets.length == 0) {
+      NameNode.stateChangeLog.warn("BLOCK*"
+          + " INodeFileUnderConstruction.initLeaseRecovery:"
+          + " all targets are not alive.");
+      return;
+    }
+    //find an alive datanode beginning from the one after the previous primary
+    for(int i = 1; i <= targets.length; i++) {
+      int j = (primaryNodeIndex + i)%targets.length;
+      if (targets[j].isAlive) {
+        DatanodeDescriptor primary = targets[primaryNodeIndex = j];
+        primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
+        return; //only one primary datanode is assigned per call
+      }
+    }
+  }
 }
Index: src/java/org/apache/hadoop/dfs/ClientProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/ClientProtocol.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/ClientProtocol.java	(working copy)
@@ -37,9 +37,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 34 : remove abandonFileInProgress(...)
+   * 35 : added append(...)
    */
-  public static final long versionID = 34L;
+  public static final long versionID = 35L;
   
   ///////////////////////////////////////
   // File contents
@@ -104,6 +104,14 @@
                              ) throws IOException;
 
   /**
+   * Request an append operation on src.
+   * 
+   * WARNING: this method is not yet implemented completely in the NameNode.
+   * It is only for testing at this moment.  
+   */
+  public void append(String src, String clientName) throws IOException;
+
+  /**
    * Set replication for an existing file.
    * <p>
    * The NameNode sets replication to the new value and returns.
Index: src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(working copy)
@@ -30,17 +30,15 @@
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
-   * 1: added getBlockMetaDataInfo and updateGenerationStamp
+   * 2: change updateGenerationStamp to updateBlock
    */
-  public static final long versionID = 1L;
+  public static final long versionID = 2L;
 
   /** @return the BlockMetaDataInfo of a block */
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
-   * Update the GenerationStamp of a block
-   * @return true iff update was required and done successfully 
+   * Update the block to the new generation stamp and length.  
    */
-  boolean updateGenerationStamp(Block block, GenerationStamp generationstamp
-      ) throws IOException;
+  void updateBlock(Block oldblock, Block newblock) throws IOException;
 }
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/FSDataset.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDataset.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/FSDataset.java	(working copy)
@@ -553,11 +553,51 @@
                      "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
   }
   protected File getMetaFile(Block b) throws IOException {
-    File blockFile = getBlockFile( b );
-    return new File( blockFile.getAbsolutePath() + 
-                     "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
+    return getMetaFile(getBlockFile(b), b);
   }
 
+  /** Locate the single meta data file that belongs to the given block file */
+  static File findMetaFile(final File blockFile) throws IOException {
+    final File dir = blockFile.getParentFile();
+    final String metaPrefix = blockFile.getName() + "_";
+    final File[] found = dir.listFiles(new FilenameFilter() {
+      public boolean accept(File d, String filename) {
+        return d.equals(dir) && filename.startsWith(metaPrefix)
+            && filename.endsWith(METADATA_EXTENSION);
+      }
+    });
+    final int count = found == null? 0: found.length;
+    if (count == 0) {
+      throw new IOException("Meta file not found, blockFile=" + blockFile);
+    }
+    if (count > 1) {
+      throw new IOException("Found more than one meta files: " 
+          + Arrays.asList(found));
+    }
+    return found[0];
+  }
+  
+  /** Parse the generation stamp out of the name of a block's meta file. */
+  static long parseGenerationStamp(File blockFile, File metaFile
+      ) throws IOException {
+    String metaname = metaFile.getName();
+    try {
+      String gs = metaname.substring(blockFile.getName().length() + 1,
+          metaname.length() - METADATA_EXTENSION.length());
+      return Long.parseLong(gs);
+    } catch(RuntimeException e) { //malformed name or non-numeric stamp
+      throw (IOException)new IOException("blockFile=" + blockFile
+          + ", metaFile=" + metaFile).initCause(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public long getStoredGenerationStamp(Block b) throws IOException {
+    final File blockfile = getBlockFile(b);
+    //the stamp is taken from the name of the block's meta file
+    return parseGenerationStamp(blockfile, findMetaFile(blockfile));
+  }
+
   public boolean metaFileExists(Block b) throws IOException {
     return getMetaFile(b).exists();
   }
@@ -624,11 +664,7 @@
    * Find the block's on-disk length
    */
   public long getLength(Block b) throws IOException {
-    File f = validateBlockFile(b);
-    if(f == null) {
-      throw new IOException("Block " + b + " is not valid.");
-    }
-    return f.length();
+    return getBlockFile(b).length();
   }
 
   /**
@@ -637,6 +673,9 @@
   protected synchronized File getBlockFile(Block b) throws IOException {
     File f = validateBlockFile(b);
     if(f == null) {
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+      }
       throw new IOException("Block " + b + " is not valid.");
     }
     return f;
@@ -680,6 +719,130 @@
     return info.detachBlock(block, numLinks);
   }
 
+  private synchronized void updateVolumeMap(Block oldblock, Block newblock
+      ) throws IOException {
+    if (volumeMap.containsKey(oldblock)) {
+      //re-key the existing entry under the new block
+      volumeMap.put(newblock, volumeMap.remove(oldblock));
+      return;
+    }
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("volumeMap=" + volumeMap);
+    }
+    throw new IOException("oldblock (=" + oldblock
+        + ") not found in the volumeMap");
+  }
+
+  /** Interrupt every thread creating block b, then wait for each to end. */
+  synchronized void interruptOngoingCreates(Block b) {
+    ActiveFile activefile = ongoingCreates.remove(b);
+    if (activefile == null) {
+      return;
+    }
+    for(Thread t : activefile.threads) {
+      t.interrupt();
+    }
+    for(Thread t : activefile.threads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        DataNode.LOG.warn("interruptOngoingCreates: b=" + b
+            + ", activeFile=" + activefile + ", t=" + t, e);
+      }
+    }
+    activefile.threads.clear();
+  }
+  /** {@inheritDoc} */
+  public synchronized void updateBlock(Block oldblock, Block newblock
+      ) throws IOException {
+    if (oldblock.getBlockId() != newblock.getBlockId()) {
+      throw new IOException("Cannot update oldblock (=" + oldblock
+          + ") to newblock (=" + newblock + ").");
+    }
+
+    //update oldblock.generationStamp
+    //since the stamp stored in this datanode could be changed.
+    oldblock.generationStamp = getStoredGenerationStamp(oldblock);
+    interruptOngoingCreates(oldblock);
+    File blockFile = getBlockFile(oldblock);
+    long oldlen = blockFile.length();
+    File oldMetaFile = getMetaFile(blockFile, oldblock);
+    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+
+    //validate first, so that a rejected update leaves the files untouched
+    if (oldgs > newblock.generationStamp) {
+      throw new IOException("Cannot update block (id=" + newblock.blkid
+          + ") generation stamp from " + oldgs + " to " + newblock.generationStamp);
+    }
+    if (newblock.len > oldlen) {
+      throw new IOException("Cannot update block file (=" + blockFile
+          + ") length from " + oldlen + " to " + newblock.len);
+    }
+
+    //rename meta file to a tmp file; renameTo reports failure by returning false
+    File tmpMetaFile = new File(oldMetaFile.getParent(),
+        oldMetaFile.getName()+"_tmp");
+    if (!oldMetaFile.renameTo(tmpMetaFile)) {
+      throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
+    }
+    if (newblock.len < oldlen) {
+      truncateBlock(blockFile, tmpMetaFile, oldlen, newblock.len);
+    }
+
+    //rename the tmp file to the new meta file (with new generation stamp) 
+    File newMetaFile = getMetaFile(blockFile, newblock);
+    if (!tmpMetaFile.renameTo(newMetaFile)) {
+      throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
+    }
+    updateVolumeMap(oldblock, newblock);
+  }
+  /** Shrink blockFile to newlen bytes and rewrite the last checksum in metaFile. */
+  static private void truncateBlock(File blockFile, File metaFile,
+      long oldlen, long newlen) throws IOException {
+    if (newlen == oldlen) {
+      return;
+    }
+    if (newlen > oldlen) {
+      throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
+          + ") to newlen (=" + newlen + ")");
+    }
+    //n is ceil(newlen/bpc) for newlen >= 1; NOTE(review): n is also 1 when newlen is 0
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 
+    int checksumsize = dcs.getChecksumSize();
+    int bpc = dcs.getBytesPerChecksum();
+    long n = (newlen - 1)/bpc + 1;
+    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
+    long lastchunkoffset = (n - 1)*bpc;
+    int lastchunksize = (int)(newlen - lastchunkoffset); 
+    byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 
+    //b first receives the last chunk's data and is later reused for the checksum
+    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+    try {
+      //truncate blockFile 
+      blockRAF.setLength(newlen);
+ 
+      //read back the (possibly shortened) last chunk of the truncated file
+      blockRAF.seek(lastchunkoffset);
+      blockRAF.readFully(b, 0, lastchunksize);
+    } finally {
+      blockRAF.close();
+    }
+
+    //compute checksum of the last chunk; presumably writeValue stores it at b[0..] - confirm
+    dcs.update(b, 0, lastchunksize);
+    dcs.writeValue(b, 0, false);
+
+    //update metaFile: cut it to header + n checksums and overwrite the last checksum
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    try {
+      metaRAF.setLength(newmetalen);
+      metaRAF.seek(newmetalen - checksumsize);
+      metaRAF.write(b, 0, checksumsize);
+    } finally {
+      metaRAF.close();
+    }
+  }
+
   /**
    * Start writing to a block file
    * If isRecovery is true and the block pre-exists, then we kill all
@@ -880,6 +1043,9 @@
     File f = getFile(b);
     if(f != null && f.exists())
       return f;
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("b=" + b + ", f=" + f);
+    }
     return null;
   }
 
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -19,7 +19,6 @@
 
 import org.apache.commons.logging.*;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -80,7 +79,8 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
+public class DataNode extends Configured 
+    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
 
   /**
@@ -178,7 +178,7 @@
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
-      
+    super(conf);
     datanodeObject = this;
 
     try {
@@ -772,12 +772,11 @@
   private boolean processCommand(DatanodeCommand cmd) throws IOException {
     if (cmd == null)
       return true;
+    final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
-      //
       // Send a copy of a block to another datanode
-      //
-      BlockCommand bcmd = (BlockCommand)cmd;
       transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
@@ -785,7 +784,7 @@
       // Some local block(s) are obsolete and can be 
       // safely garbage-collected.
       //
-      Block toDelete[] = ((BlockCommand)cmd).getBlocks();
+      Block toDelete[] = bcmd.getBlocks();
       try {
         if (blockScanner != null) {
           blockScanner.deleteBlocks(toDelete);
@@ -821,6 +820,9 @@
         scheduleBlockReport(initialBlockReportDelay);
       }
       break;
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+      recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -3049,13 +3051,22 @@
     return BlockMetaDataInfo.getBlockMetaDataInfo(block, data, blockScanner);
   }
 
+  Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+    Daemon d = new Daemon(threadGroup, new Runnable() {
+      public void run() {
+        LeaseManager.recoverBlocks(blocks, targets, namenode, getConf());
+      }
+    });
+    d.start(); //presumably run() then executes off the caller's thread - confirm
+    return d;  //the started Daemon is handed back to the caller
+  }
+
   /** {@inheritDoc} */
-  public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
+  public void updateBlock(Block oldblock, Block newblock) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
+      LOG.debug("oldblock=" + oldblock + ", newblock=" + newblock);
     }
-    //TODO: update generation stamp here
-    return false;
+    data.updateBlock(oldblock, newblock);
   }
 
   /** {@inheritDoc} */
@@ -3063,8 +3074,20 @@
       ) throws IOException {
     if (protocol.equals(InterDatanodeProtocol.class.getName())) {
       return InterDatanodeProtocol.versionID; 
-    } else {
-      throw new IOException("Unknown protocol to name node: " + protocol);
+    } else if (protocol.equals(ClientDatanodeProtocol.class.getName())) {
+      return ClientDatanodeProtocol.versionID; 
     }
+    throw new IOException("Unknown protocol to " + getClass().getSimpleName()
+        + ": " + protocol);
   }
+
+  // ClientDatanodeProtocol implementation
+  /** {@inheritDoc} */
+  public boolean recoverBlock(Block block, DatanodeInfo[] targets) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("recoverBlock for " + block);
+    }
+    final Configuration conf = getConf();
+    return LeaseManager.recoverBlock(block, targets, namenode, conf, false);
+  }
 }
Index: src/java/org/apache/hadoop/dfs/GenerationStamp.java
===================================================================
--- src/java/org/apache/hadoop/dfs/GenerationStamp.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/GenerationStamp.java	(working copy)
@@ -64,9 +64,9 @@
   }
 
   /**
-   * Returns the current stamp and incrments the counter
+   * First increments the counter and then returns the stamp 
    */
-  public long nextStamp() {
+  public synchronized long nextStamp() {
     this.genstamp++;
     return this.genstamp;
   }
Index: src/java/org/apache/hadoop/dfs/FSConstants.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSConstants.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/FSConstants.java	(working copy)
@@ -131,7 +131,9 @@
   // Currently we set the maximum length to 8k characters and the maximum depth to 1k.  
   public static int MAX_PATH_LENGTH = 8000;
   public static int MAX_PATH_DEPTH = 1000;
-    
+
+  public static long FIRST_VALID_GENERATION_STAMP = 1000L;
+  
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
   //Used for writing header etc.
   static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
Index: src/java/org/apache/hadoop/dfs/StringBytesWritable.java
===================================================================
--- src/java/org/apache/hadoop/dfs/StringBytesWritable.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/StringBytesWritable.java	(working copy)
@@ -46,6 +46,15 @@
     return new String(get(),"UTF8");
   }
 
+  /** Return the same string as {@link #getString()}. */
+  public String toString() {
+    try {
+      return getString();
+    } catch (IOException e) {
+      throw new RuntimeException(e); //toString() cannot throw IOException
+    }
+  }
+
   /**
    * Compare to a String.
    */
Index: src/java/org/apache/hadoop/dfs/FSImage.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSImage.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/FSImage.java	(working copy)
@@ -1031,7 +1031,7 @@
       }
       INodeFile oldnode = (INodeFile) old;
       fsDir.replaceNode(path, oldnode, cons);
-      fs.leaseManager.addLease(path, cons.getClientName()); 
+      fs.leaseManager.addLease(cons.clientName, path); 
     }
   }
 
Index: src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(working copy)
@@ -18,12 +18,10 @@
 package org.apache.hadoop.dfs;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
 
 import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
-import org.apache.hadoop.dfs.DatanodeInfo.AdminStates;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.WritableUtils;
@@ -40,28 +38,63 @@
 
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
+  /** A block paired with an array of datanode targets */
+  static class BlockTargetPair {
+    final Block block;
+    final DatanodeDescriptor[] targets;    
+    //both arguments are stored by reference; the array is not copied
+    BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+      this.block = block;
+      this.targets = targets;
+    }
+  }
+
+  /** A FIFO queue of BlockTargetPair; every method locks the queue object. */
+  private static class BlockQueue {
+    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+    /** Size of the queue */
+    synchronized int size() {return blockq.size();}
+
+    /** Enqueue the block together with its targets */
+    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
+      return blockq.offer(new BlockTargetPair(block, targets));
+    }
+
+    /**
+     * Dequeue pairs from the head while their targets fit into numTargets.
+     * The head is always taken so an oversized pair cannot block the queue.
+     */
+    synchronized List<BlockTargetPair> poll(int numTargets) {
+      if (numTargets <= 0 || blockq.isEmpty()) {
+        return null;
+      }
+      List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+      while (!blockq.isEmpty() && numTargets > 0) {
+        numTargets -= blockq.peek().targets.length; 
+        if (numTargets >= 0 || results.isEmpty()) {
+          results.add(blockq.poll());
+        }
+      }
+      return results;
+    }
+  }
+
   private volatile BlockInfo blockList = null;
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
 
-  //
-  // List of blocks to be replicated by this datanode
-  // Also, a list of datanodes per block to indicate the target
-  // datanode of this replication.
-  //
-  List<Block> replicateBlocks;
-  List<DatanodeDescriptor[]> replicateTargetSets;
-  Set<Block> invalidateBlocks;
+  /** A queue of blocks to be replicated by this datanode */
+  private BlockQueue replicateBlocks = new BlockQueue();
+  /** A queue of blocks to be recovered by this datanode */
+  private BlockQueue recoverBlocks = new BlockQueue();
+  /** A set of blocks to be invalidated by this datanode */
+  private Set<Block> invalidateBlocks = new TreeSet<Block>();
+
   boolean processedBlockReport = false;
-
   
   /** Default constructor */
-  public DatanodeDescriptor() {
-    super();
-    initWorkLists();
-  }
+  public DatanodeDescriptor() {}
   
   /** DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -107,7 +140,6 @@
                             int xceiverCount) {
     super(nodeID);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
   }
 
   /** DatanodeDescriptor constructor
@@ -128,18 +160,8 @@
                             int xceiverCount) {
     super(nodeID, networkLocation, hostName);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
   }
 
-  /*
-   * initialize list of blocks that store work for the datanodes
-   */
-  private void initWorkLists() {
-    replicateBlocks = new ArrayList<Block>();
-    replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
-    invalidateBlocks = new TreeSet<Block>();
-  }
-
   /**
    * Add data-node to the block.
    * Add block to the head of the list of blocks belonging to the data-node.
@@ -227,13 +249,18 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    synchronized (replicateBlocks) {
-      replicateBlocks.add(block);
-      replicateTargetSets.add(targets);
-    }
+    replicateBlocks.offer(block, targets);
   }
 
   /**
+   * Store block recovery work: enqueue the block and its targets on recoverBlocks.
+   */
+  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
+    assert(block != null && targets != null && targets.length > 0);
+    recoverBlocks.offer(block, targets);
+  }
+
+  /**
    * Store block invalidation work.
    */
   void addBlocksToBeInvalidated(List<Block> blocklist) {
@@ -245,16 +272,14 @@
     }
   }
 
-  /*
+  /**
    * The number of work items that are pending to be replicated
    */
   int getNumberOfBlocksToBeReplicated() {
-    synchronized (replicateBlocks) {
-      return replicateBlocks.size();
-    }
+    return replicateBlocks.size();
   }
 
-  /*
+  /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    */
@@ -279,36 +304,16 @@
     return processedBlockReport;
   }
 
-  /**
-   * Remove the specified number of target sets
-   */
-  BlockCommand getReplicationCommand(int maxNumTransfers) {
-    synchronized (replicateBlocks) {
-      assert(replicateBlocks.size() == replicateTargetSets.size());
+  BlockCommand getReplicationCommand(int maxTransfers) {
+    final List<BlockTargetPair> pairs = replicateBlocks.poll(maxTransfers);
+    if (pairs == null) { return null; } //poll dequeued nothing
+    return new BlockCommand(DatanodeProtocol.DNA_TRANSFER, pairs);
+  }
 
-      if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
-        return null;
-      }
-      int numTransfers = 0;
-      int numBlocks = 0;
-      int i;
-      for (i = 0; i < replicateTargetSets.size() && 
-             numTransfers < maxNumTransfers; i++) {
-        numTransfers += replicateTargetSets.get(i).length;
-      }
-      numBlocks = i;
-      Block[] blocklist = new Block[numBlocks];
-      DatanodeDescriptor targets[][] = new DatanodeDescriptor[numBlocks][];
-
-      for (i = 0; i < numBlocks; i++) {
-        blocklist[i] = replicateBlocks.get(0);
-        targets[i] = replicateTargetSets.get(0);
-        replicateBlocks.remove(0);
-        replicateTargetSets.remove(0);
-      }
-      assert(blocklist.length > 0 && targets.length > 0);
-      return new BlockCommand(blocklist, targets);
-    }
+  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
+    final List<BlockTargetPair> pairs = recoverBlocks.poll(maxTransfers);
+    if (pairs == null) { return null; } //poll dequeued nothing
+    return new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, pairs);
   }
 
   /**
Index: src/java/org/apache/hadoop/dfs/Block.java
===================================================================
--- src/java/org/apache/hadoop/dfs/Block.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/Block.java	(working copy)
@@ -51,6 +51,10 @@
     }
   }
 
+  static long filename2id(String name) { //parses what follows the first "blk_".length() chars
+    return Long.parseLong(name.substring("blk_".length())); //the prefix itself is not checked
+  }
+
   long blkid;
   long len;
   long generationStamp;
@@ -83,11 +87,7 @@
    * Find the blockid from the given filename
    */
   public Block(File f, long len, long genstamp) {
-    String name = f.getName();
-    name = name.substring("blk_".length());
-    this.blkid = Long.parseLong(name);
-    this.len = len;
-    this.generationStamp = genstamp;
+    this(filename2id(f.getName()), len, genstamp);
   }
 
   public void set(long blkid, long len, long genStamp) {
Index: src/java/org/apache/hadoop/dfs/FSDirectory.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDirectory.java	(revision 659757)
+++ src/java/org/apache/hadoop/dfs/FSDirectory.java	(working copy)
@@ -127,7 +127,7 @@
   /**
    * Add the given filename to the fs.
    */
-  INode addFile(String path, 
+  INodeFileUnderConstruction addFile(String path, 
                 PermissionStatus permissions,
                 short replication,
                 long preferredBlockSize,
@@ -166,7 +166,6 @@
     }
     // add create file record to log, record new generation stamp
     fsImage.getEditLog().logOpenFile(path, newNode);
-    fsImage.getEditLog().logGenerationStamp(generationStamp);
 
     NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                   +path+" is added to the file system");
@@ -287,13 +286,14 @@
    */
   void closeFile(String path, INodeFile file) throws IOException {
     waitForReady();
-
     synchronized (rootDir) {
       // file is closed
       fsImage.getEditLog().logCloseFile(path, file);
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
                                     +path+" with "+ file.getBlocks().length 
                                     +" blocks is persisted to the file system");
+      }
     }
   }
 
