Index: src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
===================================================================
--- src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java	(revision 672615)
+++ src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java	(working copy)
@@ -391,6 +391,12 @@
     return result;
   }
 
+  public synchronized BlockReadStreams getTmpInputStreams(Block b, 
+                              long blkoff, long ckoff)
+                              throws IOException {
+    return null;              // not yet implemented
+  }
+
   /**
    * Returns metaData of block b as an input stream
    * @param b - the block for which the metadata is desired
Index: src/test/org/apache/hadoop/dfs/TestFileAppend2.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestFileAppend2.java	(revision 0)
+++ src/test/org/apache/hadoop/dfs/TestFileAppend2.java	(revision 0)
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.net.*;
+import java.util.Random;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.log4j.Level;
+import org.apache.commons.logging.impl.Log4JLogger;
+
+/**
+ * This class tests the building blocks that are needed to
+ * support HDFS appends.
+ */
+public class TestFileAppend2 extends TestCase {
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 1024;
+  static final int numBlocks = 5;
+  static final int fileSize = numBlocks * blockSize + 1;
+  boolean simulatedStorage = false;
+  byte[] fileContents = null;
+
+  //
+  // create a buffer that contains the entire test file data.
+  //
+  private void initBuffer(int size) {
+    Random rand = new Random(seed);
+    fileContents = new byte[size];
+    rand.nextBytes(fileContents);
+    //for ( int i = 0; i < size; i++) {
+    //  fileContents[i] = (byte)i;
+    //}
+  }
+
+  /*
+   * creates a file but does not close it
+   */ 
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+
+  //
+  // writes to file but does not close it
+  //
+  private void writeFile(FSDataOutputStream stm) throws IOException {
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+  }
+
+  //
+  // verify that the data written to the full blocks are sane
+  // 
+  private void checkFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    boolean done = false;
+
+    // wait till all full blocks are confirmed by the datanodes.
+    while (!done) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+      done = true;
+      BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, 
+                                                                fileSize);
+      if (locations.length < numBlocks) {
+        System.out.println("Number of blocks found " + locations.length);
+        done = false;
+        continue;
+      }
+      for (int idx = 0; idx < numBlocks; idx++) {
+        if (locations[idx].getHosts().length < repl) {
+          System.out.println("Block index " + idx + " not yet replciated.");
+          done = false;
+          break;
+        }
+      }
+    }
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[numBlocks * blockSize];
+    if (simulatedStorage) {
+      for (int i= 0; i < expected.length; i++) {  
+        expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
+      }
+    } else {
+      for (int i= 0; i < expected.length; i++) {  
+        expected[i] = fileContents[i];
+      }
+    }
+    // do a sanity check. Read the file
+    byte[] actual = new byte[numBlocks * blockSize];
+    stm.readFully(0, actual);
+    checkData(actual, 0, expected, "Read 1");
+  }
+
+  private void checkFullFile(FileSystem fs, Path name) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[fileSize];
+    stm.readFully(0, actual);
+    checkData(actual, 0, fileContents, "Read 2");
+    stm.close();
+  }
+
+  private void checkData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                   expected[from+idx]+" actual "+actual[idx],
+                   expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
+
+
+  public void testSimpleAppend() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    initBuffer(fileSize);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+
+      // create a new file.
+      Path file1 = new Path("/simpleAppend.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+      System.out.println("Created file simpleFlush.dat");
+
+      // write to file
+      int mid = 12;   // io.bytes.per.checksum bytes
+      System.out.println("Writing " + mid + " bytes to file " + file1);
+      stm.write(fileContents, 0, mid);
+      stm.close();
+      System.out.println("Wrote and Closed first part of file.");
+
+      // write the remainder of the file
+      stm = fs.append(file1);
+      System.out.println("Writing " + (fileSize - mid) + " bytes to file " + file1);
+      stm.write(fileContents, mid, fileSize - mid);
+      System.out.println("Written second part of file");
+      stm.close();
+      System.out.println("Wrote and Closed second part of file.");
+
+      // verify that entire file is good
+      checkFullFile(fs, file1);
+
+    } catch (IOException e) {
+      System.out.println("Exception :" + e);
+      throw e; 
+    } catch (Throwable e) {
+      System.out.println("Throwable :" + e);
+      e.printStackTrace();
+      throw new IOException("Throwable : " + e);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+}
Index: src/hdfs/org/apache/hadoop/dfs/DFSClient.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/DFSClient.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/DFSClient.java	(working copy)
@@ -478,6 +478,17 @@
     }
     return result;
   }
