Index: src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(revision 656333)
+++ src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java	(working copy)
@@ -71,8 +71,6 @@
           metainfo.getLastScanTime());
 
       //TODO: verify GenerationStamp
-      InterDatanodeProtocol.LOG.info("idp.updateGenerationStamp="
-          + idp.updateGenerationStamp(b, new GenerationStamp(456789L)));
     }
     finally {
       if (cluster != null) {cluster.shutdown();}
Index: src/test/org/apache/hadoop/dfs/TestFileCreation.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestFileCreation.java	(revision 656333)
+++ src/test/org/apache/hadoop/dfs/TestFileCreation.java	(working copy)
@@ -151,7 +151,6 @@
       //
       Path path = new Path("/");
       System.out.println("Path : \"" + path.toString() + "\"");
-      System.out.println(fs.isDirectory(path));
       System.out.println(fs.getFileStatus(path).isDir()); 
       assertTrue("/ should be a directory", 
                  fs.getFileStatus(path).isDir() == true);
@@ -544,4 +543,32 @@
     simulatedStorage = false;
   }
 
+  public void testConcurrentFileCreation() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      
+      Path[] p = {new Path("/foo"), new Path("/bar")};
+      
+      //write 2 files at the same time
+      FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
+      int i = 0;
+      for(; i < 100; i++) {
+        out[0].write(i);
+        out[1].write(i);
+      }
+      out[0].close();
+      for(; i < 200; i++) {out[1].write(i);}
+      out[1].close();
+
+      //verify
+      FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};  
+      for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
+      for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }
Index: src/java/org/apache/hadoop/dfs/NameNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/NameNode.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/NameNode.java	(working copy)
@@ -366,6 +366,19 @@
     }
   }
 
+  /** {@inheritDoc} */
+  public long nextGenerationStamp() {
+    return namesystem.nextGenerationStamp();
+  }
+
+  /** {@inheritDoc} */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength, DatanodeID[] newtargets
+      ) throws IOException {
+    namesystem.commitBlockSynchronization(block,
+        newgenerationstamp, newlength, newtargets);
+  }
+  
   public long getPreferredBlockSize(String filename) throws IOException {
     return namesystem.getPreferredBlockSize(filename);
   }
Index: src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeProtocol.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/DatanodeProtocol.java	(working copy)
@@ -31,9 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 14: add corrupt field to LocatedBlock
+   * 15: added DNA_RECOVERBLOCK, nextGenerationStamp and commitBlockSynchronization
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   // error code
   final static int NOTIFY = 0;
@@ -51,6 +51,7 @@
   final static int DNA_REGISTER = 4;   // re-register
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
   final static int DNA_BLOCKREPORT = 6;   // request a block report
+  final static int DNA_RECOVERBLOCK = 7;  // request a block recovery
 
   /** 
    * Register Datanode.
@@ -132,4 +133,16 @@
    * same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[] blocks)}
    */
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
-}
+  
+  /**
+   * @return the next GenerationStamp
+   */
+  public long nextGenerationStamp() throws IOException;
+
+  /**
+   * Commit block synchronization in lease recovery
+   */
+  public void commitBlockSynchronization(Block block,
+      long newgenerationstamp, long newlength, DatanodeID[] newtargets
+      ) throws IOException;
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/BlocksMap.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlocksMap.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/BlocksMap.java	(working copy)
@@ -62,6 +62,54 @@
       return node;
     }
 
