Index: src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(revision 657965)
+++ src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(working copy)
@@ -21,12 +21,21 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 
 /**
  * This tests InterDataNodeProtocol for block handling. 
  */
 public class TestInterDatanodeProtocol extends junit.framework.TestCase {
+  static void checkMetaInfo(Block b, InterDatanodeProtocol idp,
+      DataBlockScanner scanner) throws IOException {
+    BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
+    assertEquals(b.getBlockId(), metainfo.getBlockId());
+    assertEquals(b.getNumBytes(), metainfo.getNumBytes());
+    assertEquals(scanner.getLastScanTime(b),
+        metainfo.getLastScanTime());
+  }
+  
   public void testGetBlockMetaDataInfo() throws IOException {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = null;
@@ -37,14 +46,20 @@
 
       //create a file
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-      String filepath = "/foo";
-      DFSTestUtil.createFile(dfs, new Path(filepath), 1024L, (short)3, 0L);
-      assertTrue(dfs.dfs.exists(filepath));
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+      assertTrue(dfs.dfs.exists(filestr));
 
+      //copy to a backup 
+      Path backuppath = new Path("/backup");
+      FileUtil.copy(dfs, filepath, dfs, backuppath, false, conf);
+      assertTrue(dfs.exists(backuppath));
+      
       //get block info
       ClientProtocol namenode = dfs.dfs.namenode;
       LocatedBlocks locations = namenode.getBlockLocations(
-          filepath, 0, Long.MAX_VALUE);
+          filestr, 0, Long.MAX_VALUE);
       List<LocatedBlock> blocks = locations.getLocatedBlocks();
       assertTrue(blocks.size() > 0);
 
@@ -64,15 +79,13 @@
       //verify BlockMetaDataInfo
       Block b = locatedblock.getBlock();
       InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
-      BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
-      assertEquals(b.getBlockId(), metainfo.getBlockId());
-      assertEquals(b.getNumBytes(), metainfo.getNumBytes());
-      assertEquals(datanode.blockScanner.getLastScanTime(b),
-          metainfo.getLastScanTime());
+      checkMetaInfo(b, idp, datanode.blockScanner);
 
-      //TODO: verify GenerationStamp
-      InterDatanodeProtocol.LOG.info("idp.updateGenerationStamp="
-          + idp.updateGenerationStamp(b, new GenerationStamp(456789L)));
+      //verify updateBlock
+      Block newblock = new Block(
+          b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
+      idp.updateBlock(b, newblock);
+      checkMetaInfo(newblock, idp, datanode.blockScanner);
     }
     finally {
       if (cluster != null) {cluster.shutdown();}
Index: src/test/org/apache/hadoop/dfs/TestFileCreation.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestFileCreation.java	(revision 657965)
+++ src/test/org/apache/hadoop/dfs/TestFileCreation.java	(working copy)
@@ -151,7 +151,6 @@
       //
       Path path = new Path("/");
       System.out.println("Path : \"" + path.toString() + "\"");
-      System.out.println(fs.isDirectory(path));
       System.out.println(fs.getFileStatus(path).isDir()); 
       assertTrue("/ should be a directory", 
                  fs.getFileStatus(path).isDir() == true);
@@ -544,4 +543,32 @@
     simulatedStorage = false;
   }
 
+  public void testConcurrentFileCreation() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      
+      Path[] p = {new Path("/foo"), new Path("/bar")};
+      
+      //write 2 files at the same time
+      FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
+      int i = 0;
+      for(; i < 100; i++) {
+        out[0].write(i);
+        out[1].write(i);
+      }
+      out[0].close();
+      for(; i < 200; i++) {out[1].write(i);}
+      out[1].close();
+
+      //verify
+      FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};  
+      for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
+      for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }
Index: src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
===================================================================
--- src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java	(revision 657965)
+++ src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java	(working copy)
@@ -97,6 +97,15 @@
         oStream = null;
       }
     }
+
+    synchronized long getGenerationStamp() {
+      return theBlock.getGenerationStamp();
+    }
+
+    synchronized void updateBlock(Block b) {
+      theBlock.generationStamp = b.generationStamp;
+      setlength(b.len);
+    }
     
     synchronized long getlength() {
       if (!finalized) {
@@ -292,6 +301,24 @@
     return binfo.getlength();
   }
 
+  /** {@inheritDoc} */
+  public long getStoredGenerationStamp(Block b) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("BInfo not found, b=" + b);
+    }
+    return binfo.getGenerationStamp();
+  }
+
+  /** {@inheritDoc} */
+  public void updateBlock(Block oldblock, Block newblock) throws IOException {
+    BInfo binfo = blockMap.get(newblock);
+    if (binfo == null) {
+      throw new IOException("BInfo not found, b=" + newblock);
+    }
+    binfo.updateBlock(newblock);
+  }
+
   public synchronized void invalidate(Block[] invalidBlks) throws IOException {
     boolean error = false;
     if (invalidBlks == null) {
@@ -586,5 +613,4 @@
   public String getStorageInfo() {
     return "Simulated FSDataset-" + storageId;
   }
-
 }
Index: src/java/org/apache/hadoop/dfs/NameNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/NameNode.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/NameNode.java	(working copy)
@@ -358,6 +358,19 @@
     }
   }
 
