diff -Naur java-old/org/apache/hadoop/dfs/AlreadyBeingAppendedException.java java-new/org/apache/hadoop/dfs/AlreadyBeingAppendedException.java
--- java-old/org/apache/hadoop/dfs/AlreadyBeingAppendedException.java	1970-01-01 08:00:00.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/AlreadyBeingAppendedException.java	2007-12-20 19:51:25.000000000 +0800
@@ -0,0 +1,10 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+public class AlreadyBeingAppendedException extends IOException {
+	public AlreadyBeingAppendedException(String msg) {
+		super(msg);
+	}
+
+}
diff -Naur java-old/org/apache/hadoop/dfs/BasicAppendInfo.java java-new/org/apache/hadoop/dfs/BasicAppendInfo.java
--- java-old/org/apache/hadoop/dfs/BasicAppendInfo.java	1970-01-01 08:00:00.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/BasicAppendInfo.java	2007-12-20 19:46:55.000000000 +0800
@@ -0,0 +1,47 @@
+package org.apache.hadoop.dfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class BasicAppendInfo implements Writable{
+	
+	
+	private long blockSize;
+	private short replication;
+
+	public long getBlockSize() {
+		return blockSize;
+	}
+
+	public void setBlockSize(long blockSize) {
+		this.blockSize = blockSize;
+	}
+
+	public short getReplication() {
+		return replication;
+	}
+
+	public void setReplication(short replication) {
+		this.replication = replication;
+	}
+
+	///////////////////////////////////////////
+	// Writable
+	///////////////////////////////////////////
+	public void write(DataOutput out) throws IOException {
+
+		out.writeShort(replication);
+		out.writeLong(blockSize);
+
+	}
+
+	public void readFields(DataInput in) throws IOException {
+
+		replication = in.readShort();
+		blockSize = in.readLong();
+	}
+
+}
diff -Naur java-old/org/apache/hadoop/dfs/ClientProtocol.java java-new/org/apache/hadoop/dfs/ClientProtocol.java
--- java-old/org/apache/hadoop/dfs/ClientProtocol.java	2007-11-20 08:30:20.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/ClientProtocol.java	2007-12-20 19:55:42.000000000 +0800
@@ -18,6 +18,7 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
 
@@ -100,7 +101,20 @@
                              short replication,
                              long blockSize
                              ) throws IOException;
+  /**
+   * Create a new file. Just add the file into filesystem, 
+   * don't create new block for the file. If want to append 
+   * data into the file, please open() the file.
+   */ 
+  public boolean createFile(String src, 
+      String clientName, 
+      boolean overwrite, 
+      short replication,
+      long blockSize
+      ) throws IOException;
 
+  public BasicAppendInfo basicAppendFile(String src, String clientName, boolean isCreated) throws IOException;
+  
   /**
    * Set replication for an existing file.
    * 
diff -Naur java-old/org/apache/hadoop/dfs/DFSClient.java java-new/org/apache/hadoop/dfs/DFSClient.java
--- java-old/org/apache/hadoop/dfs/DFSClient.java	2007-11-20 08:30:23.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/DFSClient.java	2007-12-20 20:18:02.000000000 +0800
@@ -284,6 +284,37 @@
     //    Get block info from namenode
     return new DFSInputStream(src, buffersize);
   }
+  
+  /**
+   */
+  public Object open(String src, int buffersize, String flags) throws IOException {
+    checkOpen();
+    
+    if (flags.equals("r")){
+    	return new DFSClient.DFSDataInputStream(new DFSInputStream(src, buffersize));
+    }
+    
+    if (flags.equals("a")){
+    	
+    	FSDataOutputStream result = new FSDataOutputStream(new DFSOutputStream(src, false, buffersize));
+    	
+    	synchronized (pendingCreates) {
+    	    pendingCreates.put(src.toString(), result);
+    	}
+    	return result;
+    }
+    
+    if (flags.equals("a+c")){
+    	FSDataOutputStream result = new FSDataOutputStream(new DFSOutputStream(src, true, buffersize));
+    	
+    	synchronized (pendingCreates) {
+    	    pendingCreates.put(src.toString(), result);
+    	}
+    	return result;
+    }
+    
+    return new IOException("unsupported open flags");
+  }
 
   /**
    * Create a new dfs file and return an output stream for writing into it. 
@@ -380,6 +411,23 @@
     }
     return result;
   }
+  
+  /**
+   * Just create file, don't open the file
+   * 
+   * @param src
+   * @param overwrite
+   * @return
+   * @throws IOException
+   */
+  public boolean createFile(String src, boolean overwrite) throws IOException {
+	  return createFile(src, overwrite, defaultReplication, defaultBlockSize);
+  }
+  public boolean createFile(String src, boolean overwrite, short replication,long blockSize) throws IOException {
+      return namenode.createFile(src, clientName, overwrite, replication, blockSize);
+  }
+  
+  
   /**
    * Set replication for an existing file.
    * 
@@ -1446,6 +1494,36 @@
           src.toString(), clientName, overwrite, replication, blockSize);
     }
 
+    /**
+     */
+    public DFSOutputStream(String src, boolean create, int buffersize) throws IOException {
+    	
+    	super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
+    	this.src = src;
+    	this.buffersize = buffersize;
+    	
+    	BasicAppendInfo appendInfo = namenode.basicAppendFile(src, clientName, create);
+        this.replication = appendInfo.getReplication();
+        this.blockSize = appendInfo.getBlockSize();
+        this.progress = null;
+        
+        if (progress != null) {
+          LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
+        }
+        
+        int bytesPerChecksum = conf.getInt( "io.bytes.per.checksum", 512); 
+        if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
+          throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
+                                ") and blockSize(" + blockSize + 
+                                ") do not match. " + "blockSize should be a " +
+                                "multiple of io.bytes.per.checksum");
+                                
+        }
+        
+        checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+                                                bytesPerChecksum);  	
+    }
+    
     private void openBackupStream() throws IOException {
       File tmpFile = newBackupFile();
       backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile),