+    /**
+     * Initialize lease recovery for this object
+     * @param targets The target datanodes of the last block 
+     * @return the index of the chosen primary datanode
+     */
+    DatanodeDescriptor choosePrimaryDatanode(DatanodeDescriptor previous) {
+      //assign the first alive datanode as the primary datanode
+      DatanodeDescriptor[] targets = getAliveDatanodes();
+      if (targets.length == 0) {
+        NameNode.stateChangeLog.warn("BLOCK*"
+            + " INodeFileUnderConstruction.initLeaseRecovery:"
+            + " all targets are not alive.");
+        return null;
+      }
+
+      int i = 0;
+      if (previous != null) {
+        //try to find previous
+        for(; i < targets.length && targets[i] != previous; i++);
+        if (i < targets.length) {
+          i++;  //found previous, use the next one
+        }
+        i %=  targets.length;
+      }
+
+      //i is the chosen datanode
+      targets[i].addBlockToBeRecovered(this, targets);
+      return targets[i];
+    }
+
+    /**
+     * Get all datanodes.
+     */
+    private DatanodeDescriptor[] getAliveDatanodes() {
+      assert this.triplets != null : "BlockInfo is not initialized";
+      assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+
+      List<DatanodeDescriptor> a = new ArrayList<DatanodeDescriptor>();
+      int cap = getCapacity();
+      DatanodeDescriptor datanode;
+      for(int i = 0; i < cap && (datanode = getDatanode(i)) != null; i++) {
+        if (datanode.isAlive) {
+          a.add(datanode);
+        }
+      }
+      return a.toArray(new DatanodeDescriptor[a.size()]);
+    }
+
     BlockInfo getPrevious(int index) {
       assert this.triplets != null : "BlockInfo is not initialized";
       assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
@@ -139,6 +187,24 @@
     }
 
     /**
+     * Update this object
+     */
+    void update(long newgenerationstamp, long newlength, DatanodeDescriptor[] targets) {
+      //remove all nodes  
+      for(int n = numNodes(); n >= 0; ) {
+        removeNode(--n);
+      }
+
+      //add all nodes  
+      for(DatanodeDescriptor d : targets) {
+        addNode(d);
+      }
+
+      generationStamp = newgenerationstamp;
+      len = newlength;
+    }
+
+    /**
      * Add data-node this block belongs to.
      */
     boolean addNode(DatanodeDescriptor node) {
@@ -156,7 +222,13 @@
      * Remove data-node from the block.
      */
     boolean removeNode(DatanodeDescriptor node) {
-      int dnIndex = this.findDatanode(node);
+      return removeNode(findDatanode(node));
+    }
+
+    /**
+     * Remove data-node from the block.
+     */
+    boolean removeNode(int dnIndex) {
       if(dnIndex < 0) // the node is not found
         return false;
       assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java	(working copy)
@@ -86,9 +86,8 @@
   FSDirectory dir;
 
   //
-  // Stores the block-->datanode(s) map.  Updated only in response
-  // to client-sent information.
   // Mapping: Block -> { INode, datanodes, self ref } 
+  // Updated only in response to client-sent information.
   //
   BlocksMap blocksMap = new BlocksMap();
 
@@ -228,9 +227,10 @@
 
   /**
    * The global generation stamp for this file system. 
-   * Valid values start from 1000.
+   * Valid values start from FIRST_VALID_GENERATION_STAMP.
    */
-  private GenerationStamp generationStamp = new GenerationStamp(1000);
+  private final GenerationStamp generationStamp = new GenerationStamp(
+      FIRST_VALID_GENERATION_STAMP);
 
   // Ask Datanode only up to this many blocks to delete.
   private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
@@ -950,8 +950,7 @@
       checkFsObjectLimit();
 
       // increment global generation stamp
-      long genstamp = generationStamp.nextStamp();
-
+      long genstamp = nextGenerationStamp();
       INode newNode = dir.addFile(src, permissions,
           replication, blockSize, holder, clientMachine, clientNode, genstamp);
       if (newNode == null) {
@@ -1130,30 +1129,15 @@
     } else if (!checkFileProgress(pendingFile, true)) {
       return STILL_WAITING;
     }
-        
-    // The file is no longer pending.
-    // Create permanent INode, update blockmap
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
 
-    // close file and persist block allocations for this file
-    dir.closeFile(src, newFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile);
 
-    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
                                   + " blocklist persisted");
+    }
 
     leaseManager.removeLease(src, holder);
-
-    //
-    // REMIND - mjc - this should be done only after we wait a few secs.
-    // The namenode isn't giving datanodes enough time to report the
-    // replicated blocks that are automatically done as part of a client
-    // write.
-    //
-
-    // Now that the file is real, we need to be sure to replicate
-    // the blocks.
-    checkReplicationFactor(newFile);
     return COMPLETE_SUCCESS;
   }
 
@@ -1541,7 +1525,7 @@
    * @param src The filename
    * @param holder The datanode that was creating the file
    */
-  void internalReleaseCreate(String src, String holder) throws IOException {
+  void internalReleaseCreate(String src, Lease lease) throws IOException {
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
@@ -1555,36 +1539,69 @@
                                    + src + " but file is already closed.");
       return;
     }
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: "
+          + src + " is no longer written to by " + lease.getHolder());
+    }
+
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
 