+  /** {@inheritDoc} */
+  public long nextGenerationStamp() {
+    return namesystem.nextGenerationStamp();
+  }
+
+  /** {@inheritDoc} */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength, DatanodeID[] newtargets
+      ) throws IOException {
+    namesystem.commitBlockSynchronization(block,
+        newgenerationstamp, newlength, newtargets);
+  }
+  
   public long getPreferredBlockSize(String filename) throws IOException {
     return namesystem.getPreferredBlockSize(filename);
   }
Index: src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDatasetInterface.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/FSDatasetInterface.java	(working copy)
@@ -89,8 +89,13 @@
    * @throws IOException
    */
   public long getLength(Block b) throws IOException;
-     
+
   /**
+   * @return the generation stamp stored with the block.
+   */
+  public long getStoredGenerationStamp(Block b) throws IOException;
+
+  /**
    * Returns an input stream to read the contents of the specified block
    * @param b
    * @return an input stream to read the contents of the specified block
@@ -136,6 +141,11 @@
   public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
 
   /**
+   * Update the block to the new generation stamp and length.  
+   */
+  public void updateBlock(Block oldblock, Block newblock) throws IOException;
+  
+  /**
    * Finalizes the block previously opened for writing using writeToBlock.
    * The block size is what is in the parameter b and it must match the amount
    *  of data written
Index: src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeProtocol.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/DatanodeProtocol.java	(working copy)
@@ -31,9 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 14: add corrupt field to LocatedBlock
+   * 15: added DNA_RECOVERBLOCK, nextGenerationStamp and commitBlockSynchronization
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   // error code
   final static int NOTIFY = 0;
@@ -51,6 +51,7 @@
   final static int DNA_REGISTER = 4;   // re-register
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
   final static int DNA_BLOCKREPORT = 6;   // request a block report
+  final static int DNA_RECOVERBLOCK = 7;  // request a block recovery
 
   /** 
    * Register Datanode.
@@ -132,4 +133,16 @@
    * same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[] blocks)}
    */
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
+  
+  /**
+   * @return the next GenerationStamp
+   */
+  public long nextGenerationStamp() throws IOException;
+
+  /**
+   * Commit block synchronization in lease recovery
+   */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength, DatanodeID[] newtargets
+      ) throws IOException;
 }
Index: src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java	(working copy)
@@ -40,7 +40,7 @@
     info.blkid = b.getBlockId();
     info.lastScanTime = blockscanner.getLastScanTime(b);
     info.len = dataset.getLength(b);
-    //TODO: get generation stamp here
+    info.generationStamp = dataset.getStoredGenerationStamp(b);
     return info;
   }
 
Index: src/java/org/apache/hadoop/dfs/BlocksMap.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlocksMap.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/BlocksMap.java	(working copy)
@@ -62,6 +62,54 @@
       return node;
     }
 
+    /**
+     * Initialize lease recovery for this object
+     * @param targets The target datanodes of the last block 
+     * @return the index of the chosen primary datanode
+     */
+    DatanodeDescriptor choosePrimaryDatanode(DatanodeDescriptor previous) {
+      //assign the first alive datanode as the primary datanode
+      DatanodeDescriptor[] targets = getAliveDatanodes();
+      if (targets.length == 0) {
+        NameNode.stateChangeLog.warn("BLOCK*"
+            + " INodeFileUnderConstruction.initLeaseRecovery:"
+            + " all targets are not alive.");
+        return null;
+      }
+
+      int i = 0;
+      if (previous != null) {
+        //try to find previous
+        for(; i < targets.length && targets[i] != previous; i++);
+        if (i < targets.length) {
+          i++;  //found previous, use the next one
+        }
+        i %=  targets.length;
+      }
+
+      //i is the chosen datanode
+      targets[i].addBlockToBeRecovered(this, targets);
+      return targets[i];
+    }
+
+    /**
+     * Get all datanodes.
+     */
+    private DatanodeDescriptor[] getAliveDatanodes() {
+      assert this.triplets != null : "BlockInfo is not initialized";
+      assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+
+      List<DatanodeDescriptor> a = new ArrayList<DatanodeDescriptor>();
+      int cap = getCapacity();
+      DatanodeDescriptor datanode;
+      for(int i = 0; i < cap && (datanode = getDatanode(i)) != null; i++) {
+        if (datanode.isAlive) {
+          a.add(datanode);
+        }
+      }
+      return a.toArray(new DatanodeDescriptor[a.size()]);
+    }
+
     BlockInfo getPrevious(int index) {
       assert this.triplets != null : "BlockInfo is not initialized";
       assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
@@ -139,6 +187,25 @@
     }
 
     /**
+     * Update this object
+     */
+    void update(long newgenerationstamp, long newlength,
+        DatanodeDescriptor[] newtargets) {
+      //remove all nodes  
+      for(int n = numNodes(); n >= 0; ) {
+        removeNode(--n);
+      }
+
+      //add all targets  
+      for(DatanodeDescriptor d : newtargets) {
+        addNode(d);
+      }
+
+      generationStamp = newgenerationstamp;
+      len = newlength;
+    }
+
+    /**
      * Add data-node this block belongs to.
      */
     boolean addNode(DatanodeDescriptor node) {
@@ -156,7 +223,13 @@
      * Remove data-node from the block.
      */
     boolean removeNode(DatanodeDescriptor node) {
-      int dnIndex = this.findDatanode(node);
+      return removeNode(findDatanode(node));
+    }
+
+    /**
+     * Remove data-node from the block.
+     */
+    boolean removeNode(int dnIndex) {
       if(dnIndex < 0) // the node is not found
         return false;
       assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java	(working copy)
@@ -96,9 +96,8 @@
   FSDirectory dir;
 
   //
-  // Stores the block-->datanode(s) map.  Updated only in response
-  // to client-sent information.
   // Mapping: Block -> { INode, datanodes, self ref } 
+  // Updated only in response to client-sent information.
   //
   BlocksMap blocksMap = new BlocksMap();
 
@@ -238,9 +237,10 @@
 
   /**
    * The global generation stamp for this file system. 
-   * Valid values start from 1000.
+   * Valid values start from FIRST_VALID_GENERATION_STAMP.
    */
-  private GenerationStamp generationStamp = new GenerationStamp(1000);
+  private final GenerationStamp generationStamp = new GenerationStamp(
+      FIRST_VALID_GENERATION_STAMP);
 
   // Ask Datanode only up to this many blocks to delete.
   private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
@@ -997,8 +997,7 @@
       checkFsObjectLimit();
 
       // increment global generation stamp
-      long genstamp = generationStamp.nextStamp();
-
+      long genstamp = nextGenerationStamp();
       INode newNode = dir.addFile(src, permissions,
           replication, blockSize, holder, clientMachine, clientNode, genstamp);
       if (newNode == null) {
@@ -1167,30 +1166,15 @@
     } else if (!checkFileProgress(pendingFile, true)) {
       return STILL_WAITING;
     }
-        
-    // The file is no longer pending.
-    // Create permanent INode, update blockmap
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
 
-    // close file and persist block allocations for this file
-    dir.closeFile(src, newFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile);
 
-    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
                                   + " blocklist persisted");
+    }
 
     leaseManager.removeLease(src, holder);