diff -Naur java-old/org/apache/hadoop/dfs/DistributedFileSystem.java java-new/org/apache/hadoop/dfs/DistributedFileSystem.java
--- java-old/org/apache/hadoop/dfs/DistributedFileSystem.java	2007-11-20 08:30:18.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/DistributedFileSystem.java	2007-12-20 20:15:19.000000000 +0800
@@ -122,6 +122,15 @@
                                               replication, blockSize, 
                                               progress, bufferSize) );
   }
+  
+  public Object open(String src, int buffersize, String flags) throws IOException {
+  	return dfs.open(src, buffersize, flags);	  
+  } 
+  
+  public boolean createFile(String src, boolean overwrite) throws IOException {
+	  return dfs.createFile(src, overwrite);
+  }
+
 
   public boolean setReplication(Path src, 
                                 short replication
diff -Naur java-old/org/apache/hadoop/dfs/FSDirectory.java java-new/org/apache/hadoop/dfs/FSDirectory.java
--- java-old/org/apache/hadoop/dfs/FSDirectory.java	2007-11-20 08:30:25.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/FSDirectory.java	2007-12-20 16:20:19.000000000 +0800
@@ -111,6 +111,39 @@
       }
     }
   }
+  
+  INode addFile(String path, short replication, long preferredBlockSize) throws IOException {
+ 	 
+    waitForReady();
+
+    // Always do an implicit mkdirs for parent directory tree.
+    long modTime = FSNamesystem.now();
+    if (!mkdirs(new Path(path).getParent().toString(), modTime)) {
+      return null;
+    }
+    
+    INodeFile newNode = new INodeFile(0, replication,  modTime, preferredBlockSize);
+    
+    synchronized (rootDir) {
+      try {
+        newNode = rootDir.addNode(path, newNode);
+      } catch (FileNotFoundException e) {
+        newNode = null;
+      }
+    }
+    if (newNode == null) {
+      NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
+                                   +"failed to add "+path
+                                   +" to the file system");
+      return null;
+    }
+    // add create file record to log
+    fsImage.getEditLog().logCreateFile(path, newNode);
+    NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+                                  +path+" is added to the file system");
+    return newNode;
+ 
+}
 
   /**
    * Add the given filename to the fs.
diff -Naur java-old/org/apache/hadoop/dfs/FSEditLog.java java-new/org/apache/hadoop/dfs/FSEditLog.java
--- java-old/org/apache/hadoop/dfs/FSEditLog.java	2007-11-20 08:30:18.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/FSEditLog.java	2007-12-20 10:55:26.000000000 +0800
@@ -48,6 +48,7 @@
   //the following two are used only for backword compatibility :
   @Deprecated private static final byte OP_DATANODE_ADD = 5;
   @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
+  private static final byte OP_SAVE_NEXTBLOCKID = 7;
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -571,6 +572,12 @@
             //Datanodes are not persistent any more.
             break;
           }
+          case OP_SAVE_NEXTBLOCKID: {
+          	  UTF8 blockid = new UTF8();
+          	  blockid.readFields(in);
+          	  fsNamesys.setNextBlockid(Long.parseLong(blockid.toString()));       	  
+          	  break;
+          }
           default: {
             throw new IOException("Never seen opcode " + opcode);
           }
@@ -787,6 +794,13 @@
             FSEditLog.toLogReplication(replication));
   }
   
+  /**
+   * Add save next block id to edit log
+   */
+  void logSaveNextBlockid(Long nextBlockid){
+    logEdit(OP_SAVE_NEXTBLOCKID, new UTF8(nextBlockid.toString()), null);	 
+  }
+  
   /** 
    * Add delete file record to edit log
    */