-    // The last block that was allocated migth not have been used by the
-    // client. In this case, the size of the last block would be 0. A fsck
-    // will report this block as a missing block because no datanodes have it.
-    // Delete this block.
-    Block[] blocks = pendingFile.getBlocks();
-    if (blocks != null && blocks.length > 0) {
-      Block last = blocks[blocks.length - 1];
-      if (last.getNumBytes() == 0) {
-          pendingFile.removeBlock(last);
-      }
+    // Initialize lease recovery for pendingFile
+    pendingFile.primaryNode = leaseManager.choosePrimaryDatanode(
+        src, pendingFile.blocks);
+    if (pendingFile.primaryNode != null) {
+      lease.renew();
     }
-
+    else {
+      finalizeINodeFileUnderConstruction(src, pendingFile);
+    }
+  }
+  
+  private void finalizeINodeFileUnderConstruction(String src,
+      INodeFileUnderConstruction pendingFile) throws IOException {
     // The file is no longer pending.
-    // Create permanent INode, update blockmap
+    // Create permanent INode, update block map
     INodeFile newFile = pendingFile.convertToInodeFile();
     dir.replaceNode(src, pendingFile, newFile);
 
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
 
-    // replicate blocks of this file.
     checkReplicationFactor(newFile);
-  
-    NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + 
-                                  src + " is no longer written to by " + 
-                                  holder);
   }
 
+  synchronized void commitBlockSynchronization(Block lastblock,
+      long newgenerationstamp, long newlength, DatanodeID[] newtargets
+      ) throws IOException {
+    BlockInfo blockinfo = blocksMap.getStoredBlock(lastblock);
+    if (blockinfo == null) {
+      throw new IOException("Block (=" + lastblock + ") not found");
+    }
+    INodeFile iFile = blockinfo.getINode();
+    if (!iFile.isUnderConstruction()) {
+      throw new IOException("Unexpected block (=" + lastblock
+          + ") since the file (=" + iFile.getLocalName()
+          + ") is not under construction");
+    }
+
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+    DatanodeDescriptor[] targets = new DatanodeDescriptor[newtargets.length];
+    for(int i = 0; i < newtargets.length; i++) {
+      targets[i] = getDatanode(newtargets[i]);
+    }
+
+    blockinfo.update(newgenerationstamp, newlength, targets);
+
+    String src = leaseManager.removeLease(pendingFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile);
+
+    //commit changes to FSEditLog 
+    FSEditLog editlog = getEditLog(); 
+    editlog.logOpenFile(src, pendingFile);    
+    editlog.logSync();
+  }
+
+
   /**
    * Renew the lease(s) held by the given client
    */
@@ -1892,6 +1909,10 @@
         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
         updateStats(nodeinfo, true);
         
+        //check lease recovery
+        if (cmd == null) {
+          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        }
         //check pending replication
         if (cmd == null) {
           cmd = nodeinfo.getReplicationCommand(
@@ -4109,6 +4130,15 @@
     return generationStamp.getStamp();
   }
 
+  /**
+   * Increments, logs and then returns the stamp
+   */
+  long nextGenerationStamp() {
+    long gs = generationStamp.nextStamp();
+    getEditLog().logGenerationStamp(gs);
+    return gs;
+  }
+
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //
Index: src/java/org/apache/hadoop/dfs/LeaseManager.java
===================================================================
--- src/java/org/apache/hadoop/dfs/LeaseManager.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/LeaseManager.java	(working copy)
@@ -21,6 +21,8 @@
 import java.util.*;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 
 import org.apache.hadoop.fs.Path;
 
@@ -85,21 +87,42 @@
   }
 
   /**
-   * deletes the lease for the specified file
+   * Remove the lease for the specified src and holder
    */