-
-    //
-    // REMIND - mjc - this should be done only after we wait a few secs.
-    // The namenode isn't giving datanodes enough time to report the
-    // replicated blocks that are automatically done as part of a client
-    // write.
-    //
-
-    // Now that the file is real, we need to be sure to replicate
-    // the blocks.
-    checkReplicationFactor(newFile);
     return COMPLETE_SUCCESS;
   }
 
@@ -1602,7 +1586,7 @@
    * @param src The filename
    * @param holder The datanode that was creating the file
    */
-  void internalReleaseCreate(String src, String holder) throws IOException {
+  void internalReleaseLeasePath(Lease lease, String src) throws IOException {
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
@@ -1616,47 +1600,70 @@
                                    + src + " but file is already closed.");
       return;
     }
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: "
+          + src + " is no longer written to by " + lease.getHolder());
+    }
+
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
 
-    // The last block that was allocated migth not have been used by the
-    // client. In this case, the size of the last block would be 0. A fsck
-    // will report this block as a missing block because no datanodes have it.
-    // Delete this block.
-    Block[] blocks = pendingFile.getBlocks();
-    if (blocks != null && blocks.length > 0) {
-      Block last = blocks[blocks.length - 1];
-      if (last.getNumBytes() == 0) {
-          pendingFile.removeBlock(last);
-          blocksMap.removeINode(last);
-          for (Iterator<DatanodeDescriptor> it = 
-               blocksMap.nodeIterator(last); it.hasNext();) {
-            DatanodeDescriptor node = it.next();
-            addToInvalidates(last, node);
-          }
-          /* What else do we need to do?
-           * removeStoredBlock()? we do different things when a block is 
-           * removed in different contexts. Mostly these should be
-           * same and/or should be in one place.
-           */
-      }
+    // Initialize lease recovery for pendingFile
+    pendingFile.primaryNode = leaseManager.choosePrimaryDatanode(
+        src, pendingFile.blocks);
+    if (pendingFile.primaryNode != null) {
+      lease.renew();
     }
-
+    else {
+      finalizeINodeFileUnderConstruction(src, pendingFile);
+    }
+  }
+  
+  private void finalizeINodeFileUnderConstruction(String src,
+      INodeFileUnderConstruction pendingFile) throws IOException {
     // The file is no longer pending.
-    // Create permanent INode, update blockmap
+    // Create permanent INode, update block map
     INodeFile newFile = pendingFile.convertToInodeFile();
     dir.replaceNode(src, pendingFile, newFile);
 
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
 
-    // replicate blocks of this file.
     checkReplicationFactor(newFile);
-  
-    NameNode.stateChangeLog.info("DIR* NameSystem.internalReleaseCreate: " + 
-                                 src + " is no longer written to by " + 
-                                 holder);
   }
 