diff -Naur java-old/org/apache/hadoop/dfs/FSImage.java java-new/org/apache/hadoop/dfs/FSImage.java
--- java-old/org/apache/hadoop/dfs/FSImage.java	2007-11-20 08:30:24.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/FSImage.java	2007-12-20 11:06:20.000000000 +0800
@@ -714,6 +714,8 @@
       
       // load datanode info
       this.loadDatanodes(imgVersion, in);
+      // load global variables
+      this.loadGlobalVariables(imgVersion, in);
     } finally {
       in.close();
     }
@@ -755,6 +757,7 @@
       out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
       saveImage("", fsDir.rootDir, out);
       saveDatanodes(out);
+      saveGlobalVariables(out);
     } finally {
       out.close();
     }
@@ -879,6 +882,24 @@
       // We don't need to add these descriptors any more.
     }
   }
+  
+  /**
+   * Save nextBlockid and globalGenerationStamp
+   */  
+  void saveGlobalVariables(DataOutputStream out) throws IOException {	  
+	  FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();	  
+	  // nextBlockid
+	  out.writeLong(fsNamesys.getNextBlockid(false));	  
+  }
+  
+  /**
+   * Load nextBlockid and globalGenerationStamp
+   */
+  void loadGlobalVariables(int version, DataInputStream in) throws IOException {
+	  FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+	  // nextBlockid
+	  fsNamesys.setNextBlockid(in.readLong());  
+  }
 
   /**
    * Moves fsimage.ckpt to fsImage and edits.new to edits
diff -Naur java-old/org/apache/hadoop/dfs/FSNamesystem.java java-new/org/apache/hadoop/dfs/FSNamesystem.java
--- java-old/org/apache/hadoop/dfs/FSNamesystem.java	2007-11-20 08:30:17.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/FSNamesystem.java	2007-12-20 19:44:17.000000000 +0800
@@ -50,6 +50,11 @@
  ***************************************************/
 class FSNamesystem implements FSConstants {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
+  
+  //
+  // Stores the next available block id
+  //
+  private Long nextBlockid = new Long(1);
 
   //
   // Stores the correct file name hierarchy
@@ -322,8 +327,34 @@
    */
   public static FSNamesystem getFSNamesystem() {
     return fsNamesystemObject;
-  } 
-
+  }
+  
+  /** Set the next available block ID
+   * 
+   */
+  public void setNextBlockid(long blockid){
+	  this.nextBlockid = Long.valueOf(blockid);
+  }
+  
+  /** Get the next available block ID.
+   *  If get and increase the nextBlockid, set isIncrease as true;
+   *  If get and don't increase the nextBlockid, set isIncrease
+   *  as false.
+   * 
+   */
+  public long getNextBlockid(boolean isIncrease){
+	  long ret;	  
+	  synchronized(nextBlockid){
+		  ret = nextBlockid.longValue();
+		  if(isIncrease){
+			  nextBlockid = Long.valueOf(ret + 1);
+			  dir.fsImage.getEditLog().logSaveNextBlockid(nextBlockid);
+			  dir.fsImage.getEditLog().logSync();
+		  }
+	  }
+	  return ret;	  
+  }
+  
   NamespaceInfo getNamespaceInfo() {
     return new NamespaceInfo(dir.fsImage.getNamespaceID(),
                              dir.fsImage.getCTime(),
@@ -799,6 +830,155 @@
       throw new IOException( 
                             text + " is less than the required minimum " + minReplication);
   }
+  
+  /**
+   * To do basic append file. Create lease and return the BasicAppendInfo
+   */
+  public BasicAppendInfo basicAppendFile(String src, String holder, String clientMachine) throws IOException {
+		
+	  NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file " + src
+				+ " for " + holder + " at " + clientMachine);
+		if (isInSafeMode())
+			throw new SafeModeException("Cannot append file" + src, safeMode);
+
+		if (!isValidName(src)) {
+			throw new IOException("Invalid file name: " + src);
+		}
+
+		INode myFile = dir.getFileINode(src);
+
+		if (myFile == null)
+			throw new IOException("can't find the appended file");
+
+		if (myFile.isUnderConstruction()) {
+
+			INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
+			//
+			// If the file is under construction , then it must be in our
+			// leases. Find the appropriate lease record.
+			//
+			Lease lease = getLease(holder);
+			//
+			// We found the lease for this file. And surprisingly the
+			// original
+			// holder is trying to recreate this file. This should never
+			// occur.
+			//
+			if (lease != null) {
+				throw new AlreadyBeingAppendedException(
+						"failed to append file "
+								+ src
+								+ " for "
+								+ holder
+								+ " on client "
+								+ clientMachine
+								+ " because current leaseholder is trying to reappend file.");
+			}
+			//
+			// Find the original holder.
+			//
+			lease = getLease(pendingFile.getClientName());
+			if (lease == null) {
+				throw new AlreadyBeingCreatedException(
+						"failed to create file "
+								+ src
+								+ " for "
+								+ holder
+								+ " on client "
+								+ clientMachine
+								+ " because pendingCreates is non-null but no leases found.");
+			}
+			//
+			// If the original holder has not renewed in the last SOFTLIMIT
+			// period, then reclaim all resources and allow this request
+			// to proceed. Otherwise, prevent this request from creating
+			// file.
+			//
+			if (lease.expiredSoftLimit()) {
+				synchronized (sortedLeases) {
+					lease.releaseLocks();
+					removeLease(lease.getHolder());
+					LOG.info("startFile: Removing lease " + lease + " ");
+					if (!sortedLeases.remove(lease)) {
+						LOG
+								.error("startFile: Unknown failure trying to remove "
+										+ lease + " from lease set.");
+					}
+				}
+			} else {
+				throw new AlreadyBeingCreatedException("failed to append file "
+						+ src + " for " + holder + " on client "
+						+ clientMachine
+						+ ", because this file is already being appended by "
+						+ pendingFile.getClientName() + " on "
+						+ pendingFile.getClientMachine());
+			}
+		}
+
+		INodeFile file = (INodeFile) myFile;
+		DatanodeDescriptor clientNode = host2DataNodeMap
+				.getDatanodeByHost(clientMachine);
+		INodeFileUnderConstruction appendFile = file
+				.convertToINodeFileUnderConstruction(holder, clientMachine,
+						clientNode);
+		dir.replaceNode(src, file, appendFile);
+
+		synchronized (sortedLeases) {
+			Lease lease = getLease(holder);
+			if (lease == null) {
+				lease = new Lease(holder);
+				putLease(holder, lease);
+				sortedLeases.add(lease);
+			} else {
+				sortedLeases.remove(lease);
+				lease.renew();
+				sortedLeases.add(lease);
+			}
+			lease.startedCreate(src);
+		}		
+		
+		BasicAppendInfo appendInfo = new BasicAppendInfo();
+		appendInfo.setBlockSize(appendFile.getPreferredBlockSize());
+		appendInfo.setReplication(appendFile.getReplication());
+		return appendInfo;		
+}
+  
+  /**
+   * create a new file
+   */
+  void createFile(String src, String clientName, boolean overwrite, short replication,long blockSize) throws IOException {
+		
+		if (isInSafeMode())
+			throw new SafeModeException("Cannot create file" + src, safeMode);
+		if (!isValidName(src)) {
+			throw new IOException("Invalid file name: " + src);
+		}
+		
+		try {
+			verifyReplication(src, replication, clientName);
+		} catch (IOException e) {
+			throw new IOException("failed to create " + e.getMessage());
+		}
+		
+		synchronized(dir.rootDir){
+			
+			if (!dir.isValidToCreate(src)) {
+				if (overwrite && dir.getFileINode(src) != null && !dir.getFileINode(src).isUnderConstruction()) {
+					delete(src);
+				} else {
+					throw new IOException(
+							"failed to create file "
+									+ src
+									+ " on client "
+									+ clientName
+									+ " either because the filename is invalid or the file is being modified");
+				}
+			}
+			
+			dir.addFile(src, replication, blockSize);		
+		}
+
+  }
 
   void startFile(String src, String holder, String clientMachine, 
                  boolean overwrite, short replication, long blockSize
@@ -1148,9 +1328,7 @@
    */
   private Block allocateBlock(String src, INode file) throws IOException {
     Block b = null;
-    do {
-      b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
-    } while (isValidBlock(b));
+    b = new Block(this.getNextBlockid(true), 0);
     b = dir.addBlock(src, file, b);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
                                  +src+ ". "+b.getBlockName());
diff -Naur java-old/org/apache/hadoop/dfs/INode.java java-new/org/apache/hadoop/dfs/INode.java
--- java-old/org/apache/hadoop/dfs/INode.java	2007-11-20 08:30:18.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/INode.java	2007-12-20 17:30:12.000000000 +0800
@@ -602,6 +602,18 @@
     }
     return blocks[blocks.length - 2];
   }