-  synchronized void removeLease(String src, String holder) throws IOException {
+  synchronized Lease removeLease(String src, String holder) throws IOException {
     Lease lease = getLease(holder);
     if (lease != null) {
       lease.completedCreate(src);
+      sortedLeasesByPath.remove(src);
+
       if (!lease.hasPath()) {
-        leases.remove(new StringBytesWritable(holder));
+        leases.remove(lease.holder);
         sortedLeases.remove(lease);
-        sortedLeasesByPath.remove(src);
       }
     }
+    return lease;
   }
 
+  
   /**
+   * Remove the lease for the specified pendingFile
+   */
+  synchronized String removeLease(
+      INodeFileUnderConstruction pendingFile) throws IOException {
+    Lease lease = getLease(pendingFile.clientName.toString());
+    if (lease != null) {
+      String src = lease.remove(pendingFile);
+      if (src != null) {
+        if (!lease.hasPath()) {
+          removeExpiredLease(lease);
+        }
+        return src;
+      }
+    }
+    throw new IOException("pendingFile (=" + pendingFile + ") not found.");
+  }
+
+  /**
    * Renew the lease(s) held by the given client
    */
   synchronized void renewLease(String holder) throws IOException {
@@ -112,7 +135,8 @@
   }
 
   synchronized void removeExpiredLease(Lease lease) throws IOException {
-    lease.releaseLocks();
+    lease.releasePaths();
+    lease.paths.clear();
     leases.remove(lease.holder);
     if (!sortedLeases.remove(lease)) {
       LOG.error("startFile: Unknown failure trying to remove " + lease + 
@@ -127,7 +151,7 @@
       // remove the file from the lease
       if (lease.completedCreate(src)) {
         // if we found the file in the lease, remove it from pendingCreates
-        fsnamesystem.internalReleaseCreate(src, holder);
+        fsnamesystem.internalReleaseCreate(src, lease);
       } else {
         LOG.warn("Attempt by " + holder + 
                  " to release someone else's create lock on " + src);
@@ -137,7 +161,7 @@
                + holder + " for " + src);
     }
   }
-
+  
   /************************************************************
    * A Lease governs all the locks held by a single client.
    * For each client there's a corresponding lease, whose
@@ -178,14 +202,22 @@
 
     boolean hasPath() {return !paths.isEmpty();}
 
-    void releaseLocks() throws IOException {
-      String holderStr = holder.getString();
-      for(StringBytesWritable s : paths) {
-        fsnamesystem.internalReleaseCreate(s.getString(), holderStr);
+    void releasePaths() throws IOException {
+      for (Iterator<StringBytesWritable> it = paths.iterator(); it.hasNext();)
+        fsnamesystem.internalReleaseCreate(it.next().getString(), this);
+    }
+
+    private String remove(INodeFileUnderConstruction pendingFile) {
+      for(Iterator<StringBytesWritable> i = paths.iterator(); i.hasNext(); ) {
+        String src = i.next().toString();
+        if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
+          i.remove();
+          return src;
+        }
       }
-      paths.clear();
+      return null;
     }
-  
+
     /** {@inheritDoc} */
     public String toString() {
       return "[Lease.  Holder: " + holder
@@ -225,6 +257,10 @@
       return holder.hashCode();
     }
     
+    String getHolder() throws IOException {
+      return holder.getString();
+    }
+
     Collection<StringBytesWritable> getPaths() {
       return paths;
     }
@@ -324,9 +360,12 @@
               while ((sortedLeases.size() > 0) &&
                      ((top = sortedLeases.first()) != null)) {
                 if (top.expiredHardLimit()) {
+                  top.releasePaths();
+/* 
                   LOG.info("Lease Monitor: Removing lease " + top
                       + ", leases remaining: " + sortedLeases.size());
                   removeExpiredLease(top);
+*/
                 } else {
                   break;
                 }
@@ -346,4 +385,98 @@
       }
     }
   }