+
+  public OutputStream append(String src, int buffersize, Progressable progress
+      ) throws IOException {
+    checkOpen();
+    OutputStream result = new DFSOutputStream(src, buffersize, progress);
+    synchronized(pendingCreates) {
+      pendingCreates.put(src, result);
+    }
+    return result;
+  }
+  
   /**
    * Set replication for an existing file.
    * 
@@ -1742,14 +1753,14 @@
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private Block block;
-    private long blockSize;
+    final private long blockSize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
     private int maxPackets = 80; // each packet 64K, total 5MB
     // private int maxPackets = 1000; // each packet 64K, total 64MB
-    private DataStreamer streamer;
+    private DataStreamer streamer = new DataStreamer();;
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes writen in current block
@@ -1764,6 +1775,8 @@
     private boolean persistBlocks = false; // persist blocks on namenode
     private int recoveryErrorCount = 0; // number of times block recovery failed
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
+    private volatile boolean appendChunk = false;   // appending to existing partial block
+    private volatile boolean appendBlock = false;   // appending to existing partial block
 
     private class Packet {
       ByteBuffer buffer;           // only one of buf and buffer is non-null
@@ -1772,6 +1785,7 @@
       long    offsetInBlock;       // offset in block
       boolean lastPacketInBlock;   // is this the last packet in block?
       int     numChunks;           // number of chunks currently in packet
+      int     maxChunks;           // max chunks in packet
       int     dataStart;
       int     dataPos;
       int     checksumStart;
@@ -1792,6 +1806,7 @@
         checksumPos = checksumStart;
         dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
         dataPos = dataStart;
+        maxChunks = chunksPerPkt;
       }
 
       void writeData(byte[] inarray, int off, int len) {
@@ -1866,7 +1881,7 @@
     // it. When all the packets for a block are sent out and acks for each
     // if them are received, the DataStreamer closes the current block.
     //
-    private class DataStreamer extends Thread {
+    private class DataStreamer extends Daemon {
 
       private volatile boolean closed = false;
   
@@ -1888,7 +1903,7 @@
           synchronized (dataQueue) {
 
             // process IO errors if any
-            boolean doSleep = processDatanodeError();
+            boolean doSleep = processDatanodeError(hasError);
 
             // wait for a packet to be sent.
             while ((!closed && !hasError && clientRunning 
@@ -2106,7 +2121,7 @@
     // threads and mark stream as closed. Returns true if we should
     // sleep for a while after returning from this call.
     //
-    private boolean processDatanodeError() {
+    private boolean processDatanodeError(boolean hasError) {
       if (!hasError) {
         return false;
       }
@@ -2115,12 +2130,14 @@
                  " waiting for responder to exit. ");
         return true;
       }
-      String msg = "Error Recovery for block " + block +
-                   " bad datanode[" + errorIndex + "]";
-      if (nodes != null) {
-        msg += " " + nodes[errorIndex].getName();
+      if (errorIndex >= 0) {
+        String msg = "Error Recovery for block " + block +
+                     " bad datanode[" + errorIndex + "]";
+        if (nodes != null) {
+          msg += " " + nodes[errorIndex].getName();
+        }
+        LOG.warn(msg);
       }
-      LOG.warn(msg);
 
       if (blockStream != null) {
         try {
@@ -2140,11 +2157,12 @@
 
       boolean success = false;
       while (!success && clientRunning) {
+        DatanodeInfo[] newnodes =  null;
         if (nodes == null) {
           lastException = new IOException("Could not get block locations. " +
                                           "Aborting...");
           closed = true;
-          streamer.close();
+          if (streamer != null) streamer.close();
           return false;
         }
         StringBuilder pipelineMsg = new StringBuilder();
@@ -2154,27 +2172,32 @@
             pipelineMsg.append(", ");
           }
         }
-        String pipeline = pipelineMsg.toString();
-        if (nodes.length <= 1) {
-          lastException = new IOException("All datanodes " +
-                                          pipeline + " are bad. Aborting...");
-          closed = true;
-          streamer.close();
-          return false;
-        }
-        LOG.warn("Error Recovery for block " + block +
-                 " in pipeline " + pipeline + 
-                 ": bad datanode " + nodes[errorIndex].getName());
-
         // remove bad datanode from list of datanodes.
-        //
-        DatanodeInfo[] newnodes =  new DatanodeInfo[nodes.length-1];
-        for (int i = 0; i < errorIndex; i++) {
-          newnodes[i] = nodes[i];
+        // If errorIndex was not set( i.e. appends) , then do not remove 
+        // any datanodes
+        // 
+        if (errorIndex < 0) {
+          newnodes = nodes;
+        } else {
+          String pipeline = pipelineMsg.toString();
+          if (nodes.length <= 1) {
+            lastException = new IOException("All datanodes " +
+                                            pipeline + " are bad. Aborting...");
+            closed = true;
+            if (streamer != null) streamer.close();
+            return false;
+          }
+          LOG.warn("Error Recovery for block " + block +
+                   " in pipeline " + pipeline + 
+                   ": bad datanode " + nodes[errorIndex].getName());
+          newnodes =  new DatanodeInfo[nodes.length-1];
+          for (int i = 0; i < errorIndex; i++) {
+            newnodes[i] = nodes[i];
+          }
+          for (int i = errorIndex; i < (nodes.length-1); i++) {
+            newnodes[i] = nodes[i+1];
+          }
         }
-        for (int i = errorIndex; i < (nodes.length-1); i++) {
-          newnodes[i] = nodes[i+1];
-        }
 
         // Tell the primary datanode to do error recovery 
         // by stamping appropriate generation stamps.
@@ -2194,7 +2217,7 @@
             LOG.warn(emsg);
             lastException = new IOException(emsg);
             closed = true;
-            streamer.close();
+            if (streamer != null) streamer.close();
             return false;       // abort with IOexception
           } 
           LOG.warn("Error Recovery for block " + block + " failed " +
@@ -2216,7 +2239,7 @@
 
         // setup new pipeline
         nodes = newnodes;
-        hasError = false;
+        this.hasError = false;
         errorIndex = 0;
         success = createBlockOutputStream(nodes, src, true);
       }
@@ -2253,11 +2276,12 @@
     }
 
     private Progressable progress;
+
     /**
      * Create a new output stream to the given DataNode.
      * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
      */