+  
+  /**
+   * convert INodeFile to INodeFileUnderConstruction
+   */
+  INodeFileUnderConstruction convertToINodeFileUnderConstruction(String clientName, String clientMachine, DatanodeDescriptor clientNode)throws IOException{
+	  
+	  INodeFileUnderConstruction obj = new INodeFileUnderConstruction(this, clientName,
+			  														  clientMachine, clientNode);
+	  return obj;
+	  
+  }
+  
 }
 
 class INodeFileUnderConstruction extends INodeFile {
@@ -621,6 +633,17 @@
     this.clientMachine = new StringBytesWritable(clientMachine);
     this.clientNode = clientNode;
   }
+  
+  INodeFileUnderConstruction(INodeFile inodeFile, String clientName,
+			String clientMachine, DatanodeDescriptor clientNode) throws IOException {
+
+		super(inodeFile.getBlocks(), inodeFile.getReplication(), inodeFile
+				.getModificationTime(), inodeFile.getPreferredBlockSize());
+		this.clientName = new StringBytesWritable(clientName);
+		this.clientMachine = new StringBytesWritable(clientMachine);
+		this.clientNode = clientNode;
+
+	}  
 
   String getClientName() throws IOException {
     return clientName.getString();
diff -Naur java-old/org/apache/hadoop/dfs/NameNode.java java-new/org/apache/hadoop/dfs/NameNode.java
--- java-old/org/apache/hadoop/dfs/NameNode.java	2007-11-20 08:30:24.000000000 +0800
+++ java-new/org/apache/hadoop/dfs/NameNode.java	2007-12-20 19:59:44.000000000 +0800
@@ -240,6 +240,31 @@
     myMetrics.openFile();
     return result;
   }