-}
+
+  /*
+   * The following codes provides useful static methods for lease recovery.
+   * 
+   * Lease recovery algorithm
+   * 1) Namenode retrieves lease information
+   * 2) For each file f in the lease, consider the last block b of f
+   * 2.1) Get the datanodes which contains b
+   * 2.2) Assign one of the datanodes as the primary datanode p
+
+   * 2.3) p obtains a new generation stamp form the namenode
+   * 2.4) p get the block info from each datanode
+   * 2.5) p computes the minimum block length
+   * 2.6) p updates the datanodes, which have a valid generation stamp,
+   *      with the new generation stamp and the minimum block length 
+   * 2.7) p acknowledges the namenode the update results
+
+   * 2.8) Namenode updates the BlockInfo
+   * 2.9) Namenode removes f from the lease
+   *      and removes the lease once all files have been removed
+   * 2.10) Namenode commit changes to edit log
+   */
+  private static class DatanodeRecord { 
+    final DatanodeID id;
+    final InterDatanodeProtocol datanode;
+    
+    DatanodeRecord(DatanodeID id, InterDatanodeProtocol datanode) {
+      this.id = id;
+      this.datanode = datanode;
+    }
+  }
+
+  DatanodeDescriptor choosePrimaryDatanode(String src, BlockInfo[] blocks) {
+    DatanodeDescriptor primary = null;
+    if (blocks != null && blocks.length > 0) {
+      //assign the first alive datanode as the primary datanode
+      BlockInfo last = blocks[blocks.length - 1];
+      primary = last.choosePrimaryDatanode(primary);
+    }
+    return primary;
+    
+  }
+
+  /** Used by DataNode */ 
+  static void recoverBlock(Block block, DatanodeID[] datanodeids,
+      DatanodeProtocol namenode, Configuration conf) throws IOException {
+    List<DatanodeRecord> syncList = new ArrayList<DatanodeRecord>();
+    long minlength = Long.MAX_VALUE;
+
+    //check generation stamps
+    for(DatanodeID id : datanodeids) {
+      try {
+        InterDatanodeProtocol datanode
+            = DataNode.createInterDataNodeProtocolProxy(id, conf);
+        BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+        if (info.getGenerationStamp() >= block.generationStamp) {
+          syncList.add(new DatanodeRecord(id, datanode));
+          if (info.len < minlength) {
+            minlength = info.len;
+          }
+        }
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn(
+            "Failed to getBlockMetaDataInfo for block (=" + block 
+            + ") from datanode (=" + id + ")", e);
+      }
+    }
+    
+    syncBlock(block, minlength, syncList, namenode);
+  }
+
+  /** block synchronization */
+  static void syncBlock(Block block, long minlength,
+      List<DatanodeRecord> syncList, DatanodeProtocol namenode
+      ) throws IOException {
+    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+
+    long generationstamp = namenode.nextGenerationStamp();
+    Block newblock = new Block(block.blkid, minlength, generationstamp);
+
+    for(DatanodeRecord r : syncList) {
+      try {
+        r.datanode.updateBlock(newblock);
+        successList.add(r.id);
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+            + newblock + ", datanode=" + r.id + ")", e);
+      }
+    }
+
+    namenode.commitBlockSynchronization(block,
+        newblock.generationStamp, newblock.len,
+        successList.toArray(new DatanodeID[successList.size()]));
+  }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/BlockCommand.java
===================================================================
--- src/java/org/apache/hadoop/dfs/BlockCommand.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/BlockCommand.java	(working copy)
@@ -18,6 +18,9 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import java.util.List;
+
+import org.apache.hadoop.dfs.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.io.*;
 
 abstract class DatanodeCommand implements Writable {
@@ -101,12 +104,17 @@
   /**
    * Create BlockCommand for transferring blocks to another datanode
    * @param blocks    blocks to be transferred 
-   * @param targets   nodes to transfer
    */
-  BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
-    super( DatanodeProtocol.DNA_TRANSFER);
-    this.blocks = blocks;
-    this.targets = targets;
+  BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
+    super(action);
+
+    blocks = new Block[blocktargetlist.size()]; 
+    targets = new DatanodeInfo[blocks.length][];
+    for(int i = 0; i < blocks.length; i++) {
+      BlockTargetPair p = blocktargetlist.get(i);
+      blocks[i] = p.block;
+      targets[i] = p.targets;
+    }
   }
 
   private static final DatanodeInfo[][] EMPTY_TARGET = {};
Index: src/java/org/apache/hadoop/dfs/INode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/INode.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/INode.java	(working copy)
@@ -778,16 +778,13 @@
 }
 
 class INodeFileUnderConstruction extends INodeFile {
-  protected StringBytesWritable clientName;         // lease holder
-  protected StringBytesWritable clientMachine;
-  protected DatanodeDescriptor clientNode; // if client is a cluster node too.
+  StringBytesWritable clientName = null;         // lease holder
+  StringBytesWritable clientMachine = null;
+  DatanodeDescriptor clientNode = null; // if client is a cluster node too.
+  DatanodeDescriptor primaryNode = null; // the node working on lease recovery
+  
+  INodeFileUnderConstruction() {}
 
-  INodeFileUnderConstruction() {
-    clientName = null;
-    clientMachine = null;
-    clientNode = null;
-  }
-
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
                              long preferredBlockSize,
Index: src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java	(working copy)
@@ -38,9 +38,8 @@
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
-   * Update the GenerationStamp of a block
+   * Update the block to the new generation stamp and length.  
    * @return true iff update was required and done successfully 
    */
-  boolean updateGenerationStamp(Block block, GenerationStamp generationstamp
-      ) throws IOException;
+  boolean updateBlock(Block block) throws IOException;
 }