+  synchronized void commitBlockSynchronization(Block lastblock,
+      long newgenerationstamp, long newlength, DatanodeID[] newtargets
+      ) throws IOException {
+    BlockInfo blockinfo = blocksMap.getStoredBlock(lastblock);
+    if (blockinfo == null) {
+      throw new IOException("Block (=" + lastblock + ") not found");
+    }
+    INodeFile iFile = blockinfo.getINode();
+    if (!iFile.isUnderConstruction()) {
+      throw new IOException("Unexpected block (=" + lastblock
+          + ") since the file (=" + iFile.getLocalName()
+          + ") is not under construction");
+    }
+
+    //update block info
+    DatanodeDescriptor[] descriptors = new DatanodeDescriptor[newtargets.length];
+    for(int i = 0; i < newtargets.length; i++) {
+      descriptors[i] = getDatanode(newtargets[i]);
+    }
+    blockinfo.update(newgenerationstamp, newlength, descriptors);
+
+    //remove lease, close file
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+    String src = leaseManager.removeLease(pendingFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile);
+
+    //commit changes to FSEditLog 
+    FSEditLog editlog = getEditLog(); 
+    editlog.logOpenFile(src, pendingFile);    
+    editlog.logSync();
+  }
+
+
   /**
    * Renew the lease(s) held by the given client
    */
@@ -1970,6 +1977,10 @@
         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
         updateStats(nodeinfo, true);
         
+        //check lease recovery
+        if (cmd == null) {
+          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        }
         //check pending replication
         if (cmd == null) {
           cmd = nodeinfo.getReplicationCommand(
@@ -4187,6 +4198,15 @@
     return generationStamp.getStamp();
   }
 
+  /**
+   * Increments, logs and then returns the stamp
+   */
+  long nextGenerationStamp() {
+    long gs = generationStamp.nextStamp();
+    getEditLog().logGenerationStamp(gs);
+    return gs;
+  }
+
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //
Index: src/java/org/apache/hadoop/dfs/LeaseManager.java
===================================================================
--- src/java/org/apache/hadoop/dfs/LeaseManager.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/LeaseManager.java	(working copy)
@@ -21,6 +21,8 @@
 import java.util.*;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 
 import org.apache.hadoop.fs.Path;
 
@@ -85,21 +87,41 @@
   }
 
   /**
-   * deletes the lease for the specified file
+   * Remove the lease for the specified src and holder
    */