-    public DFSOutputStream(String src, FsPermission masked,
+    DFSOutputStream(String src, FsPermission masked,
                            boolean overwrite,
                            short replication, long blockSize,
                            Progressable progress,
@@ -2294,12 +2318,118 @@
         throw re.unwrapRemoteException(AccessControlException.class,
                                        QuotaExceededException.class);
       }
-      streamer = new DataStreamer();
-      streamer.setDaemon(true);
+
       streamer.start();
     }
   
     /**
+     * Create a new output stream to the given DataNode.
+     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+     */
+    DFSOutputStream(String src, int buffersize, Progressable progress
+        ) throws IOException {
+      super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
+      this.src = src;
+      this.progress = progress;
+      if (progress != null) {
+        LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
+      }
+      
+      LocatedBlock lastBlock = null;
+      DFSFileInfo stat = null;
+      try {
+        stat = getFileInfo(src);
+        lastBlock = namenode.append(src, clientName);
+        this.blockSize = stat.getBlockSize();
+      } catch(RemoteException re) {
+        throw re.unwrapRemoteException(AccessControlException.class,
+                                       QuotaExceededException.class);
+      }
+
+      int bytesPerChecksum = conf.getInt( "io.bytes.per.checksum", 512); 
+      if (bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
+        throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
+                              ") and blockSize(" + blockSize + 
+                              ") do not match. " + "blockSize should be a " +
+                              "multiple of io.bytes.per.checksum");
+                              
+      }
+      checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+                                              bytesPerChecksum);
+
+      //
+      // The last partial block of the file has to be filled.
+      //
+      if (lastBlock != null) {
+        long usedInLastBlock = stat.getLen() % blockSize;
+        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+        // calculate the amount of free space in the pre-existing 
+        // last crc chunk
+        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+        int freeInCksum = bytesPerChecksum - usedInCksum;
+
+        // if there is space in the last block, then we have to 
+        // append to that block
+        if (freeInLastBlock > blockSize) {
+          throw new IOException("The last block for file " + 
+                                src + " is full.");
+        }
+
+        // indicate that we are appending to an existing block
+        bytesCurBlock = lastBlock.getBlockSize();
+
+        if (usedInCksum > 0 && freeInCksum > 0) {
+          // if there is space in the last partial chunk, then 
+          // setup in such a way that the next packet will have only 
+          // one chunk that fills up the partial chunk.
+          //
+          LOG.warn("XXX partial checksum " + freeInCksum);
+          computePacketChunkSize(0, freeInCksum);
+          resetChecksumChunk(freeInCksum);
+          this.appendChunk = true;
+        } else {
+          // if the remaining space in the block is smaller than 
+          // that expected size of of a packet, then create 
+          // smaller size packet.
+          //
+          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
+                                 bytesPerChecksum);
+        }
+
+        // setup pipeline to append to the last block XXX retries??
+        this.appendBlock = true;
+        block = lastBlock.getBlock();
+        nodes = lastBlock.getLocations();
+        errorIndex = -1;   // no errors yet.
+        LOG.warn("XXX nodes.length = " + nodes.length);
+        if (nodes.length < 1) {
+          throw new IOException("Unable to retrieve blocks locations " +
+                                " for last block " + lastBlock +
+                                "of file " + src);
+                        
+        }
+        processDatanodeError(true);
+        streamer.start();
+      }
+      else {
+        computePacketChunkSize(writePacketSize, bytesPerChecksum);
+        streamer.start();
+      }
+    }
+
+    private void computePacketChunkSize(int psize, int csize) {
+      int chunkSize = csize + checksum.getChecksumSize();
+      chunksPerPacket = Math.max((psize - DataNode.PKT_HEADER_LEN - 
+                                  SIZE_OF_INTEGER + chunkSize-1)/chunkSize, 1);
+      packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER + 
+                   chunkSize * chunksPerPacket; 
+      LOG.debug("computePacketChunkSize chunkSize " + chunkSize +
+               " chunksPerPacket " + chunksPerPacket +
+               " packetSize " + packetSize);
+    }
+
+    /**
      * Open a DataOutputStream to a DataNode so that it can be written to.
      * This happens when a file is created and each time a new block is allocated.
      * Must get block ID and the IDs of the destinations from the namenode.
@@ -2481,6 +2611,7 @@
                               this.checksum.getChecksumSize() + 
                               " but found to be " + checksum.length);
       }
+      LOG.info("XXX writeChunk offset:" + offset + " length:" + len);
 
       synchronized (dataQueue) {
   
@@ -2497,7 +2628,10 @@
           currentPacket = new Packet(packetSize, chunksPerPacket, 
                                      bytesCurBlock);
           LOG.debug("DFSClient writeChunk allocating new packet " + 
-                    currentPacket.seqno);
+                    currentPacket.seqno +
+                    " packetSize " + packetSize +
+                    " chunksPerPacket " + chunksPerPacket +
+                    " bytesCurBlock " + bytesCurBlock);
         }
 
         currentPacket.writeChecksum(checksum, 0, cklen);
@@ -2507,9 +2641,13 @@
 
         // If packet is full, enqueue it for transmission
         //
-        if (currentPacket.numChunks == chunksPerPacket ||
+        if (currentPacket.numChunks == currentPacket.maxChunks ||
             bytesCurBlock == blockSize) {
-          LOG.debug("DFSClient writeChunk packet full seqno " + currentPacket.seqno);
+          LOG.debug("DFSClient writeChunk packet full seqno " + currentPacket.seqno +
+                    " bytesCurBlock " + bytesCurBlock +
+                    " blockSize " + blockSize +
+                    " appendChunk " + appendChunk +
+                    " appendBlock " + appendBlock);
           //
           // if we allocated a new packet because we encountered a block
           // boundary, reset bytesCurBlock.
@@ -2522,6 +2660,17 @@
           dataQueue.addLast(currentPacket);
           dataQueue.notifyAll();
           currentPacket = null;
+ 
+          // If this was the first write after reopening a file, then the above
+          // write filled up any partial chunk. Tell the summer to generate full 
+          // crc chunks from now on.
+          if (appendChunk) {
+            appendChunk = false;
+            resetChecksumChunk(bytesPerChecksum);
+          }
+          computePacketChunkSize(Math.min((int)(blockSize - bytesCurBlock), 
+                                 writePacketSize), 
+                                 bytesPerChecksum);
         }
       }
       //LOG.debug("DFSClient writeChunk done length " + len +
Index: src/hdfs/org/apache/hadoop/dfs/NameNode.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/NameNode.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/NameNode.java	(working copy)
@@ -275,8 +275,10 @@
                              long blockSize
                              ) throws IOException {
     String clientMachine = getClientMachine();
-    stateChangeLog.debug("*DIR* NameNode.create: file "
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.create: file "
                          +src+" for "+clientName+" at "+clientMachine);
+    }
     if (!checkPathLength(src)) {
       throw new IOException("create: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
@@ -288,17 +290,16 @@
     myMetrics.numFilesCreated.inc();
   }
 
-  /** Coming in a future release.... */
-  void append(String src, String clientName) throws IOException {
+  /** {@inheritDoc} */
+  public LocatedBlock append(String src, String clientName) throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.append: file "
           +src+" for "+clientName+" at "+clientMachine);
     }
