Index: src/java/org/apache/hadoop/dfs/DFSClient.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DFSClient.java	(revision 664114)
+++ src/java/org/apache/hadoop/dfs/DFSClient.java	(working copy)
@@ -478,6 +478,17 @@
     }
     return result;
   }
+
+  public OutputStream append(String src, int buffersize, Progressable progress
+      ) throws IOException {
+    checkOpen();
+    OutputStream result = new DFSOutputStream(src, buffersize, progress);
+    synchronized(pendingCreates) {
+      pendingCreates.put(src, result);
+    }
+    return result;
+  }
+  
   /**
    * Set replication for an existing file.
    * 
@@ -1742,14 +1753,14 @@
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private Block block;
-    private long blockSize;
+    final private long blockSize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
     private int maxPackets = 80; // each packet 64K, total 5MB
     // private int maxPackets = 1000; // each packet 64K, total 64MB
-    private DataStreamer streamer;
+    private DataStreamer streamer = new DataStreamer();;
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes writen in current block
@@ -1866,7 +1877,7 @@
     // it. When all the packets for a block are sent out and acks for each
     // if them are received, the DataStreamer closes the current block.
     //
-    private class DataStreamer extends Thread {
+    private class DataStreamer extends Daemon {
 
       private volatile boolean closed = false;
   
@@ -2253,11 +2264,12 @@
     }
 
     private Progressable progress;
+
     /**
      * Create a new output stream to the given DataNode.
      * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
      */
-    public DFSOutputStream(String src, FsPermission masked,
+    DFSOutputStream(String src, FsPermission masked,
                            boolean overwrite,
                            short replication, long blockSize,
                            Progressable progress,
@@ -2294,12 +2306,49 @@
         throw re.unwrapRemoteException(AccessControlException.class,
                                        QuotaExceededException.class);
       }
-      streamer = new DataStreamer();
-      streamer.setDaemon(true);
+
       streamer.start();
     }
   
     /**
+     * Create a new output stream to the given DataNode.
+     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+     */
+    DFSOutputStream(String src, int buffersize, Progressable progress
+        ) throws IOException {
+      super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
+      this.src = src;
+      this.progress = progress;
+      if (progress != null) {
+        LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
+      }
+      
+      try {
+        this.blockSize = namenode.append(src, clientName);
+      } catch(RemoteException re) {
+        throw re.unwrapRemoteException(AccessControlException.class,
+                                       QuotaExceededException.class);
+      }
+
+      int bytesPerChecksum = conf.getInt( "io.bytes.per.checksum", 512); 
+      if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
+        throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
+                              ") and blockSize(" + blockSize + 
+                              ") do not match. " + "blockSize should be a " +
+                              "multiple of io.bytes.per.checksum");
+                              
+      }
+      checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+                                              bytesPerChecksum);
+      int chunkSize = bytesPerChecksum + checksum.getChecksumSize();
+      chunksPerPacket = Math.max((writePacketSize - DataNode.PKT_HEADER_LEN - 
+                                  SIZE_OF_INTEGER + chunkSize-1)/chunkSize, 1);
+      packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER + 
+                   chunkSize * chunksPerPacket; 
+
+      streamer.start();
+    }
+    /**
      * Open a DataOutputStream to a DataNode so that it can be written to.
      * This happens when a file is created and each time a new block is allocated.
      * Must get block ID and the IDs of the destinations from the namenode.
Index: src/java/org/apache/hadoop/dfs/NameNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/NameNode.java	(revision 664114)
+++ src/java/org/apache/hadoop/dfs/NameNode.java	(working copy)
@@ -275,8 +275,10 @@
                              long blockSize
                              ) throws IOException {
     String clientMachine = getClientMachine();
-    stateChangeLog.debug("*DIR* NameNode.create: file "
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.create: file "
                          +src+" for "+clientName+" at "+clientMachine);
+    }
     if (!checkPathLength(src)) {
       throw new IOException("create: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
@@ -288,17 +290,16 @@
     myMetrics.numFilesCreated.inc();
   }
 
-  /** Coming in a future release.... */
-  void append(String src, String clientName) throws IOException {
+  /** {@inheritDoc} */
+  public long append(String src, String clientName) throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.append: file "
           +src+" for "+clientName+" at "+clientMachine);
     }
-    //TODO: add namesystem.appendFile(...), which calls appendFileInternal(...)
-    namesystem.appendFileInternal(src, clientName, clientMachine);
-
-    //TODO: inc myMetrics;
+    long blocksize = namesystem.appendFile(src, clientName, clientMachine);
+    myMetrics.numFilesAppended.inc();
+    return blocksize;
   }
 
   /** {@inheritDoc} */
Index: src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
===================================================================
--- src/java/org/apache/hadoop/dfs/NameNodeMetrics.java	(revision 664114)
+++ src/java/org/apache/hadoop/dfs/NameNodeMetrics.java	(working copy)
@@ -46,6 +46,7 @@
     private NameNodeStatistics namenodeStats;
     
     public MetricsTimeVaryingInt numFilesCreated = new MetricsTimeVaryingInt("FilesCreated");
+    public MetricsTimeVaryingInt numFilesAppended = new MetricsTimeVaryingInt("FilesAppended");
     public MetricsTimeVaryingInt numGetBlockLocations = new MetricsTimeVaryingInt("GetBlockLocations");
     public MetricsTimeVaryingInt numFilesRenamed = new MetricsTimeVaryingInt("FilesRenamed");
     public MetricsTimeVaryingInt numFilesListed = new MetricsTimeVaryingInt("FilesListed");