+  
+  /**
+   */
+  public BasicAppendInfo basicAppendFile(String src, String clientName, boolean isCreated) throws IOException{
+	  
+	  String clientMachine = getClientMachine();
+	  stateChangeLog.debug("*DIR* NameNode.append: file "
+	                         +src+" for "+clientName+" at "+clientMachine);
+	  
+	  if (!checkPathLength(src)) {
+	      throw new IOException("create: Pathname too long.  Limit " 
+	                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+	  }
+	  
+	  
+	  INode myFile = namesystem.dir.getFileINode(src);
+	  
+	  if ((myFile == null) && isCreated)  {
+		  	createFile(src, clientName, false, namesystem.getDefaultReplication(), namesystem.getDefaultBlockSize());
+		    myMetrics.createFile();			
+	  }
+	  	    
+	  return namesystem.basicAppendFile(src, clientName, clientMachine);
+	  
+  }
 
   /**
    */
@@ -277,6 +302,21 @@
         src, clientName, clientMachine, overwrite, replication, blockSize);
     myMetrics.createFile();
   }
+  
+  /**
+   */
+  public boolean createFile(String src, String clientName, boolean overwrite, short replication, long blockSize) throws IOException {
+    String clientMachine = getClientMachine();
+    stateChangeLog.debug("*DIR* NameNode.create: file "
+                         +src+" for "+clientName+" at "+clientMachine);
+    if (!checkPathLength(src)) {
+      throw new IOException("create: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.createFile(src, clientName, overwrite, replication, blockSize);
+    myMetrics.createFile();
+    return true;
+  }
 
   public boolean setReplication(String src, 
                                 short replication