-    //TODO: add namesystem.appendFile(...), which calls appendFileInternal(...)
-    namesystem.appendFileInternal(src, clientName, clientMachine);
-
-    //TODO: inc myMetrics;
+    LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
+    myMetrics.numFilesAppended.inc();
+    return info;
   }
 
   /** {@inheritDoc} */
Index: src/hdfs/org/apache/hadoop/dfs/FSDatasetInterface.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/FSDatasetInterface.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/FSDatasetInterface.java	(working copy)
@@ -114,6 +114,18 @@
   public InputStream getBlockInputStream(Block b, long seekOffset)
             throws IOException;
 
+  /**
+   * Returns an input stream at specified offset of the specified block
+   * The block is still in the tmp dirfectory and is not finalized
+   * @param b
+   * @param seekOffset
+   * @return an input stream to read the contents of the specified block,
+   *  starting at the offset
+   * @throws IOException
+   */
+  public BlockReadStreams getTmpInputStreams(Block b, long blkoff, long ckoff)
+            throws IOException;
+
      /**
       * 
       * This class contains the output streams for the data and checksum
@@ -129,6 +141,21 @@
       }
       
     }
+
+  /**
+   * 
+   * This class contains the input streams for the data and checksum
+   * of a block
+   *
+   */
+  static class BlockReadStreams {
+    InputStream dataIn;
+    InputStream checksumIn;
+      BlockReadStreams(InputStream dOut, InputStream cOut) {
+        dataIn = dOut;
+        checksumIn = cOut;
+    }
+  }
     
   /**
    * Creates the block and returns output streams to write data and CRC
Index: src/hdfs/org/apache/hadoop/dfs/NameNodeMetrics.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/NameNodeMetrics.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/NameNodeMetrics.java	(working copy)
@@ -46,6 +46,7 @@
     private NameNodeStatistics namenodeStats;
     
     public MetricsTimeVaryingInt numFilesCreated = new MetricsTimeVaryingInt("FilesCreated");
+    public MetricsTimeVaryingInt numFilesAppended = new MetricsTimeVaryingInt("FilesAppended");
     public MetricsTimeVaryingInt numGetBlockLocations = new MetricsTimeVaryingInt("GetBlockLocations");
     public MetricsTimeVaryingInt numFilesRenamed = new MetricsTimeVaryingInt("FilesRenamed");
     public MetricsTimeVaryingInt numFilesListed = new MetricsTimeVaryingInt("FilesListed");
@@ -92,6 +93,7 @@
     public void doUpdates(MetricsContext unused) {
       synchronized (this) {
         numFilesCreated.pushMetric(metricsRecord);
+        numFilesAppended.pushMetric(metricsRecord);
         numGetBlockLocations.pushMetric(metricsRecord);
         numFilesRenamed.pushMetric(metricsRecord);
         numFilesListed.pushMetric(metricsRecord);
Index: src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java	(working copy)
@@ -43,6 +43,7 @@
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.DataOutputStream;
@@ -884,7 +885,8 @@
                  String holder, String clientMachine,
                  boolean overwrite, short replication, long blockSize
                 ) throws IOException {
-    startFileInternal(src, permissions, holder, clientMachine, overwrite,
+    boolean append = false;
+    startFileInternal(src, permissions, holder, clientMachine, overwrite, append,
                       replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
@@ -902,11 +904,14 @@
                                               String holder, 
                                               String clientMachine, 
                                               boolean overwrite,
+                                              boolean append,
                                               short replication,
                                               long blockSize
                                               ) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
-                                  +src+" for "+holder+" at "+clientMachine);
+                                  +src+" for "+holder+" at "+clientMachine
+                                  +" overwrite:"+overwrite
+                                  +" append:"+append);
     if (isInSafeMode())
       throw new SafeModeException("Cannot create file" + src, safeMode);
     if (!isValidName(src)) {
@@ -970,7 +975,15 @@
       } catch(IOException e) {
         throw new IOException("failed to create "+e.getMessage());
       }
-      if (!dir.isValidToCreate(src)) {
+      if (append) {
+        if (myFile == null) {
+          throw new IOException("failed to append to non-existent file " + src 
+                                +" on client " + clientMachine);
+        } else if (myFile.isDirectory()) {
+          throw new IOException("failed to append to directory " + src 
+                                +" on client " + clientMachine);
+        }
+      } else if (!dir.isValidToCreate(src)) {
         if (overwrite) {
           delete(src, true);
         } else {
@@ -983,71 +996,104 @@
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      //
-      // Now we can add the name to the filesystem. This file has no
-      // blocks associated with it.
-      //
-      checkFsObjectLimit();
+      if (append) {
+        //
+        // Replace current node with a INodeUnderConstruction.
+        // Recreate in-memory lease record.
+        //
+        INodeFile node = (INodeFile) myFile;
+        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                        node.getLocalNameBytes(),
+                                        node.getReplication(),
+                                        node.getModificationTime(),
+                                        node.getPreferredBlockSize(),
+                                        node.getBlocks(),
+                                        node.getPermissionStatus(),
+                                        holder,
+                                        clientMachine,
+                                        clientNode);
+        dir.replaceNode(src, node, cons);
+        leaseManager.addLease(cons.clientName, src);
 
-      // increment global generation stamp
-      long genstamp = nextGenerationStamp();
-      INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
-          replication, blockSize, holder, clientMachine, clientNode, genstamp);
-      if (newNode == null) {
-        throw new IOException("DIR* NameSystem.startFile: " +
-                              "Unable to add file to namespace.");
+      } else {
+        //
+        // Now we can add the name to the filesystem. This file has no
+        // blocks associated with it.
+        //
+        checkFsObjectLimit();
+
+        // increment global generation stamp
+        long genstamp = nextGenerationStamp();
+        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
+            replication, blockSize, holder, clientMachine, clientNode, genstamp);
+        if (newNode == null) {
+          throw new IOException("DIR* NameSystem.startFile: " +
+                                "Unable to add file to namespace.");
+        }
+        leaseManager.addLease(newNode.clientName, src);
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+                                      +"add "+src+" to namespace for "+holder);
+        }
       }
-      leaseManager.addLease(newNode.clientName, src);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());
       throw ie;
     }
-
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
-                                  +"add "+src+" to namespace for "+holder);
-    }
   }
 
-  /** append is not yet ready.  This method is for testing. */
-  void appendFileInternal(String src, String holder, String clientMachine
+  /**
+   * Append to an existing file in the namespace.
+   */
+  LocatedBlock appendFile(String src, String holder, String clientMachine
       ) throws IOException {
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
           +src+" for "+holder+" at "+clientMachine);
     }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot append file" + src, safeMode);