@@ -91,6 +92,7 @@
     public void doUpdates(MetricsContext unused) {
       synchronized (this) {
         numFilesCreated.pushMetric(metricsRecord);
+        numFilesAppended.pushMetric(metricsRecord);
         numGetBlockLocations.pushMetric(metricsRecord);
         numFilesRenamed.pushMetric(metricsRecord);
         numFilesListed.pushMetric(metricsRecord);
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java	(revision 664114)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java	(working copy)
@@ -42,6 +42,7 @@
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.DataOutputStream;
@@ -1010,9 +1011,27 @@
     }
   }
 
+  /**
+   * Append to an existing file in the namespace.
+   */
+  long appendFile(String src, String holder, String clientMachine
+      ) throws IOException {
+    long blocksize = appendFileInternal(src, holder, clientMachine);
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled()) {
+      final FileStatus stat = dir.getFileInfo(src);
+      auditLog.info(String.format(AUDIT_FORMAT,
+                    UserGroupInformation.getCurrentUGI(),
+                    Server.getRemoteIp(),
+                    "append", src, stat.getOwner() + ':' +
+                    stat.getGroup() + ':' + stat.getPermission()));
+    }
+    return blocksize;
+  }
+
   /** append is not yet ready.  This method is for testing. */
-  void appendFileInternal(String src, String holder, String clientMachine
-      ) throws IOException {
+  private synchronized long appendFileInternal(String src, String holder,
+      String clientMachine) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
           +src+" for "+holder+" at "+clientMachine);
@@ -1026,19 +1045,36 @@
       checkPathAccess(src, FsAction.WRITE);
     }
 
+    INodeFile f = dir.getFileINode(src);
+    if (f == null) {
+      throw new FileNotFoundException("File " + src + " not found.");
+    }
+    if (f.isUnderConstruction()) {
+      throw new IOException("File " + src + " is already under construction.");
+    }
+
+    DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
+        clientMachine);
     try {
-      INodeFile f = dir.getFileINode(src);
-      //assume f != null && !f.isUnderConstruction() && lease does not exist
-      //TODO: remove the assumption 
+      //check whether a lease does not exist
+      Lease lease = this.leaseManager.getLeaseByPath(src);
+      if (lease != null) {
+        if (!lease.expiredSoftLimit()) {
+          throw new IOException("Lease for file " + src
+              + " exists.  The file is already opened for writing.");
+        }
+        else {
+          //TODO: trigger lease recovery
+        }
+      }
 
-      DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
-          clientMachine);
       INodeFileUnderConstruction newnode = f.toINodeFileUnderConstruction(
           holder, clientMachine, clientNode);
 
       dir.replaceNode(src, f, newnode);
       leaseManager.addLease(newnode.clientName, src);
 
+      //TODO: initialize write to the end of a file
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.appendFile: ", ie);
       throw ie;
@@ -1048,6 +1084,7 @@
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: "
           +"add "+src+" to namespace for "+holder);
     }
+    return f.getPreferredBlockSize();
   }
 
   /**
Index: src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DistributedFileSystem.java	(revision 664114)
+++ src/java/org/apache/hadoop/dfs/DistributedFileSystem.java	(working copy)
@@ -134,7 +134,9 @@
   /** This optional operation is not yet supported. */
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {
-    throw new IOException("Not supported");
+
+    return new FSDataOutputStream(
+        dfs.append(getPathName(f), bufferSize, progress), statistics);
   }
 
   public FSDataOutputStream create(Path f, FsPermission permission,
Index: src/java/org/apache/hadoop/dfs/ClientProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/ClientProtocol.java	(revision 664114)
+++ src/java/org/apache/hadoop/dfs/ClientProtocol.java	(working copy)
@@ -37,10 +37,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 35 : Quota-related RPCs are introduced: getQuota, clearQuota;
-   * Besides, getContentSummary also returns the quota of the directory.
+   * 36 : Added append(...).
    */
-  public static final long versionID = 35L;
+  public static final long versionID = 36L;
   
   ///////////////////////////////////////
   // File contents
@@ -107,6 +106,18 @@
                              ) throws IOException;
 
   /**
+   * Append to the end of the file. 
+   * @param src path of the file being created.
+   * @param clientName name of the current client.
+   * @return the block size of the file
+   * @throws AccessControlException if permission to append file is 
+   * denied by the system. As usually on the client side the exception will 
+   * be wrapped into {@link org.apache.hadoop.ipc.RemoteException}.
+   * @throws IOException if other errors occur.
+   */
+  public long append(String src, String clientName) throws IOException;
+
+  /**
    * Set replication for an existing file.
    * <p>
    * The NameNode sets replication to the new value and returns.
Index: src/java/org/apache/hadoop/dfs/namenode/metrics/NameNodeStatistics.java
===================================================================
--- src/java/org/apache/hadoop/dfs/namenode/metrics/NameNodeStatistics.java	(revision 664114)
+++ src/java/org/apache/hadoop/dfs/namenode/metrics/NameNodeStatistics.java	(working copy)
@@ -166,6 +166,13 @@
   /**
    * @inheritDoc
    */
+  public int getNumFilesAppended() {
+    return myMetrics.numFilesAppended.getPreviousIntervalValue();
+  }
+
+  /**
+   * @inheritDoc
+   */
   public int getNumFilesListed() {
     return myMetrics.numFilesListed.getPreviousIntervalValue();
   }