-  synchronized void removeLease(String src, String holder) throws IOException {
+  synchronized Lease removeLease(String src, String holder) throws IOException {
     Lease lease = getLease(holder);
     if (lease != null) {
       lease.completedCreate(src);
+      sortedLeasesByPath.remove(src);
+
       if (!lease.hasPath()) {
-        leases.remove(new StringBytesWritable(holder));
+        leases.remove(lease.holder);
         sortedLeases.remove(lease);
-        sortedLeasesByPath.remove(src);
       }
     }
+    return lease;
   }
 
   /**
+   * Remove the lease for the specified pendingFile
+   */
+  synchronized String removeLease(
+      INodeFileUnderConstruction pendingFile) throws IOException {
+    Lease lease = getLease(pendingFile.clientName.toString());
+    if (lease != null) {
+      String src = lease.remove(pendingFile);
+      if (src != null) {
+        if (!lease.hasPath()) {
+          removeExpiredLease(lease);
+        }
+        return src;
+      }
+    }
+    throw new IOException("pendingFile (=" + pendingFile + ") not found.");
+  }
+
+  /**
    * Renew the lease(s) held by the given client
    */
   synchronized void renewLease(String holder) throws IOException {
@@ -112,7 +134,8 @@
   }
 
   synchronized void removeExpiredLease(Lease lease) throws IOException {
-    lease.releaseLocks();
+    lease.releasePaths();
+    lease.paths.clear();
     leases.remove(lease.holder);
     if (!sortedLeases.remove(lease)) {
       LOG.error("startFile: Unknown failure trying to remove " + lease + 
@@ -160,14 +183,22 @@
 
     boolean hasPath() {return !paths.isEmpty();}
 
-    void releaseLocks() throws IOException {
-      String holderStr = holder.getString();
-      for(StringBytesWritable s : paths) {
-        fsnamesystem.internalReleaseCreate(s.getString(), holderStr);
+    void releasePaths() throws IOException {
+      for (Iterator<StringBytesWritable> it = paths.iterator(); it.hasNext();)
+        fsnamesystem.internalReleaseLeasePath(this, it.next().getString());
+    }
+
+    private String remove(INodeFileUnderConstruction pendingFile) {
+      for(Iterator<StringBytesWritable> i = paths.iterator(); i.hasNext(); ) {
+        String src = i.next().toString();
+        if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
+          i.remove();
+          return src;
+        }
       }
-      paths.clear();
+      return null;
     }
-  
+
     /** {@inheritDoc} */
     public String toString() {
       return "[Lease.  Holder: " + holder
@@ -207,6 +238,10 @@
       return holder.hashCode();
     }
     
+    String getHolder() throws IOException {
+      return holder.getString();
+    }
+
     Collection<StringBytesWritable> getPaths() {
       return paths;
     }
@@ -306,9 +341,7 @@
               while ((sortedLeases.size() > 0) &&
                      ((top = sortedLeases.first()) != null)) {
                 if (top.expiredHardLimit()) {
-                  LOG.info("Lease Monitor: Removing lease " + top
-                      + ", leases remaining: " + sortedLeases.size());
-                  removeExpiredLease(top);
+                  top.releasePaths();
                 } else {
                   break;
                 }
@@ -328,4 +361,111 @@
       }
     }
   }
+
+  /*
+   * The following codes provides useful static methods for lease recovery.
+   * 
+   * Lease recovery algorithm
+   * 1) Namenode retrieves lease information
+   * 2) For each file f in the lease, consider the last block b of f
+   * 2.1) Get the datanodes which contains b
+   * 2.2) Assign one of the datanodes as the primary datanode p
+
+   * 2.3) p obtains a new generation stamp form the namenode
+   * 2.4) p get the block info from each datanode
+   * 2.5) p computes the minimum block length
+   * 2.6) p updates the datanodes, which have a valid generation stamp,
+   *      with the new generation stamp and the minimum block length 
+   * 2.7) p acknowledges the namenode the update results
+
+   * 2.8) Namenode updates the BlockInfo
+   * 2.9) Namenode removes f from the lease
+   *      and removes the lease once all files have been removed
+   * 2.10) Namenode commit changes to edit log
+   */
+  private static class BlockRecord { 
+    final DatanodeID id;
+    final InterDatanodeProtocol datanode;
+    final Block block;
+    
+    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+      this.id = id;
+      this.datanode = datanode;
+      this.block = block;
+    }
+  }
+
+  DatanodeDescriptor choosePrimaryDatanode(String src, BlockInfo[] blocks) {
+    DatanodeDescriptor primary = null;
+    if (blocks != null && blocks.length > 0) {
+      //assign the first alive datanode as the primary datanode
+      BlockInfo last = blocks[blocks.length - 1];
+      primary = last.choosePrimaryDatanode(primary);
+    }
+    return primary;
+    
+  }
+
+  static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
+      DatanodeProtocol namenode, Configuration conf) {
+    for(int i = 0; i < blocks.length; i++) {
+      try {
+        recoverBlock(blocks[i], targets[i], namenode, conf);
+      } catch (IOException e) {
+        LOG.warn("recoverBlocks, i=" + i, e);
+      }
+    }
+  }
+
+  /** Used by DataNode */ 
+  static void recoverBlock(Block block, DatanodeID[] datanodeids,
+      DatanodeProtocol namenode, Configuration conf) throws IOException {
+    List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+    long minlength = Long.MAX_VALUE;
+
+    //check generation stamps
+    for(DatanodeID id : datanodeids) {
+      try {
+        InterDatanodeProtocol datanode
+            = DataNode.createInterDataNodeProtocolProxy(id, conf);
+        BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+        if (info.getGenerationStamp() >= block.generationStamp) {
+          syncList.add(new BlockRecord(id, datanode, new Block(info)));
+          if (info.len < minlength) {
+            minlength = info.len;
+          }
+        }
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn(
+            "Failed to getBlockMetaDataInfo for block (=" + block 
+            + ") from datanode (=" + id + ")", e);
+      }
+    }
+    
+    syncBlock(block, minlength, syncList, namenode);
+  }
+
+  /** block synchronization */
+  static void syncBlock(Block block, long minlength,
+      List<BlockRecord> syncList, DatanodeProtocol namenode
+      ) throws IOException {
+    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+
+    long generationstamp = namenode.nextGenerationStamp();
+    Block newblock = new Block(block.blkid, minlength, generationstamp);
+
+    for(BlockRecord r : syncList) {
+      try {
+        r.datanode.updateBlock(r.block, newblock);
+        successList.add(r.id);
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+            + newblock + ", datanode=" + r.id + ")", e);
+      }
+    }
+
+    namenode.commitBlockSynchronization(block,
+        newblock.generationStamp, newblock.len,
+        successList.toArray(new DatanodeID[successList.size()]));
+  }
 }
Index: src/java/org/apache/hadoop/dfs/BlockCommand.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlockCommand.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/BlockCommand.java	(working copy)
@@ -18,6 +18,9 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import java.util.List;
+
+import org.apache.hadoop.dfs.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.io.*;
 
 abstract class DatanodeCommand implements Writable {
@@ -101,12 +104,17 @@
   /**
    * Create BlockCommand for transferring blocks to another datanode
    * @param blocks    blocks to be transferred 
-   * @param targets   nodes to transfer
    */
-  BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
-    super( DatanodeProtocol.DNA_TRANSFER);
-    this.blocks = blocks;
-    this.targets = targets;
+  BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
+    super(action);
+
+    blocks = new Block[blocktargetlist.size()]; 
+    targets = new DatanodeInfo[blocks.length][];
+    for(int i = 0; i < blocks.length; i++) {
+      BlockTargetPair p = blocktargetlist.get(i);
+      blocks[i] = p.block;
+      targets[i] = p.targets;
+    }
   }
 
   private static final DatanodeInfo[][] EMPTY_TARGET = {};