-    if (!isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);
-    }
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
+    startFileInternal(src, null, holder, clientMachine, false, true, 
+                      (short)maxReplication, (long)0);
+    getEditLog().logSync();
 
-    try {
-      INodeFile f = dir.getFileINode(src);
-      //assume f != null && !f.isUnderConstruction() && lease does not exist
-      //TODO: remove the assumption 
+    //
+    // Create a LocatedBlock object for the last block of the file
+    // to be returned to the client. Return null if the file does not
+    // have a partial block at the end.
+    //
+    LocatedBlock lb = null;
+    synchronized (this) {
+      INodeFile file = (INodeFile) dir.getFileINode(src);
 
-      DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
-          clientMachine);
-      INodeFileUnderConstruction newnode = f.toINodeFileUnderConstruction(
-          holder, clientMachine, clientNode);
-
-      dir.replaceNode(src, f, newnode);
-      leaseManager.addLease(newnode.clientName, src);
-
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.appendFile: ", ie);
-      throw ie;
+      Block[] blocks = file.getBlocks();
+      if (blocks != null && blocks.length > 0) {
+        Block last = blocks[blocks.length-1];
+        BlockInfo storedBlock = blocksMap.getStoredBlock(last);
+        if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+          long fileLength = file.computeContentSummary().getLength();
+          int numNodes = blocksMap.numNodes(last);
+          DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
+          Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
+          for (int i = 0; it != null && it.hasNext(); i++) {
+            targets[i] = it.next();
+          }
+          lb = new LocatedBlock(last, targets, 
+                                fileLength-storedBlock.getNumBytes());
+        }
+        // XXX remove from replication queue
+      }
     }
 
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: "
-          +"add "+src+" to namespace for "+holder);
+    if (auditLog.isInfoEnabled()) {
+      final FileStatus stat = dir.getFileInfo(src);
+      auditLog.info(String.format(AUDIT_FORMAT,
+                    UserGroupInformation.getCurrentUGI(),
+                    Server.getRemoteIp(),
+                    "append", src, stat.getOwner() + ':' +
+                    stat.getGroup() + ':' + stat.getPermission()));
     }