\ No newline at end of file
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -80,7 +80,8 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
+public class DataNode extends Configured 
+    implements InterDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
 
   /**
@@ -178,7 +179,7 @@
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
-      
+    super(conf);
     datanodeObject = this;
 
     try {
@@ -772,20 +773,19 @@
   private boolean processCommand(DatanodeCommand cmd) throws IOException {
     if (cmd == null)
       return true;
+    BlockCommand bmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
-      //
       // Send a copy of a block to another datanode
-      //
-      BlockCommand bcmd = (BlockCommand)cmd;
-      transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+      transferBlocks(bmd.getBlocks(), bmd.getTargets());
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
       //
       // Some local block(s) are obsolete and can be 
       // safely garbage-collected.
       //
-      Block toDelete[] = ((BlockCommand)cmd).getBlocks();
+      Block toDelete[] = bmd.getBlocks();
       try {
         if (blockScanner != null) {
           blockScanner.deleteBlocks(toDelete);
@@ -821,6 +821,13 @@
         scheduleBlockReport(initialBlockReportDelay);
       }
       break;
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+      for(int i = 0; i < bmd.getBlocks().length; i++) {
+        //recover each block
+        LeaseManager.recoverBlock(bmd.getBlocks()[i], bmd.getTargets()[i],
+            namenode, getConf());
+      }
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -3012,9 +3019,9 @@
   }
 
   /** {@inheritDoc} */
-  public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
+  public boolean updateBlock(Block block) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
+      LOG.debug("block=" + block);
     }
     //TODO: update generation stamp here
     return false;
Index: src/java/org/apache/hadoop/dfs/GenerationStamp.java
===================================================================
--- src/java/org/apache/hadoop/dfs/GenerationStamp.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/GenerationStamp.java	(working copy)
@@ -64,9 +64,9 @@
   }
 
   /**
-   * Returns the current stamp and incrments the counter
+   * First increments the counter and then returns the stamp 
    */
-  public long nextStamp() {
+  public synchronized long nextStamp() {
     this.genstamp++;
     return this.genstamp;
   }
Index: src/java/org/apache/hadoop/dfs/FSConstants.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSConstants.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/FSConstants.java	(working copy)
@@ -131,7 +131,9 @@
   // Currently we set the maximum length to 8k characters and the maximum depth to 1k.  
   public static int MAX_PATH_LENGTH = 8000;
   public static int MAX_PATH_DEPTH = 1000;
-    
+
+  public static long FIRST_VALID_GENERATION_STAMP = 1000L;
+  
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
   //Used for writing header etc.
   static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
Index: src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(working copy)
@@ -40,28 +40,58 @@
 
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
+  static class BlockTargetPair {
+    Block block;
+    DatanodeDescriptor[] targets;    
 
+    BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+      this.block = block;
+      this.targets = targets;
+    }
+  }
+
+  private static class BlockQueue {
+    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+
+    synchronized int size() {return blockq.size();}
+
+    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
+      return blockq.offer(new BlockTargetPair(block, targets));
+    }
+
+    synchronized List<BlockTargetPair> poll(int numTargets) {
+      if (numTargets <= 0 || blockq.isEmpty()) {
+        return null;
+      }
+      else {
+        List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+        for(; !blockq.isEmpty() && numTargets > 0; ) {
+          numTargets -= blockq.peek().targets.length; 
+          if (numTargets >= 0) {
+            results.add(blockq.poll());
+          }
+        }
+        return results;
+      }
+    }
+  }
+
   private volatile BlockInfo blockList = null;
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
 
-  //
-  // List of blocks to be replicated by this datanode
-  // Also, a list of datanodes per block to indicate the target
-  // datanode of this replication.
-  //
-  List<Block> replicateBlocks;
-  List<DatanodeDescriptor[]> replicateTargetSets;
-  Set<Block> invalidateBlocks;
+  /** A queue of blocks to be replicated by this datanode */
+  private BlockQueue replicateBlocks = new BlockQueue();
+  /** A queue of blocks to be recovered by this datanode */
+  private BlockQueue recoverBlocks = new BlockQueue();
+  /** A set of blocks to be invalidated by this datanode */
+  private Set<Block> invalidateBlocks = new TreeSet<Block>();
+
   boolean processedBlockReport = false;