Index: src/java/org/apache/hadoop/dfs/INode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/INode.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/INode.java	(working copy)
@@ -778,16 +778,13 @@
 }
 
 class INodeFileUnderConstruction extends INodeFile {
-  protected StringBytesWritable clientName;         // lease holder
-  protected StringBytesWritable clientMachine;
-  protected DatanodeDescriptor clientNode; // if client is a cluster node too.
+  StringBytesWritable clientName = null;         // lease holder
+  StringBytesWritable clientMachine = null;
+  DatanodeDescriptor clientNode = null; // if client is a cluster node too.
+  DatanodeDescriptor primaryNode = null; // the node working on lease recovery
+  
+  INodeFileUnderConstruction() {}
 
-  INodeFileUnderConstruction() {
-    clientName = null;
-    clientMachine = null;
-    clientNode = null;
-  }
-
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
                              long preferredBlockSize,
Index: src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(working copy)
@@ -30,17 +30,15 @@
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
-   * 1: added getBlockMetaDataInfo and updateGenerationStamp
+   * 2: change updateGenerationStamp to updataBlock
    */
-  public static final long versionID = 1L;
+  public static final long versionID = 2L;
 
   /** @return the BlockMetaDataInfo of a block */
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
-   * Update the GenerationStamp of a block
-   * @return true iff update was required and done successfully 
+   * Update the block to the new generation stamp and length.  
    */
-  boolean updateGenerationStamp(Block block, GenerationStamp generationstamp
-      ) throws IOException;
+  void updateBlock(Block oldblock, Block newblock) throws IOException;
 }
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/FSDataset.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDataset.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/FSDataset.java	(working copy)
@@ -552,11 +552,51 @@
                      "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
   }
   protected File getMetaFile(Block b) throws IOException {
-    File blockFile = getBlockFile( b );
-    return new File( blockFile.getAbsolutePath() + 
-                     "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
+    return getMetaFile(getBlockFile(b), b);
   }
 
+  /** Find the corresponding meta data file from a given block file */
+  static File findMetaFile(final File blockFile) throws IOException {
+    final String prefix = blockFile.getName() + "_";
+    final File parent = blockFile.getParentFile();
+    File[] matches = parent.listFiles(new FilenameFilter() {
+      public boolean accept(File dir, String name) {
+        return dir.equals(parent)
+            && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
+      }
+    });
+
+    if (matches == null || matches.length == 0) {
+      throw new IOException("Meta file not found, blockFile=" + blockFile);
+    }
+    else if (matches.length > 1) {
+      throw new IOException("Found more than one meta files: " 
+          + Arrays.asList(matches));
+    }
+    return matches[0];
+  }
+  
+  /** Find the corresponding meta data file from a given block file */
+  static long parseGenerationStamp(File blockFile, File metaFile
+      ) throws IOException {
+    String metaname = metaFile.getName();
+    String gs = metaname.substring(blockFile.getName().length() + 1,
+        metaname.length() - METADATA_EXTENSION.length());
+    try {
+      return Long.parseLong(gs);
+    } catch(NumberFormatException nfe) {
+      throw (IOException)new IOException("blockFile=" + blockFile
+          + ", metaFile=" + metaFile).initCause(nfe);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public long getStoredGenerationStamp(Block b) throws IOException {
+    File blockFile = getBlockFile(b);
+    File metaFile = findMetaFile(blockFile);
+    return parseGenerationStamp(blockFile, metaFile);
+  }
+
   public boolean metaFileExists(Block b) throws IOException {
     return getMetaFile(b).exists();
   }
@@ -679,6 +719,105 @@
     return info.detachBlock(block, numLinks);
   }
 
+  private synchronized void updateVolumeMap(Block oldblock, Block newblock
+      ) throws IOException {
+    if (!volumeMap.containsKey(oldblock)) {
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("volumeMap=" + volumeMap);
+      }
+      throw new IOException("oldblock (=" + oldblock
+          + ") not found in the volumeMap");
+    }
+
+    DatanodeBlockInfo info = volumeMap.remove(oldblock);
+    volumeMap.put(newblock, info);
+  }
+
+  /** {@inheritDoc} */
+  public synchronized void updateBlock(Block oldblock, Block newblock
+      ) throws IOException {
+    if (oldblock.getBlockId() != newblock.getBlockId()) {
+      throw new IOException("Cannot update oldblock (=" + oldblock
+          + ") to newblock (=" + newblock + ").");
+    }
+
+    File blockFile = getBlockFile(oldblock);
+    long oldlen = blockFile.length();
+    File oldMetaFile = getMetaFile(blockFile, oldblock);
+    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+    
+    //rename meta file to a tmp file
+    File tmpMetaFile = new File(oldMetaFile.getParent(),
+        oldMetaFile.getName()+"_tmp");
+    oldMetaFile.renameTo(tmpMetaFile);
+
+    //update generation stamp
+    if (oldgs > newblock.generationStamp) {
+      throw new IOException("Cannot update block (id=" + newblock.blkid
+          + ") generation stamp from " + oldgs + " to " + newblock.generationStamp);
+    }
+    
+    //update length
+    if (newblock.len > oldlen) {
+      throw new IOException("Cannot update block file (=" + blockFile
+          + ") length from " + oldlen + " to " + newblock.len);
+    }
+    if (newblock.len < oldlen) {
+      truncateBlock(blockFile, tmpMetaFile, oldlen, newblock.len);
+    }
+
+    //rename the tmp file to the new meta file (with new generation stamp) 
+    tmpMetaFile.renameTo(getMetaFile(blockFile, newblock));
+
+    updateVolumeMap(oldblock, newblock);
+  }
+
+  static private void truncateBlock(File blockFile, File metaFile,
+      long oldlen, long newlen) throws IOException {
+    if (newlen == oldlen) {
+      return;
+    }
+    if (newlen > oldlen) {
+      throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
+          + ") to newlen (=" + newlen + ")");
+    }
+
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 
+    int checksumsize = dcs.getChecksumSize();
+    int bpc = dcs.getBytesPerChecksum();
+    long n = (newlen - 1)/bpc + 1;
+    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
+    long lastchunkoffset = (n - 1)*bpc;
+    int lastchunksize = (int)(newlen - lastchunkoffset); 
+    byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 
+
+    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+    try {
+      //truncate blockFile 
+      blockRAF.setLength(newlen);
+ 
+      //read last chunk
+      blockRAF.seek(lastchunkoffset);
+      blockRAF.readFully(b, 0, lastchunksize);
+    } finally {
+      blockRAF.close();
+    }
+
+    //compute checksum
+    dcs.update(b, 0, lastchunksize);
+    dcs.writeValue(b, 0, false);
+
+    //update metaFile 
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    try {
+      metaRAF.setLength(newmetalen);
+      metaRAF.seek(newmetalen - checksumsize);
+      metaRAF.write(b, 0, checksumsize);
+    } finally {
+      metaRAF.close();
+    }
+  }
+
   /**
    * Start writing to a block file
    * If isRecovery is true and the block pre-exists, then we kill all
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -19,7 +19,6 @@
 
 import org.apache.commons.logging.*;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -80,7 +79,8 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
+public class DataNode extends Configured 
+    implements InterDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
 
   /**
@@ -178,7 +178,7 @@
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
-      
+    super(conf);
     datanodeObject = this;
 
     try {
@@ -772,12 +772,11 @@
   private boolean processCommand(DatanodeCommand cmd) throws IOException {
     if (cmd == null)
       return true;
+    final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
-      //
       // Send a copy of a block to another datanode
-      //
-      BlockCommand bcmd = (BlockCommand)cmd;
       transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
@@ -785,7 +784,7 @@
       // Some local block(s) are obsolete and can be 
       // safely garbage-collected.
       //
-      Block toDelete[] = ((BlockCommand)cmd).getBlocks();
+      Block toDelete[] = bcmd.getBlocks();
       try {
         if (blockScanner != null) {
           blockScanner.deleteBlocks(toDelete);
@@ -821,6 +820,14 @@
         scheduleBlockReport(initialBlockReportDelay);
       }
       break;
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+      new Daemon(threadGroup, new Runnable() {
+        public void run() {
+          LeaseManager.recoverBlocks(bcmd.getBlocks(), bcmd.getTargets(),
+              namenode, getConf());
+        }
+      }).start();
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -3023,12 +3030,11 @@
   }
 
   /** {@inheritDoc} */