+    return lb;
   }
 
   /**
Index: src/hdfs/org/apache/hadoop/dfs/DistributedFileSystem.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/DistributedFileSystem.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/DistributedFileSystem.java	(working copy)
@@ -134,7 +134,9 @@
   /** This optional operation is not yet supported. */
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {
-    throw new IOException("Not supported");
+
+    return new FSDataOutputStream(
+        dfs.append(getPathName(f), bufferSize, progress), statistics);
   }
 
   public FSDataOutputStream create(Path f, FsPermission permission,
Index: src/hdfs/org/apache/hadoop/dfs/ClientProtocol.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/ClientProtocol.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/ClientProtocol.java	(working copy)
@@ -37,10 +37,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 35 : Quota-related RPCs are introduced: getQuota, clearQuota;
-   * Besides, getContentSummary also returns the quota of the directory.
+   * 36 : Added append(...).
    */
-  public static final long versionID = 35L;
+  public static final long versionID = 36L;
   
   ///////////////////////////////////////
   // File contents
@@ -107,6 +106,18 @@
                              ) throws IOException;
 
   /**
+   * Append to the end of the file. 
+   * @param src path of the file being created.
+   * @param clientName name of the current client.
+   * @return information about the last partial block if any.
+   * @throws AccessControlException if permission to append file is 
+   * denied by the system. As usually on the client side the exception will 
+   * be wrapped into {@link org.apache.hadoop.ipc.RemoteException}.
+   * @throws IOException if other errors occur.
+   */
+  public LocatedBlock append(String src, String clientName) throws IOException;
+
+  /**
    * Set replication for an existing file.
    * <p>
    * The NameNode sets replication to the new value and returns.
Index: src/hdfs/org/apache/hadoop/dfs/FSDataset.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/FSDataset.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/FSDataset.java	(working copy)
@@ -94,12 +94,17 @@
       if (numBlocks < maxBlocksPerDir) {
         File dest = new File(dir, b.getBlockName());
         File metaData = getMetaFile( src, b );
-        if ( ! metaData.renameTo( getMetaFile(dest, b) ) ||
+        File newmeta = getMetaFile(dest, b);
+        if ( ! metaData.renameTo( newmeta ) ||
             ! src.renameTo( dest ) ) {
           throw new IOException( "could not move files for " + b +
                                  " from tmp to " + 
                                  dest.getAbsolutePath() );
         }
+        DataNode.LOG.debug("XXX addBlock : Moved " + metaData +
+                           " to " + newmeta);
+        DataNode.LOG.debug("XXX addBlock : Moved " + src +
+                           " to " + dest);
 
         numBlocks += 1;
         return dest;
@@ -355,7 +360,8 @@
     }
     
     /**
-     * Temporary files. They get deleted when the datanode restarts
+     * Temporary files. They get moved to the real block directory either when
+     * the block is finalized or the datanode restarts.
      */
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
@@ -650,7 +656,7 @@
     return new MetaDataInputStream(new FileInputStream(checksumFile),
                                                     checksumFile.length());
   }
-    
+
   FSVolumeSet volumes;
   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
@@ -727,6 +733,31 @@
     }
     return new FileInputStream(blockInFile.getFD());
   }
+
+  /**
+   * Returns handles to the block file and its metadata file
+   */
+  public synchronized BlockReadStreams getTmpInputStreams(Block b, 
+                          long blkOffset, long ckoff) throws IOException {
+
+    DatanodeBlockInfo info = volumeMap.get(b);
+    if (info == null) {
+      throw new IOException("Block " + b + " does not exist in volumeMap.");
+    }
+    FSVolume v = info.getVolume();
+    File blockFile = v.getTmpFile(b);
+    RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
+    if (blkOffset > 0) {
+      blockInFile.seek(blkOffset);
+    }
+    File metaFile = getMetaFile(blockFile, b);
+    RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
+    if (ckoff > 0) {
+      metaInFile.seek(ckoff);
+    }
+    return new BlockReadStreams(new FileInputStream(blockInFile.getFD()),
+                                new FileInputStream(metaInFile.getFD()));
+  }
     
   private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
       return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
@@ -891,9 +922,8 @@
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client 
       // re-opens the connection and retries sending those packets.
-      // 
-      DataNode.LOG.info("Reopen Block " + b);
-      return null;
+      // The other reason is that an "append" is occuring to this block.
+      detachBlock(b, 1);
     }
     long blockSize = b.getNumBytes();
 
@@ -925,9 +955,33 @@
       if (!isRecovery) {
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
-        // Do not insert temporary file into volume map.
         f = createTmpFile(v, b);
         volumeMap.put(b, new DatanodeBlockInfo(v));
+      } else if (isValidBlock(b)) {
+        // reopening block for appending to it.
+        // XXX stop periodic block scanner.
+        DataNode.LOG.info("Reopen Block for append " + b);
+        v = volumeMap.get(b).getVolume();
+        f = createTmpFile(v, b);
+        File blkfile = getBlockFile(b);
+        File oldmeta = getMetaFile(b);
+        File newmeta = getMetaFile(f, b);
+
+        // rename meta file to tmp directory
+        DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+        if (!oldmeta.renameTo(newmeta)) {
+          throw new IOException("Block " + b + " reopen failed " +
+                                " Unable to move meta file  " + oldmeta +
+                                " to tmp dir " + newmeta);
+        }
+
+        // rename block file to tmp directory
+        DataNode.LOG.debug("Renamng " + blkfile + " to " + f);
+        if (!f.delete() || !blkfile.renameTo(f)) {
+          throw new IOException("Block " + b + " reopen failed " +
+                                " Unable to move block file to tmp dir.");
+        }
+        volumeMap.put(b, new DatanodeBlockInfo(v));
       }
       ongoingCreates.put(b, new ActiveFile(f, threads));
     }
@@ -948,6 +1002,7 @@
     // block size, so clients can't go crazy
     //
     File metafile = getMetaFile(f, b);
+    DataNode.LOG.debug("XXX writeTo metafile is " + metafile);
     return createBlockWriteStreams( f , metafile);
   }
 