-
   
   /** Default constructor */
-  public DatanodeDescriptor() {
-    super();
-    initWorkLists();
-  }
+  public DatanodeDescriptor() {}
   
   /** DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -107,7 +137,6 @@
                             int xceiverCount) {
     super(nodeID);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
   }
 
   /** DatanodeDescriptor constructor
@@ -128,18 +157,8 @@
                             int xceiverCount) {
     super(nodeID, networkLocation, hostName);
     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-    initWorkLists();
   }
 
-  /*
-   * initialize list of blocks that store work for the datanodes
-   */
-  private void initWorkLists() {
-    replicateBlocks = new ArrayList<Block>();
-    replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
-    invalidateBlocks = new TreeSet<Block>();
-  }
-
   /**
    * Add data-node to the block.
    * Add block to the head of the list of blocks belonging to the data-node.
@@ -227,13 +246,18 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    synchronized (replicateBlocks) {
-      replicateBlocks.add(block);
-      replicateTargetSets.add(targets);
-    }
+    replicateBlocks.offer(block, targets);
   }
 
   /**
+   * Store block recovery work.
+   */
+  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
+    assert(block != null && targets != null && targets.length > 0);
+    recoverBlocks.offer(block, targets);
+  }
+
+  /**
    * Store block invalidation work.
    */
   void addBlocksToBeInvalidated(List<Block> blocklist) {
@@ -245,16 +269,14 @@
     }
   }
 
-  /*
+  /**
    * The number of work items that are pending to be replicated
    */
   int getNumberOfBlocksToBeReplicated() {
-    synchronized (replicateBlocks) {
-      return replicateBlocks.size();
-    }
+    return replicateBlocks.size();
   }
 
-  /*
+  /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    */
@@ -279,36 +301,14 @@
     return processedBlockReport;
   }
 
-  /**
-   * Remove the specified number of target sets
-   */
-  BlockCommand getReplicationCommand(int maxNumTransfers) {
-    synchronized (replicateBlocks) {
-      assert(replicateBlocks.size() == replicateTargetSets.size());
+  BlockCommand getReplicationCommand(int maxTransfers) {
+    return new BlockCommand(DatanodeProtocol.DNA_TRANSFER,
+        replicateBlocks.poll(maxTransfers));
+  }
 
-      if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
-        return null;
-      }
-      int numTransfers = 0;
-      int numBlocks = 0;
-      int i;
-      for (i = 0; i < replicateTargetSets.size() && 
-             numTransfers < maxNumTransfers; i++) {
-        numTransfers += replicateTargetSets.get(i).length;
-      }
-      numBlocks = i;
-      Block[] blocklist = new Block[numBlocks];
-      DatanodeDescriptor targets[][] = new DatanodeDescriptor[numBlocks][];
-
-      for (i = 0; i < numBlocks; i++) {
-        blocklist[i] = replicateBlocks.get(0);
-        targets[i] = replicateTargetSets.get(0);
-        replicateBlocks.remove(0);
-        replicateTargetSets.remove(0);
-      }
-      assert(blocklist.length > 0 && targets.length > 0);
-      return new BlockCommand(blocklist, targets);
-    }
+  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
+    return new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK,
+        recoverBlocks.poll(maxTransfers));
   }
 
   /**
Index: src/java/org/apache/hadoop/dfs/FSDirectory.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDirectory.java	(revision 656333)
+++ src/java/org/apache/hadoop/dfs/FSDirectory.java	(working copy)
@@ -166,7 +166,6 @@
     }
     // add create file record to log, record new generation stamp
     fsImage.getEditLog().logOpenFile(path, newNode);
-    fsImage.getEditLog().logGenerationStamp(generationStamp);
 
     NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                   +path+" is added to the file system");
@@ -287,13 +286,14 @@
    */
   void closeFile(String path, INodeFile file) throws IOException {
     waitForReady();
-
     synchronized (rootDir) {
       // file is closed
       fsImage.getEditLog().logCloseFile(path, file);
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
                                     +path+" with "+ file.getBlocks().length 
                                     +" blocks is persisted to the file system");
+      }
     }
   }
 