-  public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
+  public void updateBlock(Block oldblock, Block newblock) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
+      LOG.debug("oldblock=" + oldblock + ", newblock=" + newblock);
     }
-    //TODO: update generation stamp here
-    return false;
+    data.updateBlock(oldblock, newblock);
   }
 
   /** {@inheritDoc} */
@@ -3036,8 +3042,8 @@
       ) throws IOException {
     if (protocol.equals(InterDatanodeProtocol.class.getName())) {
       return InterDatanodeProtocol.versionID; 
-    } else {
-      throw new IOException("Unknown protocol to name node: " + protocol);
     }
+    throw new IOException("Unknown protocol to " + getClass().getSimpleName()
+        + ": " + protocol);
   }
 }
Index: src/java/org/apache/hadoop/dfs/GenerationStamp.java
===================================================================
--- src/java/org/apache/hadoop/dfs/GenerationStamp.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/GenerationStamp.java	(working copy)
@@ -64,9 +64,9 @@
   }
 
   /**
-   * Returns the current stamp and incrments the counter
+   * First increments the counter and then returns the stamp 
    */
-  public long nextStamp() {
+  public synchronized long nextStamp() {
     this.genstamp++;
     return this.genstamp;
   }
Index: src/java/org/apache/hadoop/dfs/FSConstants.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSConstants.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/FSConstants.java	(working copy)
@@ -131,7 +131,9 @@
   // Currently we set the maximum length to 8k characters and the maximum depth to 1k.  
   public static int MAX_PATH_LENGTH = 8000;
   public static int MAX_PATH_DEPTH = 1000;
-    
+
+  public static long FIRST_VALID_GENERATION_STAMP = 1000L;
+  
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
   //Used for writing header etc.
   static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
Index: src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(working copy)
@@ -40,28 +40,58 @@
 
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
+  static class BlockTargetPair {
+    Block block;
+    DatanodeDescriptor[] targets;    
 
+    BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+      this.block = block;
+      this.targets = targets;
+    }
+  }
+
+  private static class BlockQueue {
+    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+
+    synchronized int size() {return blockq.size();}
+
+    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
+      return blockq.offer(new BlockTargetPair(block, targets));
+    }
+
+    synchronized List<BlockTargetPair> poll(int numTargets) {
+      if (numTargets <= 0 || blockq.isEmpty()) {
+        return null;
+      }
+      else {
+        List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+        for(; !blockq.isEmpty() && numTargets > 0; ) {
+          numTargets -= blockq.peek().targets.length; 
+          if (numTargets >= 0) {
+            results.add(blockq.poll());
+          }
+        }
+        return results;
+      }
+    }
+  }
+
   private volatile BlockInfo blockList = null;
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
 