Index: src/hdfs/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/DataNode.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -20,6 +20,8 @@
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.*;
@@ -45,6 +47,8 @@
 import java.nio.channels.SocketChannel;
 import java.util.*;
 import java.util.concurrent.Semaphore;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 
@@ -2270,6 +2274,7 @@
     private boolean isRecovery = false;
     private String clientName;
     DatanodeInfo srcDataNode = null;
+    Checksum partialCrc = null;
 
     BlockReceiver(Block block, DataInputStream in, String inAddr,
                   boolean isRecovery, String clientName, 
@@ -2290,6 +2295,7 @@
         //
         streams = data.writeToBlock(block, isRecovery);
         this.finalized = data.isValidBlock(block);
+        LOG.debug("XXX block finalized? " + this.finalized);
         if (streams != null) {
           this.out = streams.dataOut;
           this.checksumOut = new DataOutputStream(new BufferedOutputStream(
@@ -2447,7 +2453,7 @@
      * be more data from next packet in buf.<br><br>
      * 
      * It tries to read a full packet with single read call.
-     * Consecutinve packets are usually of the same length.
+     * Consecutive packets are usually of the same length.
      */
     private int readNextPacket() throws IOException {
       /* This dances around buf a little bit, mainly to read 
@@ -2605,11 +2611,31 @@
 
         verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
 
+
         try {
           if (!finalized) {
             //finally write to the disk :
             out.write(pktBuf, dataOff, len);
-            checksumOut.write(pktBuf, checksumOff, checksumLen);
+
+            // If this is a partial chunk, then verify that this is the only
+            // chunk in the packet. Calculate new crc for this chunk.
+            if (partialCrc != null) {
+              if (len > bytesPerChecksum) {
+                throw new IOException("Got wrong length during writeBlock(" + 
+                                      block + ") from " + inAddr + " " +
+                                      "A packet can have only one partial chunk."+
+                                      " len = " + len + 
+                                      " bytesPerChecksum " + bytesPerChecksum);
+              }
+              partialCrc.update(pktBuf, dataOff, len);
+              byte[] buf = FSOutputSummer.convertToByteStream(partialCrc,
+                                                              checksumSize);
+              checksumOut.write(buf);
+              LOG.debug("Writing out partial crc for data len " + len);
+              partialCrc = null;
+            } else {
+              checksumOut.write(pktBuf, checksumOff, checksumLen);
+            }
             myMetrics.bytesWritten.inc(len);
           }
         } catch (IOException iex) {
@@ -2739,12 +2765,6 @@
       if (data.getChannelPosition(block, streams) == offsetInBlock) {
         return;                   // nothing to do 
       }
-      if (offsetInBlock % bytesPerChecksum != 0) {
-        throw new IOException("setBlockPosition trying to set position to " +
-                              offsetInBlock +
-                              " which is not a multiple of bytesPerChecksum " +
-                               bytesPerChecksum);
-      }
       long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                               offsetInBlock / bytesPerChecksum * checksumSize;
       if (out != null) {
@@ -2753,6 +2773,16 @@
       if (checksumOut != null) {
         checksumOut.flush();
       }
+
+      // If this is a partial chunk, then read in pre-existing checksum
+      if (offsetInBlock % bytesPerChecksum != 0) {
+        LOG.info("setBlockPosition trying to set position to " +
+                 offsetInBlock +
+                 " which is not a multiple of bytesPerChecksum " +
+                 bytesPerChecksum);
+        computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
+      }
+
       LOG.info("Changing block file offset of block " + block + " from " + 
                data.getChannelPosition(block, streams) +
                " to " + offsetInBlock +
@@ -2761,6 +2791,54 @@
       // set the position of the block file
       data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
     }
+
+    /**
+     * reads in the partial crc chunk and computes checksum
+     * of pre-existing data in partial chunk.
+     */
+    private void computePartialChunkCrc(long blkoff, long ckoff, 
+                                        int bytesPerChecksum) throws IOException {
+
+      // find offset of the beginning of partial chunk.
+      //
+      int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
+      int checksumSize = checksum.getChecksumSize();
+      blkoff = blkoff - sizePartialChunk;
+      LOG.info("XXX computePartialChunkCrc sizePartialChunk " + 
+                sizePartialChunk +
+                " offset in block " + blkoff +
+                " offset in metafile " + ckoff);
+
+      // create an input stream from the block file
+      // and read in partial crc chunk into temporary buffer
+      //
+      FSDataset.BlockReadStreams instr = data.getTmpInputStreams(block, blkoff, ckoff);
+      byte[] buf = new byte[sizePartialChunk];
+      IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
+
+      // open meta file and read in crc value computer earlier
+      byte[] crcbuf = new byte[checksumSize];
+      IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
+      IOUtils.closeStream(instr.dataIn);
+      IOUtils.closeStream(instr.checksumIn);
+
+      // compute crc of partial chunk from data read in the block file.
+      partialCrc = new CRC32();
+      partialCrc.update(buf, 0, sizePartialChunk);
+      LOG.info("Read in partial CRC chunk from disk for block " + block);
+
+      // paranoia! verify that the pre-computed crc matches what we
+      // recalculated just now
+      if (partialCrc.getValue() != FSInputChecker.getChecksum(crcbuf)) {
+        LOG.warn("Partial CRC " + partialCrc.getValue() +
+                 " does not match value computed the " +
+                 " last time file was closed " +
+                 FSInputChecker.getChecksum(crcbuf));
+      } else {
+        LOG.debug("XXX Partial CRC matches 0x" + 
+                  Long.toHexString(partialCrc.getValue()));
+      }
+    }
   }
 
   /**
@@ -3100,7 +3178,15 @@
   /** {@inheritDoc} */
   public Block recoverBlock(Block block, DatanodeInfo[] targets
       ) throws IOException {
-    LOG.info("Client invoking recoverBlock for block " + block);
+    StringBuilder msg = new StringBuilder();
+    for (int i = 0; i < targets.length; i++) {
+      msg.append(targets[i].getName());
+      if (i < targets.length - 1) {
+        msg.append(",");
+      }
+    }
+    LOG.info("Client invoking recoverBlock for block " + block +
+             " on datanodes " + msg.toString());
     return LeaseManager.recoverBlock(block, targets, namenode, 
                                      getConf(), false);
   }
Index: src/hdfs/org/apache/hadoop/dfs/namenode/metrics/NameNodeStatistics.java
===================================================================
--- src/hdfs/org/apache/hadoop/dfs/namenode/metrics/NameNodeStatistics.java	(revision 672615)
+++ src/hdfs/org/apache/hadoop/dfs/namenode/metrics/NameNodeStatistics.java	(working copy)
@@ -166,6 +166,13 @@
   /**
    * @inheritDoc
    */
+  public int getNumFilesAppended() {
+    return myMetrics.numFilesAppended.getPreviousIntervalValue();
+  }
+
+  /**
+   * @inheritDoc
+   */
   public int getNumFilesListed() {
     return myMetrics.numFilesListed.getPreviousIntervalValue();
   }
Index: src/core/org/apache/hadoop/fs/FSInputChecker.java
===================================================================
--- src/core/org/apache/hadoop/fs/FSInputChecker.java	(revision 672615)
+++ src/core/org/apache/hadoop/fs/FSInputChecker.java	(working copy)
@@ -243,6 +243,9 @@
         } 
         retry = false;
       } catch (ChecksumException ce) {
+          for ( int i = off; i < off + read; i++) {
+            System.out.println("YYY " + b[i]);
+          }
           LOG.info("Found checksum error: "+StringUtils.stringifyException(ce));
           if (retriesLeft == 0) {
             throw ce;
@@ -288,6 +291,15 @@
     }
     return crc;
   }
+
+  /* calculate checksum value */
+  static public long getChecksum(byte[] checksum) {
+    long crc = 0L;
+    for(int i=0; i<checksum.length; i++) {
+      crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
+    }
+    return crc;
+  }
   
   @Override
   public synchronized long getPos() throws IOException {
Index: src/core/org/apache/hadoop/fs/FSOutputSummer.java
===================================================================
--- src/core/org/apache/hadoop/fs/FSOutputSummer.java	(revision 672615)
+++ src/core/org/apache/hadoop/fs/FSOutputSummer.java	(working copy)
@@ -79,6 +79,7 @@
    */
   public synchronized void write(byte b[], int off, int len)
   throws IOException {
+    System.out.println(" FSOutputsummer off " + off + " len " + len);
     if (off < 0 || len < 0 || off > b.length - len) {
       throw new ArrayIndexOutOfBoundsException();
     }
@@ -96,9 +97,13 @@
     if(count==0 && len>=buf.length) {
       // local buffer is empty and user data has one chunk
       // checksum and output data
-      sum.update(b, off, buf.length);
-      writeChecksumChunk(b, off, buf.length, false);
-      return buf.length;
+      int length = buf.length;
+      sum.update(b, off, length);
+      System.out.println("FSOutputSummer XXX1 write1 " + 
+                        " off " + off + " len " + length);
+      writeChecksumChunk(b, off, length, false);
+      System.out.println("FSOutputSummer XXX1 write1 returning " + length);
+      return length;
     }
     
     // copy user data to local buffer
@@ -109,6 +114,7 @@
     count += bytesToCopy;
     if (count == buf.length) {
       // local buffer is full
+      System.out.println("FSOutputSummer XXX1 flushBuffer " );
       flushBuffer();
     } 
     return bytesToCopy;
@@ -138,10 +144,12 @@
   
   /* Generate checksum for the data chunk and output data chunk & checksum
    * to the underlying output stream. If keep is true then keep the
-   * current ckecksum intact, do not reset it.
+   * current checksum intact, do not reset it.
    */
   private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
   throws IOException {
+    System.out.println("FSOutputSummer XXX1 writeChecksumChunk " + 
+                      " off " + off + " len " + len);
     int tempChecksum = (int)sum.getValue();
     if (!keep) {
       sum.reset();
@@ -154,4 +162,26 @@
 
     writeChunk(b, off, len, checksum);
   }
+
+  /**
+   * Converts a checksum integer value to a byte stream
+   */
+  static public byte[] convertToByteStream(Checksum sum, int checksumSize) {
+    int tempChecksum = (int)sum.getValue();
+    byte[] checksum = new byte[checksumSize];
+    checksum[0] = (byte)((tempChecksum >>> 24) & 0xFF);
+    checksum[1] = (byte)((tempChecksum >>> 16) & 0xFF);
+    checksum[2] = (byte)((tempChecksum >>>  8) & 0xFF);
+    checksum[3] = (byte)((tempChecksum >>>  0) & 0xFF);
+    return checksum;
+  }
+
+  /**
+   * Resets existing buffer with a new one of the specified size.
+   */
+  public void resetChecksumChunk(int size) {
+    sum.reset();
+    this.buf = new byte[size];
+    this.count = 0;
+  }
 }