-  //
-  // List of blocks to be replicated by this datanode
-  // Also, a list of datanodes per block to indicate the target
-  // datanode of this replication.
-  //
-  List<Block> replicateBlocks;
-  List<DatanodeDescriptor[]> replicateTargetSets;
-  Set<Block> invalidateBlocks;
+  /** A queue of blocks to be replicated by this datanode */
+  private BlockQueue replicateBlocks = new BlockQueue();
+  /** A queue of blocks to be recovered by this datanode */
+  private BlockQueue recoverBlocks = new BlockQueue();
+  /** A set of blocks to be invalidated by this datanode */
+  private Set<Block> invalidateBlocks = new TreeSet<Block>();
+
   boolean processedBlockReport = false;
-
   
   /** Default constructor */
-  public DatanodeDescriptor() {
-    super();
-    initWorkLists();
-  }
+  public DatanodeDescriptor() {}
   
   /** DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -107,7 +137,6 @@
                             int xceiverCount) {
     super(nodeID);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
   }
 
   /** DatanodeDescriptor constructor
@@ -128,18 +157,8 @@
                             int xceiverCount) {
     super(nodeID, networkLocation, hostName);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
   }
 
-  /*
-   * initialize list of blocks that store work for the datanodes
-   */
-  private void initWorkLists() {
-    replicateBlocks = new ArrayList<Block>();
-    replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
-    invalidateBlocks = new TreeSet<Block>();
-  }
-
   /**
    * Add data-node to the block.
    * Add block to the head of the list of blocks belonging to the data-node.
@@ -227,13 +246,18 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    synchronized (replicateBlocks) {
-      replicateBlocks.add(block);
-      replicateTargetSets.add(targets);
-    }
+    replicateBlocks.offer(block, targets);
   }
 
   /**
+   * Store block recovery work.
+   */
+  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
+    assert(block != null && targets != null && targets.length > 0);
+    recoverBlocks.offer(block, targets);
+  }
+
+  /**
    * Store block invalidation work.
    */
   void addBlocksToBeInvalidated(List<Block> blocklist) {
@@ -245,16 +269,14 @@
     }
   }
 
-  /*
+  /**
    * The number of work items that are pending to be replicated
    */
   int getNumberOfBlocksToBeReplicated() {
-    synchronized (replicateBlocks) {
-      return replicateBlocks.size();
-    }
+    return replicateBlocks.size();
   }
 
-  /*
+  /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    */
@@ -279,36 +301,16 @@
     return processedBlockReport;
   }
 
-  /**
-   * Remove the specified number of target sets
-   */
-  BlockCommand getReplicationCommand(int maxNumTransfers) {
-    synchronized (replicateBlocks) {
-      assert(replicateBlocks.size() == replicateTargetSets.size());
+  BlockCommand getReplicationCommand(int maxTransfers) {
+    List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
+    return blocktargetlist == null? null:
+        new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
+  }
 
-      if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
-        return null;
-      }
-      int numTransfers = 0;
-      int numBlocks = 0;
-      int i;
-      for (i = 0; i < replicateTargetSets.size() && 
-             numTransfers < maxNumTransfers; i++) {
-        numTransfers += replicateTargetSets.get(i).length;
-      }
-      numBlocks = i;
-      Block[] blocklist = new Block[numBlocks];
-      DatanodeDescriptor targets[][] = new DatanodeDescriptor[numBlocks][];
-
-      for (i = 0; i < numBlocks; i++) {
-        blocklist[i] = replicateBlocks.get(0);
-        targets[i] = replicateTargetSets.get(0);
-        replicateBlocks.remove(0);
-        replicateTargetSets.remove(0);
-      }
-      assert(blocklist.length > 0 && targets.length > 0);
-      return new BlockCommand(blocklist, targets);
-    }
+  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
+    return blocktargetlist == null? null:
+        new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
   }
 
   /**
Index: src/java/org/apache/hadoop/dfs/FSDirectory.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDirectory.java	(revision 657965)
+++ src/java/org/apache/hadoop/dfs/FSDirectory.java	(working copy)
@@ -166,7 +166,6 @@
     }
     // add create file record to log, record new generation stamp
     fsImage.getEditLog().logOpenFile(path, newNode);
-    fsImage.getEditLog().logGenerationStamp(generationStamp);
 
     NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                   +path+" is added to the file system");
@@ -287,13 +286,14 @@
    */
   void closeFile(String path, INodeFile file) throws IOException {
     waitForReady();
-
     synchronized (rootDir) {
       // file is closed
       fsImage.getEditLog().logCloseFile(path, file);
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
                                     +path+" with "+ file.getBlocks().length 
                                     +" blocks is persisted to the file system");
+      }
     }
   }
 
