Index: build.xml
===================================================================
--- build.xml	(revision 643793)
+++ build.xml	(working copy)
@@ -186,6 +186,7 @@
     <mkdir dir="${build.webapps}/job/WEB-INF"/>
     <mkdir dir="${build.webapps}/dfs/WEB-INF"/>
     <mkdir dir="${build.webapps}/datanode/WEB-INF"/>
+    <mkdir dir="${build.webapps}/secondary/WEB-INF"/>
     <mkdir dir="${build.examples}"/>
     <mkdir dir="${build.anttasks}"/>
     <mkdir dir="${build.dir}/c++"/>
Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml	(revision 643793)
+++ conf/hadoop-default.xml	(working copy)
@@ -213,7 +213,9 @@
   <name>fs.checkpoint.dir</name>
   <value>${hadoop.tmp.dir}/dfs/namesecondary</value>
   <description>Determines where on the local filesystem the DFS secondary
-      name node should store the temporary images and edits to merge.  
+      name node should store the temporary images and edits to merge.
+      If this is a comma-delimited list of directories then the image is
+      replicated in all of the directories for redundancy.
   </description>
 </property>
 
Index: src/java/org/apache/hadoop/conf/Configuration.java
===================================================================
--- src/java/org/apache/hadoop/conf/Configuration.java	(revision 643793)
+++ src/java/org/apache/hadoop/conf/Configuration.java	(working copy)
@@ -40,6 +40,7 @@
 import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.Collection;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -528,6 +529,21 @@
 
   /** 
    * Get the comma delimited values of the <code>name</code> property as 
+   * a collection of <code>String</code>s.  
+   * If no such property is specified then empty collection is returned.
+   * <p>
+   * This is an optimized version of {@link #getStrings(String)}
+   * 
+   * @param name property name.
+   * @return property value as a collection of <code>String</code>s. 
+   */
+  public Collection<String> getStringCollection(String name) {
+    String valueString = get(name);
+    return StringUtils.getStringCollection(valueString);
+  }
+
+  /** 
+   * Get the comma delimited values of the <code>name</code> property as 
    * an array of <code>String</code>s.  
    * If no such property is specified then <code>null</code> is returned.
    * 
Index: src/java/org/apache/hadoop/dfs/ClientProtocol.java
===================================================================
--- src/java/org/apache/hadoop/dfs/ClientProtocol.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/ClientProtocol.java	(working copy)
@@ -18,6 +18,8 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
 import org.apache.hadoop.fs.permission.*;
@@ -36,7 +38,7 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 27 : removed getContentLength(String), open(String, long, long) and isDir(String)
+   * 28 : rollEditLog() returns WritableComparable instead of long.
    */
   public static final long versionID = 27L;
   
@@ -375,10 +377,10 @@
   /**
    * Closes the current edit log and opens a new one. The 
    * call fails if the file system is in SafeMode.
-   * Returns a unique token to identify this transaction.
    * @throws IOException
+   * @return a unique token to identify this transaction.
    */
-  public long rollEditLog() throws IOException;
+  public WritableComparable rollEditLog() throws IOException;
 
   /**
    * Rolls the fsImage log. It removes the old fsImage, copies the
Index: src/java/org/apache/hadoop/dfs/FSConstants.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSConstants.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/FSConstants.java	(working copy)
@@ -146,7 +146,18 @@
   public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
 
   // Startup options
-  public enum StartupOption{ FORMAT, REGULAR, UPGRADE, ROLLBACK, FINALIZE; }
+  public enum StartupOption{
+    FORMAT  ("-format"),
+    REGULAR ("-regular"),
+    UPGRADE ("-upgrade"),
+    ROLLBACK("-rollback"),
+    FINALIZE("-finalize"),
+    IMPORT  ("-importCheckpoint");
+    
+    private String name = null;
+    private StartupOption(String arg) {this.name = arg;}
+    String getName() {return name;}
+  }
 
   // type of the datanode report
   public static enum DatanodeReportType {ALL, LIVE, DEAD }
Index: src/java/org/apache/hadoop/dfs/FSDirectory.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSDirectory.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/FSDirectory.java	(working copy)
@@ -52,6 +52,7 @@
   /** Access an existing dfs name directory. */
   public FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this(new FSImage(), ns, conf);
+    fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null));
   }
 
   public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
@@ -77,7 +78,14 @@
       startOpt = StartupOption.REGULAR;
     }
     try {
-      fsImage.recoverTransitionRead(dataDirs, startOpt);
+      if (fsImage.recoverTransitionRead(dataDirs, startOpt)) {
+        fsImage.saveFSImage();
+      }
+      FSEditLog editLog = fsImage.getEditLog();
+      assert editLog != null : "editLog must be initialized";
+      if (!editLog.isOpen())
+        editLog.open();
+      fsImage.setCheckpointDirectories(null);
     } catch(IOException e) {
       fsImage.close();
       throw e;
Index: src/java/org/apache/hadoop/dfs/FSEditLog.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSEditLog.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/FSEditLog.java	(working copy)
@@ -1054,7 +1054,7 @@
   /**
    * Returns the timestamp of the edit log
    */
-  synchronized long getFsEditTime() throws IOException {
+  synchronized long getFsEditTime() {
     return getEditFile(0).lastModified();
   }
 
Index: src/java/org/apache/hadoop/dfs/FSImage.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSImage.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/FSImage.java	(working copy)
@@ -28,9 +28,11 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.text.SimpleDateFormat;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Random;
@@ -37,6 +39,8 @@
 import java.lang.Math;
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.CheckpointStates;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.dfs.FSConstants.NodeType;
 import org.apache.hadoop.io.UTF8;
@@ -49,6 +53,9 @@
  */
 class FSImage extends Storage {
 
+  private static final SimpleDateFormat DATE_FORM =
+    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
   //
   // The filenames used for storing the images
   //
@@ -64,11 +71,20 @@
     String getName() {return fileName;}
   }
   
-  private long checkpointTime = -1L;
+  protected long checkpointTime = -1L;
   private FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
+  /**
+   * Directories for importing an image from a checkpoint.
+   */
+  private Collection<File> checkpointDirs;
 
   /**
+   * can fs-image be rolled?
+   */
+  volatile private CheckpointStates ckptState = CheckpointStates.START; 
+
+  /**
    */
   FSImage() {
     super(NodeType.NAME_NODE);
@@ -102,6 +118,10 @@
       this.addStorageDir(new StorageDirectory(it.next()));
   }
 
+  void setCheckpointDirectories(Collection<File> dirs) {
+    checkpointDirs = dirs;
+  }
+
   /**
    */
   File getImageFile(int imageDirIdx, NameNodeFile type) {
@@ -119,7 +139,28 @@
   File getEditNewFile(int idx) {
     return getImageFile(idx, NameNodeFile.EDITS_NEW);
   }
-  
+
+  File[] getFileNames(NameNodeFile type) {
+    File[] list = new File[getNumStorageDirs()];
+    int i=0;
+    for(StorageDirectory sd : storageDirs) {
+      list[i++] = getImageFile(sd, type);
+    }
+    return list;
+  }
+
+  File[] getImageFiles() {
+    return getFileNames(NameNodeFile.IMAGE);
+  }
+
+  File[] getEditsFiles() {
+    return getFileNames(NameNodeFile.EDITS);
+  }
+
+  File[] getTimeFiles() {
+    return getFileNames(NameNodeFile.TIME);
+  }
+
   /**
    * Analyze storage directories.
    * Recover from previous transitions if required. 
@@ -129,8 +170,9 @@
    * @param dataDirs
    * @param startOpt startup option
    * @throws IOException
+   * @return true if the image needs to be saved or false otherwise
    */
-  void recoverTransitionRead(Collection<File> dataDirs,
+  boolean recoverTransitionRead(Collection<File> dataDirs,
                              StartupOption startOpt
                              ) throws IOException {
     assert startOpt != StartupOption.FORMAT : 
@@ -135,6 +177,12 @@
                              ) throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
+
+    if(startOpt == StartupOption.IMPORT 
+        && (checkpointDirs == null || checkpointDirs.isEmpty()))
+      throw new IOException("Cannot import image from a checkpoint. "
+                          + "\"fs.checkpoint.dir\" is not set." );
+
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
@@ -169,6 +217,10 @@
           sd.read(); // read and verify consistency with other directories
           isFormatted = true;
         }
+        if (startOpt == StartupOption.IMPORT && isFormatted)
+          // import of a checkpoint is allowed only into empty image directories
+          throw new IOException("Cannot import image from a checkpoint. " 
+              + " NameNode already contains an image in " + sd.root);
       } catch (IOException ioe) {
         sd.unlock();
         throw ioe;
@@ -180,8 +232,9 @@
 
     if (dataDirs.size() == 0)  // none of the data dirs exist
       throw new IOException(
-                            "All specified directories are not accessible or do not exist.");
-    if (!isFormatted && startOpt != StartupOption.ROLLBACK)
+        "All specified directories are not accessible or do not exist.");
+    if (!isFormatted && startOpt != StartupOption.ROLLBACK 
+                     && startOpt != StartupOption.IMPORT)
       throw new IOException("NameNode is not formatted.");
     if (startOpt != StartupOption.UPGRADE
           && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
@@ -215,17 +268,17 @@
     switch(startOpt) {
     case UPGRADE:
       doUpgrade();
-      break;
+      return false; // upgrade saved image already
+    case IMPORT:
+      doImportCheckpoint();
+      return true;
     case ROLLBACK:
       doRollback();
-      // and now load that image
+      break;
     case REGULAR:
-      if (loadFSImage())
-        saveFSImage();
+      // just load the image
     }
-    assert editLog != null : "editLog must be initialized";
-    if(!editLog.isOpen())
-      editLog.open();
+    return loadFSImage();
   }
 
   private void doUpgrade() throws IOException {
@@ -360,6 +413,30 @@
     LOG.info("Finalize upgrade for " + sd.root + " is complete.");
   }
 
+  /**
+   * Load image from a checkpoint directory and save it into the current one.
+   * @throws IOException
+   */
+  void doImportCheckpoint() throws IOException {
+    FSImage ckptImage = new FSImage();
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    // replace real image with the checkpoint image
+    FSImage realImage = fsNamesys.getFSImage();
+    assert realImage == this;
+    fsNamesys.dir.fsImage = ckptImage;
+    // load from the checkpoint dirs
+    try {
+      ckptImage.recoverTransitionRead(checkpointDirs, StartupOption.REGULAR);
+    } finally {
+      ckptImage.close();
+    }
+    // return back the real image
+    realImage.setStorageInfo(ckptImage);
+    fsNamesys.dir.fsImage = realImage;
+    // and save it
+    saveFSImage();
+  }
+
   void finalizeUpgrade() throws IOException {
     for(int idx = 0; idx < getNumStorageDirs(); idx++)
       doFinalize(getStorageDir(idx));
@@ -808,6 +885,7 @@
       if (editsNew.exists()) 
         editLog.createEditLogFile(editsNew);
     }
+    ckptState = CheckpointStates.UPLOAD_DONE;
     rollFSImage();
   }
 
@@ -1009,6 +1087,9 @@
    * Reopens the new edits file.
    */
   void rollFSImage() throws IOException {
+    if (ckptState != CheckpointStates.UPLOAD_DONE) {
+      throw new IOException("Cannot roll fsImage before rolling edits log.");
+    }
     //
     // First, verify that edits.new and fsimage.ckpt exists in all
     // checkpoint directories.
@@ -1059,6 +1140,130 @@
         idx--;
       }
     }
+    ckptState = CheckpointStates.START;
+  }
+
+  /**
+   * A unique signature intended to identify checkpoint transactions.
+   */
+  static class CheckpointSignature extends StorageInfo 
+                        implements WritableComparable<CheckpointSignature> {
+    private static final String FIELD_SEPARATOR = ":";
+    long editsTime = -1L;
+    long checkpointTime = -1L;
+
+    CheckpointSignature() {}
+
+    CheckpointSignature(FSImage fsImage) {
+      super(fsImage);
+      editsTime = fsImage.getEditLog().getFsEditTime();
+      checkpointTime = fsImage.checkpointTime;
+    }
+
+    CheckpointSignature(String str) {
+      String[] fields = str.split(FIELD_SEPARATOR);
+      assert fields.length == 5 : "Must be 5 fields in CheckpointSignature";
+      layoutVersion = Integer.valueOf(fields[0]);
+      namespaceID = Integer.valueOf(fields[1]);
+      cTime = Long.valueOf(fields[2]);
+      editsTime = Long.valueOf(fields[3]);
+      checkpointTime = Long.valueOf(fields[4]);
+    }
+
+    public String toString() {
+      return String.valueOf(layoutVersion) + FIELD_SEPARATOR
+           + String.valueOf(namespaceID) + FIELD_SEPARATOR
+           + String.valueOf(cTime) + FIELD_SEPARATOR
+           + String.valueOf(editsTime) + FIELD_SEPARATOR
+           + String.valueOf(checkpointTime);
+    }
+
+    void validateStorageInfo(StorageInfo si) throws IOException {
+      if(layoutVersion != si.layoutVersion
+          || namespaceID != si.namespaceID || cTime != si.cTime) {
+        // checkpointTime can change when the image is saved - do not compare
+        throw new IOException("Inconsistent checkpoint fileds. "
+            + "LV = " + layoutVersion + " namespaceID = " + namespaceID
+            + " cTime = " + cTime + ". Expecting respectively: "
+            + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime);
+      }
+    }
+
+    //
+    // Comparable interface
+    //
+    public int compareTo(CheckpointSignature o) {
+      return 
+        (layoutVersion < o.layoutVersion) ? -1 : 
+                    (layoutVersion > o.layoutVersion) ? 1 :
+        (namespaceID < o.namespaceID) ? -1 : (namespaceID > o.namespaceID) ? 1 :
+        (cTime < o.cTime) ? -1 : (cTime > o.cTime) ? 1 :
+        (editsTime < o.editsTime) ? -1 : (editsTime > o.editsTime) ? 1 :
+        (checkpointTime < o.checkpointTime) ? -1 : 
+                    (checkpointTime > o.checkpointTime) ? 1 : 0;
+    }
+
+    public boolean equals(Object o) {
+      if (!(o instanceof CheckpointSignature)) {
+        return false;
+      }
+      return compareTo((CheckpointSignature)o) == 0;
+    }
+
+    /////////////////////////////////////////////////
+    // Writable
+    /////////////////////////////////////////////////
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(getLayoutVersion());
+      out.writeInt(getNamespaceID());
+      out.writeLong(getCTime());
+      out.writeLong(editsTime);
+      out.writeLong(checkpointTime);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      layoutVersion = in.readInt();
+      namespaceID = in.readInt();
+      cTime = in.readLong();
+      editsTime = in.readLong();
+      checkpointTime = in.readLong();
+    }
+  }
+
+  WritableComparable rollEditLog() throws IOException {
+    getEditLog().rollEditLog();
+    ckptState = CheckpointStates.ROLLED_EDITS;
+    return new CheckpointSignature(this);
+  }
+
+  /**
+   * This is called just before a new checkpoint is uploaded to the
+   * namenode.
+   */
+  void validateCheckpointUpload(CheckpointSignature sig) throws IOException {
+    if (ckptState != CheckpointStates.ROLLED_EDITS) {
+      throw new IOException("Namenode is not expecting an new image " +
+                             ckptState);
+    } 
+    // verify token
+    long modtime = getEditLog().getFsEditTime();
+    if (sig.editsTime != modtime) {
+      throw new IOException("Namenode has an edit log with timestamp of " +
+                            DATE_FORM.format(new Date(modtime)) +
+                            " but new checkpoint was created using editlog " +
+                            " with timestamp " + 
+                            DATE_FORM.format(new Date(sig.editsTime)) + 
+                            ". Checkpoint Aborted.");
+    }
+    sig.validateStorageInfo(this);
+    ckptState = CheckpointStates.UPLOAD_START;
+  }
+
+  /**
+   * This is called when a checkpoint upload finishes successfully.
+   */
+  synchronized void checkpointUploadDone() {
+    ckptState = CheckpointStates.UPLOAD_DONE;
   }
 
   void close() throws IOException {
@@ -1073,6 +1278,14 @@
     return getImageFile(0, NameNodeFile.IMAGE);
   }
 
+  File getFsEditName() throws IOException {
+    return getEditLog().getFsEditName();
+  }
+
+  File getFsTimeName() {
+    return getImageFile(0, NameNodeFile.TIME);
+  }
+
   /**
    * Return the name of the image file that is uploaded by periodic
    * checkpointing.
@@ -1191,7 +1404,7 @@
 
   private void verifyDistributedUpgradeProgress(StartupOption startOpt
                                                 ) throws IOException {
-    if(startOpt == StartupOption.ROLLBACK)
+    if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
       return;
     UpgradeManager um = FSNamesystem.getFSNamesystem().upgradeManager;
     assert um != null : "FSNameSystem.upgradeManager is null.";
@@ -1218,4 +1431,16 @@
         + FSConstants.LAYOUT_VERSION + " is initialized.");
   }
 
+  static Collection<File> getCheckpointDirs(Configuration conf,
+                                            String defaultName) {
+    Collection<String> dirNames = conf.getStringCollection("fs.checkpoint.dir");
+    if (dirNames.size() == 0 && defaultName != null) {
+      dirNames.add(defaultName);
+    }
+    Collection<File> dirs = new ArrayList<File>(dirNames.size());
+    for(String name : dirNames) {
+      dirs.add(new File(name));
+    }
+    return dirs;
+  }
 }
Index: src/java/org/apache/hadoop/dfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/FSNamesystem.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/FSNamesystem.java	(working copy)
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.Server;
 
 import java.io.BufferedWriter;
@@ -46,7 +47,6 @@
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.text.SimpleDateFormat;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -227,12 +227,6 @@
   private HostsFileReader hostsReader; 
   private Daemon dnthread = null;
 
-  // can fs-image be rolled?
-  volatile private CheckpointStates ckptState = CheckpointStates.START; 
-
-  private static final SimpleDateFormat DATE_FORM =
-    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
   private long maxFsObjects = 0;          // maximum number of fs objects
 
   /**
@@ -325,8 +319,8 @@
         conf.get("dfs.datanode.https.address", infoHost + ":" + 50475));
     this.infoServer.setAttribute("datanode.https.port",
         datanodeSslPort.getPort());
-    this.infoServer.setAttribute("name.system", this);
     this.infoServer.setAttribute("name.node", nn);
+    this.infoServer.setAttribute("name.system.image", getFSImage());
     this.infoServer.setAttribute("name.conf", conf);
     this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
     this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
@@ -341,12 +335,12 @@
   }
 
   static Collection<File> getNamespaceDirs(Configuration conf) {
-    String[] dirNames = conf.getStrings("dfs.name.dir");
-    if (dirNames == null)
-      dirNames = new String[] {"/tmp/hadoop/dfs/name"};
-    Collection<File> dirs = new ArrayList<File>(dirNames.length);
-    for(int idx = 0; idx < dirNames.length; idx++) {
-      dirs.add(new File(dirNames[idx]));
+    Collection<String> dirNames = conf.getStringCollection("dfs.name.dir");
+    if (dirNames.isEmpty())
+      dirNames.add("/tmp/hadoop/dfs/name");
+    Collection<File> dirs = new ArrayList<File>(dirNames.size());
+    for(String name : dirNames) {
+      dirs.add(new File(name));
     }
     return dirs;
   }
@@ -3862,7 +3856,8 @@
       }
       String safeBlockRatioMsg = 
         String.format(
-          "The ratio of reported blocks %.4f has not reached the threshold %.4f. ",
+          "The ratio of reported blocks %.4f has " +
+          (reached == 0 ? "not " : "") + "reached the threshold %.4f. ",
           getSafeBlockRatio(), threshold) + autoOffMsg;
       if(reached == 0)  // threshold is not reached 
         return safeBlockRatioMsg + ".";
@@ -4042,7 +4037,7 @@
     return getEditLog().getEditLogSize();
   }
 
-  synchronized long rollEditLog() throws IOException {
+  synchronized WritableComparable rollEditLog() throws IOException {
     if (isInSafeMode()) {
       throw new SafeModeException("Checkpoint not created",
                                   safeMode);
@@ -4048,13 +4043,10 @@
                                   safeMode);
     }
     LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
-    getEditLog().rollEditLog();
-    ckptState = CheckpointStates.ROLLED_EDITS;
-    return getEditLog().getFsEditTime();
+    return getFSImage().rollEditLog();
   }
 
   synchronized void rollFSImage() throws IOException {
-    LOG.info("Roll FSImage from " + Server.getRemoteAddress());
     if (isInSafeMode()) {
       throw new SafeModeException("Checkpoint not created",
                                   safeMode);
@@ -4059,44 +4051,8 @@
       throw new SafeModeException("Checkpoint not created",
                                   safeMode);
     }
-    if (ckptState != CheckpointStates.UPLOAD_DONE) {
-      throw new IOException("Cannot roll fsImage before rolling edits log.");
-    }
-    dir.fsImage.rollFSImage();
-    ckptState = CheckpointStates.START;
-  }
-
-  File getFsEditName() throws IOException {
-    return getEditLog().getFsEditName();
-  }
-
-  /**
-   * This is called just before a new checkpoint is uploaded to the
-   * namenode.
-   */
-  synchronized void validateCheckpointUpload(long token) throws IOException {
-    if (ckptState != CheckpointStates.ROLLED_EDITS) {
-      throw new IOException("Namenode is not expecting an new image " +
-                             ckptState);
-    } 
-    // verify token
-    long modtime = getEditLog().getFsEditTime();
-    if (token != modtime) {
-      throw new IOException("Namenode has an edit log with timestamp of " +
-                            DATE_FORM.format(new Date(modtime)) +
-                            " but new checkpoint was created using editlog " +
-                            " with timestamp " + 
-                            DATE_FORM.format(new Date(token)) + 
-                            ". Checkpoint Aborted.");
-    }
-    ckptState = CheckpointStates.UPLOAD_START;
-  }
-
-  /**
-   * This is called when a checkpoint upload finishes successfully.
-   */
-  synchronized void checkpointUploadDone() {
-    ckptState = CheckpointStates.UPLOAD_DONE;
+    LOG.info("Roll FSImage from " + Server.getRemoteAddress());
+    getFSImage().rollFSImage();
   }
 
   /**
Index: src/java/org/apache/hadoop/dfs/GetImageServlet.java
===================================================================
--- src/java/org/apache/hadoop/dfs/GetImageServlet.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/GetImageServlet.java	(working copy)
@@ -19,8 +19,6 @@
 
 import java.util.*;
 import java.io.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.logging.*;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -27,6 +25,7 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * This class is used in Namesystem's jetty to retrieve a file.
@@ -34,8 +33,7 @@
  * edit file for periodic checkpointing.
  */
 public class GetImageServlet extends HttpServlet {
-
-  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.FSNamesystem");
+  private static final long serialVersionUID = -7669068179452648952L;
 
   @SuppressWarnings("unchecked")
   public void doGet(HttpServletRequest request,
@@ -44,29 +42,29 @@
     Map<String,String[]> pmap = request.getParameterMap();
     try {
       ServletContext context = getServletContext();
-      NameNode nn = (NameNode) context.getAttribute("name.node");
+      FSImage nnImage = (FSImage)context.getAttribute("name.system.image");
       TransferFsImage ff = new TransferFsImage(pmap, request, response);
       if (ff.getImage()) {
-        // send fsImage to Secondary
+        // send fsImage
         TransferFsImage.getFileServer(response.getOutputStream(),
-                                      nn.getFsImageName()); 
+                                      nnImage.getFsImageName()); 
       } else if (ff.getEdit()) {
-        // send old edits to Secondary
+        // send edits
         TransferFsImage.getFileServer(response.getOutputStream(),
-                                      nn.getFsEditName());
+                                      nnImage.getFsEditName());
       } else if (ff.putImage()) {
         // issue a HTTP get request to download the new fsimage 
-        nn.validateCheckpointUpload(ff.getToken());
+        nnImage.validateCheckpointUpload(ff.getToken());
         TransferFsImage.getFileClient(ff.getInfoServer(), "getimage=1", 
-                                      nn.getFsImageNameCheckpoint());
-        nn.checkpointUploadDone();
+                                      nnImage.getFsImageNameCheckpoint());
+        nnImage.checkpointUploadDone();
       }
-    } catch (IOException ie) {
-      StringUtils.stringifyException(ie);
-      LOG.warn(ie);
-      String errMsg = "GetImage failed.";
+    } catch (Exception ie) {
+      String errMsg = "GetImage failed. " + StringUtils.stringifyException(ie);
       response.sendError(HttpServletResponse.SC_GONE, errMsg);
-      throw ie;
+      throw new IOException(errMsg);
+    } finally {
+      response.getOutputStream().close();
     }
   }
 }
Index: src/java/org/apache/hadoop/dfs/NameNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/NameNode.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/NameNode.java	(working copy)
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.StringUtils;
@@ -486,7 +487,7 @@
   /**
    * Roll the edit log.
    */
-  public long rollEditLog() throws IOException {
+  public WritableComparable rollEditLog() throws IOException {
     return namesystem.rollEditLog();
   }
 
@@ -700,27 +701,6 @@
   }
 
   /**
-   * Validates that this is a valid checkpoint upload request
-   */
-  public void validateCheckpointUpload(long token) throws IOException {
-    namesystem.validateCheckpointUpload(token);
-  }
-
-  /**
-   * Indicates that a new checkpoint has been successfully uploaded.
-   */
-  public void checkpointUploadDone() {
-    namesystem.checkpointUploadDone();
-  }
-
-  /**
-   * Returns the name of the edits file
-   */
-  public File getFsEditName() throws IOException {
-    return namesystem.getFsEditName();
-  }
-
-  /**
    * Returns the address on which the NameNodes is listening to.
    * @return the address on which the NameNodes is listening to.
    */
@@ -788,7 +768,12 @@
 
   private static void printUsage() {
     System.err.println(
-      "Usage: java NameNode [-format] | [-upgrade] | [-rollback] | [-finalize]");
+      "Usage: java NameNode [" +
+      StartupOption.FORMAT.getName() + "] | [" +
+      StartupOption.UPGRADE.getName() + "] | [" +
+      StartupOption.ROLLBACK.getName() + "] | [" +
+      StartupOption.FINALIZE.getName() + "] | [" +
+      StartupOption.IMPORT.getName() + "]");
   }
 
   private static StartupOption parseArguments(String args[], 
@@ -797,16 +782,18 @@
     StartupOption startOpt = StartupOption.REGULAR;
     for(int i=0; i < argsLen; i++) {
       String cmd = args[i];
-      if ("-format".equalsIgnoreCase(cmd)) {
+      if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.FORMAT;
-      } else if ("-regular".equalsIgnoreCase(cmd)) {
+      } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.REGULAR;
-      } else if ("-upgrade".equalsIgnoreCase(cmd)) {
+      } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.UPGRADE;
-      } else if ("-rollback".equalsIgnoreCase(cmd)) {
+      } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.ROLLBACK;
-      } else if ("-finalize".equalsIgnoreCase(cmd)) {
+      } else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.FINALIZE;
+      } else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.IMPORT;
       } else
         return null;
     }
Index: src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/SecondaryNameNode.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/SecondaryNameNode.java	(working copy)
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.dfs.FSImage.CheckpointSignature;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.mapred.StatusHttpServer;
@@ -29,12 +30,8 @@
 
 import java.io.*;
 import java.net.*;
-import java.util.Map;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import java.util.ArrayList;
+import java.util.Collection;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 
 /**********************************************************
@@ -52,11 +49,11 @@
  **********************************************************/
 public class SecondaryNameNode implements FSConstants, Runnable {
     
-  public static final Log LOG = LogFactory.getLog(
-                                                  "org.apache.hadoop.dfs.NameNode.Secondary");
-  private static final String SRC_FS_IMAGE = "srcimage.tmp";
-  private static final String FS_EDITS = "edits.tmp";
-  private static final String DEST_FS_IMAGE = "destimage.tmp";
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.dfs.NameNode.Secondary");
+
+  private String fsName;
+  private CheckpointStorage checkpointImage;
 
   private ClientProtocol namenode;
   private Configuration conf;
@@ -66,14 +63,11 @@
   private int infoPort;
   private String infoBindAddress;
 
-  private File checkpointDir;
+  private Collection<File> checkpointDirs;
   private long checkpointPeriod;	// in seconds
   private long checkpointSize;    // size (in MB) of current Edit Log
-  private File srcImage;
-  private File destImage;
-  private File editFile;
 
-  private boolean[] simulation = null; // error simulation events
+  private static boolean[] simulation = null; // error simulation events
 
   /**
    * Create a connection to the primary namenode.
@@ -78,19 +72,23 @@
   /**
    * Create a connection to the primary namenode.
    */
-  public SecondaryNameNode(Configuration conf)  throws IOException {
+  SecondaryNameNode(Configuration conf)  throws IOException {
+    try {
+      initialize(conf);
+    } catch(IOException e) {
+      shutdown();
+      throw e;
+    }
+  }
 
+  /**
+   * Initialize SecondaryNameNode.
+   */
+  private void initialize(Configuration conf) throws IOException {
     // initiate Java VM metrics
     JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
     
-    //
-    // initialize error simulation code for junit test
-    //
-    initializeErrorSimulationEvent(2);
-
-    //
     // Create connection to the namenode.
-    //
     shouldRun = true;
     nameNodeAddr =
       NetUtils.createSocketAddr(FileSystem.getDefaultUri(conf).getAuthority());
@@ -99,9 +97,18 @@
         (ClientProtocol) RPC.waitForProxy(ClientProtocol.class,
             ClientProtocol.versionID, nameNodeAddr, conf);
 
-    //
+    // initialize checkpoint directories
+    fsName = getInfoServer();
+    checkpointDirs = FSImage.getCheckpointDirs(conf,
+                                  "/tmp/hadoop/dfs/namesecondary");
+    checkpointImage = new CheckpointStorage();
+    checkpointImage.recoverCreate(checkpointDirs);
+
+    // Initialize other scheduling parameters from the configuration
+    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
+    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
+
     // initialize the webserver for uploading files.
-    //
     String infoAddr = 
       NetUtils.getServerAddress(conf, 
                                 "dfs.secondary.info.bindAddress",
@@ -110,11 +117,11 @@
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
     infoBindAddress = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
-    infoServer = new StatusHttpServer("dfs", infoBindAddress, tmpInfoPort, 
+    infoServer = new StatusHttpServer("secondary", infoBindAddress, tmpInfoPort, 
                                       tmpInfoPort == 0);
-    infoServer.setAttribute("name.secondary", this);
-    this.infoServer.setAttribute("name.conf", conf);
     infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+    infoServer.setAttribute("name.system.image", checkpointImage);
+    this.infoServer.setAttribute("name.conf", conf);
     infoServer.start();
 
     // The web-server port can be ephemeral... ensure we have the correct info
@@ -121,17 +128,6 @@
     infoPort = infoServer.getPort();
     conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort); 
     LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
-
-    //
-    // Initialize other scheduling parameters from the configuration
-    //
-    String[] dirName = conf.getStrings("fs.checkpoint.dir");
-    checkpointDir = new File(dirName[0]);
-    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
-    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
-    doSetup();
-
-    LOG.warn("Checkpoint Directory:" + checkpointDir);
     LOG.warn("Checkpoint Period   :" + checkpointPeriod + " secs " +
              "(" + checkpointPeriod/60 + " min)");
     LOG.warn("Log Size Trigger    :" + checkpointSize + " bytes " +
@@ -145,26 +141,15 @@
   public void shutdown() {
     shouldRun = false;
     try {
-      infoServer.stop();
-    } catch (Exception e) {
+      if (infoServer != null) infoServer.stop();
+    } catch(InterruptedException ie) {
+      LOG.warn(StringUtils.stringifyException(ie));
     }
-  }
-
-  private void doSetup() throws IOException {
-    //
-    // Create the checkpoint directory if needed. 
-    //
-    checkpointDir.mkdirs();
-    srcImage = new File(checkpointDir, SRC_FS_IMAGE);
-    destImage = new File(checkpointDir, DEST_FS_IMAGE);
-    editFile = new File(checkpointDir, FS_EDITS);
-    srcImage.delete();
-    destImage.delete();
-    editFile.delete();
-  }
-
-  File getNewImage() {
-    return destImage;
+    try {
+      if (checkpointImage != null) checkpointImage.close();
+    } catch(IOException e) {
+      LOG.info(StringUtils.stringifyException(e));
+    }
   }
 
   //
@@ -201,11 +186,11 @@
           lastCheckpointTime = now;
         }
       } catch (IOException e) {
-        LOG.error("Exception in doCheckpoint:");
+        LOG.error("Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
         e.printStackTrace();
       } catch (Throwable e) {
-        LOG.error("Throwable Exception in doCheckpoint:");
+        LOG.error("Throwable Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
         e.printStackTrace();
         Runtime.getRuntime().exit(-1);
@@ -214,25 +199,33 @@
   }
 
   /**
-   * get the current fsimage from Namenode.
+   * Download <code>fsimage</code> and <code>edits</code>
+   * files from the name-node.
+   * @throws IOException
    */
-  private void getFSImage() throws IOException {
-    String fsName = getInfoServer();
+  private void downloadCheckpointFiles(CheckpointSignature sig
+                                      ) throws IOException {
+    
+    checkpointImage.cTime = sig.cTime;
+    checkpointImage.checkpointTime = sig.checkpointTime;
+
+    // get fsimage
     String fileid = "getimage=1";
-    TransferFsImage.getFileClient(fsName, fileid, srcImage);
-    LOG.info("Downloaded file " + srcImage + " size " +
-             srcImage.length() + " bytes.");
-  }
+    File[] srcNames = checkpointImage.getImageFiles();
+    assert srcNames.length > 0 : "No checkpoint targets.";
+    TransferFsImage.getFileClient(fsName, fileid, srcNames);
+    LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
+             srcNames[0].length() + " bytes.");
 
-  /**
-   * get the old edits file from the NameNode
-   */
-  private void getFSEdits() throws IOException {
-    String fsName = getInfoServer();
-    String fileid = "getedit=1";
-    TransferFsImage.getFileClient(fsName, fileid, editFile);
-    LOG.info("Downloaded file " + editFile + " size " +
-             editFile.length() + " bytes.");
+    // get edits file
+    fileid = "getedit=1";
+    srcNames = checkpointImage.getEditsFiles();
+    assert srcNames.length > 0 : "No checkpoint targets.";
+    TransferFsImage.getFileClient(fsName, fileid, srcNames);
+    LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
+        srcNames[0].length() + " bytes.");
+
+    checkpointImage.checkpointUploadDone();
   }
 
   /**
@@ -238,12 +231,11 @@
   /**
    * Copy the new fsimage into the NameNode
    */
-  private void putFSImage(long token) throws IOException {
-    String fsName = getInfoServer();
+  private void putFSImage(CheckpointSignature sig) throws IOException {
     String fileid = "putimage=1&port=" + infoPort +
       "&machine=" +
       InetAddress.getLocalHost().getHostAddress() +
-      "&token=" + token;
+      "&token=" + sig.toString();
     LOG.info("Posted URL " + fsName + fileid);
     TransferFsImage.getFileClient(fsName, fileid, (File[])null);
   }
@@ -248,7 +240,7 @@
     TransferFsImage.getFileClient(fsName, fileid, (File[])null);
   }
 
-  /*
+  /**
    * Returns the Jetty server that the Namenode is listening on.
    */
   private String getInfoServer() throws IOException {
@@ -260,7 +252,7 @@
                                      "dfs.info.port", "dfs.http.address");
   }
 
-  /*
+  /**
    * Create a new checkpoint
    */
   void doCheckpoint() throws IOException {
@@ -265,20 +257,14 @@
    */
   void doCheckpoint() throws IOException {
 
-    //
     // Do the required initialization of the merge work area.
-    //
-    doSetup();
+    startCheckpoint();
 
-    //
     // Tell the namenode to start logging transactions in a new edit file
     // Retuns a token that would be used to upload the merged image.
-    //
-    long token = namenode.rollEditLog();
+    CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog();
 
-    //
     // error simulation code for junit test
-    //
     if (simulation != null && simulation[0]) {
       throw new IOException("Simulating error0 " +
                             "after creating edits.new");
@@ -284,9 +270,8 @@
                             "after creating edits.new");
     }
 
-    getFSImage();                // Fetch fsimage
-    getFSEdits();                // Fetch edist
-    doMerge();                   // Do the merge
+    downloadCheckpointFiles(sig);   // Fetch fsimage, edits, time
+    doMerge(sig);                   // Do the merge
   
     //
     // Upload the new image into the NameNode. Then tell the Namenode
@@ -292,7 +277,7 @@
     // Upload the new image into the NameNode. Then tell the Namenode
     // to make this new uploaded image as the most current image.
     //
-    putFSImage(token);
+    putFSImage(sig);
 
     //
     // error simulation code for junit test
@@ -303,23 +288,28 @@
     }
 
     namenode.rollFsImage();
+    checkpointImage.endCheckpoint();
+
+    LOG.warn("Checkpoint done. New Image Size: " 
+              + checkpointImage.getFsImageName().length());
+  }
 
-    LOG.warn("Checkpoint done. Image Size:" + srcImage.length() +
-             " Edit Size:" + editFile.length() +
-             " New Image Size:" + destImage.length());
+  private void startCheckpoint() throws IOException {
+    checkpointImage.unlockAll();
+    checkpointImage.getEditLog().close();
+    checkpointImage.recoverCreate(checkpointDirs);
+    checkpointImage.startCheckpoint();
   }
 
   /**
-   * merges SRC_FS_IMAGE with FS_EDITS and writes the output into
-   * DEST_FS_IMAGE
+   * Merge downloaded image and edits and write the new image into
+   * current storage directory.
    */
-  private void doMerge() throws IOException {
+  private void doMerge(CheckpointSignature sig) throws IOException {
     FSNamesystem namesystem = 
-            new FSNamesystem(new FSImage(checkpointDir), conf);
-    FSImage fsImage = namesystem.dir.fsImage;
-    fsImage.loadFSImage(srcImage);
-    fsImage.getEditLog().loadFSEdits(editFile);
-    fsImage.saveFSImage(destImage);
+            new FSNamesystem(checkpointImage, conf);
+    assert namesystem.dir.fsImage == checkpointImage;
+    checkpointImage.doMerge(sig);
   }
 
   /**
@@ -433,6 +423,13 @@
     }
   }
 
+  static boolean getErrorSimulation(int index) {
+    if(simulation == null)
+      return false;
+    assert(index < simulation.length);
+    return simulation[index];
+  }
+
   void setErrorSimulation(int index) {
     assert(index < simulation.length);
     simulation[index] = true;
@@ -444,38 +441,6 @@
   }
 
   /**
-   * This class is used in Namesystem's jetty to retrieve a file.
-   * Typically used by the Secondary NameNode to retrieve image and
-   * edit file for periodic checkpointing.
-   */
-  public static class GetImageServlet extends HttpServlet {
-    @SuppressWarnings("unchecked")
-    public void doGet(HttpServletRequest request,
-                      HttpServletResponse response
-                      ) throws ServletException, IOException {
-      Map<String,String[]> pmap = request.getParameterMap();
-      try {
-        ServletContext context = getServletContext();
-        SecondaryNameNode nn = (SecondaryNameNode) 
-          context.getAttribute("name.secondary");
-        TransferFsImage ff = new TransferFsImage(pmap, request, response);
-        if (ff.getImage()) {
-          TransferFsImage.getFileServer(response.getOutputStream(),
-                                        nn.getNewImage());
-        }
-        LOG.info("New Image " + nn.getNewImage() + " retrieved by Namenode.");
-      } catch (IOException ie) {
-        StringUtils.stringifyException(ie);
-        LOG.error(ie);
-        String errMsg = "GetImage failed.";
-        response.sendError(HttpServletResponse.SC_GONE, errMsg);
-        throw ie;
-
-      }
-    }
-  }
-
-  /**
    * main() has some simple utility methods.
    * @param argv Command line parameters.
    * @exception Exception if the filesystem does not exist.
@@ -493,4 +458,106 @@
     Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); 
     checkpointThread.start();
   }
+
+  static class CheckpointStorage extends FSImage {
+    /**
+     */
+    CheckpointStorage() throws IOException {
+      super();
+    }
+
+    @Override
+    boolean isConversionNeeded(StorageDirectory sd) {
+      return false;
+    }
+
+    /**
+     * Analyze checkpoint directories.
+     * Create directories if they do not exist.
+     * Recover from an unsuccessful checkpoint is necessary. 
+     * 
+     * @param dataDirs
+     * @throws IOException
+     */
+    void recoverCreate(Collection<File> dataDirs) throws IOException {
+      this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+      for(File dataDir : dataDirs) {
+        dataDir.mkdirs(); // create directories if do not exist yet
+        StorageDirectory sd = new StorageDirectory(dataDir);
+        StorageState curState;
+        try {
+          curState = sd.analyzeStorage(StartupOption.REGULAR);
+          // sd is locked but not opened
+          switch(curState) {
+          case NON_EXISTENT:
+            // fail if any of the configured checkpoint dirs are inaccessible 
+            throw new InconsistentFSStateException(sd.root,
+                  "checkpoint directory does not exist or is not accessible.");
+          case NOT_FORMATTED:
+            break;  // it's ok since initially there is no current and VERSION
+          case CONVERT:
+            throw new InconsistentFSStateException(sd.root,
+                  "not a checkpoint directory.");
+          case NORMAL:
+            break;
+          default:  // recovery is possible
+            sd.doRecover(curState);
+          }
+        } catch (IOException ioe) {
+          sd.unlock();
+          throw ioe;
+        }
+        // add to the storage list
+        addStorageDir(sd);
+        LOG.warn("Checkpoint directory " + sd.root + " is added.");
+      }
+    }
+
+    /**
+     * Prepare directories for a new checkpoint.
+     * <p>
+     * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
+     * and recreate <code>current</code>.
+     * @throws IOException
+     */
+    void startCheckpoint() throws IOException {
+      for(StorageDirectory sd : storageDirs) {
+        File curDir = sd.getCurrentDir();
+        File tmpCkptDir = sd.getLastCheckpointTmp();
+        assert !tmpCkptDir.exists() : 
+          tmpCkptDir.getName() + " directory must not exist.";
+        if(curDir.exists()) {
+          // rename current to tmp
+          rename(curDir, tmpCkptDir);
+        }
+        if (!curDir.mkdir())
+          throw new IOException("Cannot create directory " + curDir);
+      }
+    }
+
+    void endCheckpoint() throws IOException {
+      for(StorageDirectory sd : storageDirs) {
+        File tmpCkptDir = sd.getLastCheckpointTmp();
+        File prevCkptDir = sd.getPreviousCheckpoint();
+        // delete previous dir
+        if (prevCkptDir.exists())
+          deleteDir(prevCkptDir);
+        // rename tmp to previous
+        if (tmpCkptDir.exists())
+          rename(tmpCkptDir, prevCkptDir);
+      }
+    }
+
+    /**
+     * Merge image and edits, and verify consistency with the signature.
+     */
+    private void doMerge(CheckpointSignature sig) throws IOException {
+      getEditLog().open();
+      StorageDirectory sd = getStorageDir(0);
+      loadFSImage(FSImage.getImageFile(sd, NameNodeFile.IMAGE));
+      loadFSEdits(sd);
+      sig.validateStorageInfo(this);
+      saveFSImage();
+    }
+  }
 }
Index: src/java/org/apache/hadoop/dfs/Storage.java
===================================================================
--- src/java/org/apache/hadoop/dfs/Storage.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/Storage.java	(working copy)
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
@@ -57,9 +58,7 @@
   }
   
   StorageInfo(StorageInfo from) {
-    layoutVersion = from.layoutVersion;
-    namespaceID = from.namespaceID;
-    cTime = from.cTime;
+    setStorageInfo(from);
   }
 
   public int    getLayoutVersion(){ return layoutVersion; }
@@ -65,6 +64,12 @@
   public int    getLayoutVersion(){ return layoutVersion; }
   public int    getNamespaceID()  { return namespaceID; }
   public long   getCTime()        { return cTime; }
+
+  public void   setStorageInfo(StorageInfo from) {
+    layoutVersion = from.layoutVersion;
+    namespaceID = from.namespaceID;
+    cTime = from.cTime;
+  }
 }
 
 /**
@@ -100,6 +105,8 @@
   private   static final String STORAGE_TMP_REMOVED   = "removed.tmp";
   private   static final String STORAGE_TMP_PREVIOUS  = "previous.tmp";
   private   static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
+  private   static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
+  private   static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
   
   protected enum StorageState {
     NON_EXISTENT,
@@ -110,6 +117,8 @@
     COMPLETE_FINALIZE,
     COMPLETE_ROLLBACK,
     RECOVER_ROLLBACK,
+    COMPLETE_CHECKPOINT,
+    RECOVER_CHECKPOINT,
     NORMAL;
   }
   
@@ -237,6 +246,12 @@
     File getFinalizedTmp() {
       return new File(root, STORAGE_TMP_FINALIZED);
     }
+    File getLastCheckpointTmp() {
+      return new File(root, STORAGE_TMP_LAST_CKPT);
+    }
+    File getPreviousCheckpoint() {
+      return new File(root, STORAGE_PREVIOUS_CKPT);
+    }
 
     /**
      * Check consistency of the storage directory
@@ -280,7 +295,7 @@
       if (startOpt == StartupOption.FORMAT)
         return StorageState.NOT_FORMATTED;
       // check whether a conversion is required
-      if (isConversionNeeded(this))
+      if (startOpt != StartupOption.IMPORT && isConversionNeeded(this))
         return StorageState.CONVERT;
       // check whether current directory is valid
       File versionFile = getVersionFile();
@@ -291,8 +306,10 @@
       boolean hasPreviousTmp = getPreviousTmp().exists();
       boolean hasRemovedTmp = getRemovedTmp().exists();
       boolean hasFinalizedTmp = getFinalizedTmp().exists();
+      boolean hasCheckpointTmp = getLastCheckpointTmp().exists();
 
-      if (!(hasPreviousTmp || hasRemovedTmp || hasFinalizedTmp)) {
+      if (!(hasPreviousTmp || hasRemovedTmp
+          || hasFinalizedTmp || hasCheckpointTmp)) {
         // no temp dirs - no recovery
         if (hasCurrent)
           return StorageState.NORMAL;
@@ -302,7 +319,8 @@
         return StorageState.NOT_FORMATTED;
       }
 
-      if ((hasPreviousTmp?1:0)+(hasRemovedTmp?1:0)+(hasFinalizedTmp?1:0) > 1)
+      if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
+          + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1)
         // more than one temp dirs
         throw new InconsistentFSStateException(root,
                                                "too many temporary directories.");
@@ -308,6 +326,11 @@
                                                "too many temporary directories.");
 
       // # of temp dirs == 1 should either recover or complete a transition
+      if (hasCheckpointTmp) {
+        return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
+                          : StorageState.RECOVER_CHECKPOINT;
+      }
+
       if (hasFinalizedTmp) {
         if (hasPrevious)
           throw new InconsistentFSStateException(root,
@@ -375,6 +398,21 @@
                  + rootPath + ".");
         deleteDir(getFinalizedTmp());
         return;
+      case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
+        LOG.info("Completing previous checkpoint for storage directory " 
+                 + rootPath + ".");
+        File prevCkptDir = getPreviousCheckpoint();
+        if (prevCkptDir.exists())
+          deleteDir(prevCkptDir);
+        rename(getLastCheckpointTmp(), prevCkptDir);
+        return;
+      case RECOVER_CHECKPOINT:  // mv lastcheckpoint.tmp -> current
+        LOG.info("Recovering storage directory " + rootPath
+                 + " from failed checkpoint.");
+        if (curDir.exists())
+          deleteDir(curDir);
+        rename(getLastCheckpointTmp(), curDir);
+        return;
       default:
         throw new IOException("Unexpected FS state: " + curState);
       }
@@ -391,6 +429,8 @@
       RandomAccessFile file = new RandomAccessFile(lockF, "rws");
       try {
         this.lock = file.getChannel().tryLock();
+      } catch(OverlappingFileLockException oe) {
+        lock = null;
       } catch(IOException e) {
         LOG.info(StringUtils.stringifyException(e));
         file.close();
Index: src/java/org/apache/hadoop/dfs/TransferFsImage.java
===================================================================
--- src/java/org/apache/hadoop/dfs/TransferFsImage.java	(revision 643793)
+++ src/java/org/apache/hadoop/dfs/TransferFsImage.java	(working copy)
@@ -24,6 +24,8 @@
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.hadoop.dfs.FSImage.CheckpointSignature;
+
 /**
  * This class provides fetching a specified file from the NameNode.
  */
@@ -34,7 +36,7 @@
   private boolean isPutImage;
   private int remoteport;
   private String machineName;
-  private long token;
+  private CheckpointSignature token;
   
   /**
    * File downloader.
@@ -51,7 +53,7 @@
     isGetImage = isGetEdit = isPutImage = false;
     remoteport = 0;
     machineName = null;
-    token = 0;
+    token = null;
 
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -66,12 +68,13 @@
       } else if (key.equals("machine")) { 
         machineName = pmap.get("machine")[0];
       } else if (key.equals("token")) { 
-        token = new Long(pmap.get("token")[0]).longValue();
+        token = new CheckpointSignature(pmap.get("token")[0]);
       }
     }
-    if ((isGetImage && isGetEdit) ||
-        (!isGetImage && !isGetEdit && !isPutImage)) {
-      throw new IOException("No good parameters to TransferFsImage");
+
+    int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
+    if ((numGets > 1) || (numGets == 0) && !isPutImage) {
+      throw new IOException("Illegal parameters to TransferFsImage");
     }
   }
 
@@ -87,7 +90,7 @@
     return isPutImage;
   }
 
-  long getToken() {
+  CheckpointSignature getToken() {
     return token;
   }
 
@@ -108,6 +111,12 @@
     FileInputStream infile = null;
     try {
       infile = new FileInputStream(localfile);
+      if(SecondaryNameNode.getErrorSimulation(2)
+          && localfile.getAbsolutePath().contains("secondary")) {
+        // throw exception only when the secondary sends its image
+        throw new IOException("If this exception is not caught by the " +
+            "name-node fs image will be truncated.");
+      }
       int num = 1;
       while (num > 0) {
         num = infile.read(buf);
@@ -117,7 +126,6 @@
         outstream.write(buf, 0, num);
       }
     } finally {
-      outstream.close();
       if (infile != null) {
         infile.close();
       }
@@ -160,7 +168,7 @@
       }
     } finally {
       stream.close();
-      if (localPath != null) {
+      if (output != null) {
         for (int i = 0; i < output.length; i++) {
           if (output[i] != null) {
             output[i].close();
@@ -169,15 +177,4 @@
       }
     }
   }
-
-  /**
-   * Client-side Method to fetch file from a server
-   * Copies the response from the URL to the local file.
-   */
-  static void getFileClient(String fsName, String id, File localPath)
-    throws IOException {
-    File[] filelist = new File[1];
-    filelist[0] = localPath;
-    getFileClient(fsName, id, filelist);
-  }
 }
Index: src/java/org/apache/hadoop/io/WritableComparable.java
===================================================================
--- src/java/org/apache/hadoop/io/WritableComparable.java	(revision 643793)
+++ src/java/org/apache/hadoop/io/WritableComparable.java	(working copy)
@@ -51,5 +51,5 @@
  *     }
  * </pre></blockquote></p>
  */
-public interface WritableComparable extends Writable, Comparable {
+public interface WritableComparable<T> extends Writable, Comparable<T> {
 }
Index: src/java/org/apache/hadoop/util/StringUtils.java
===================================================================
--- src/java/org/apache/hadoop/util/StringUtils.java	(revision 643793)
+++ src/java/org/apache/hadoop/util/StringUtils.java	(working copy)
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.util;
 
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.InetAddress;
@@ -32,6 +31,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.StringTokenizer;
+import java.util.Collection;
 
 import org.apache.hadoop.fs.*;
 
@@ -262,7 +262,7 @@
   }
   
   /**
-   * returns an arraylist of strings  
+   * Returns an arraylist of strings.
    * @param str the comma seperated string values
    * @return the arraylist of the comma seperated string values
    */
@@ -267,14 +267,28 @@
    * @return the arraylist of the comma seperated string values
    */
   public static String[] getStrings(String str){
+    Collection<String> values = getStringCollection(str);
+    if(values.size() == 0) {
+      return null;
+    }
+    return values.toArray(new String[values.size()]);
+  }
+
+  /**
+   * Returns a collection of strings.
+   * @param str comma seperated string values
+   * @return an <code>ArrayList</code> of string values
+   */
+  public static Collection<String> getStringCollection(String str){
+    List<String> values = new ArrayList<String>();
     if (str == null)
-      return null;
+      return values;
     StringTokenizer tokenizer = new StringTokenizer (str,",");
-    List<String> values = new ArrayList<String>();
+    values = new ArrayList<String>();
     while (tokenizer.hasMoreTokens()) {
       values.add(tokenizer.nextToken());
     }
-    return values.toArray(new String[values.size()]);
+    return values;
   }
 
   final public static char COMMA = ',';
Index: src/test/org/apache/hadoop/dfs/TestCheckpoint.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestCheckpoint.java	(revision 643793)
+++ src/test/org/apache/hadoop/dfs/TestCheckpoint.java	(working copy)
@@ -20,11 +20,14 @@
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.Collection;
+import java.util.List;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.dfs.FSImage.NameNodeFile;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -132,11 +135,9 @@
     try {
       assertTrue(!fileSys.exists(file1));
       //
-      // Make the checkpoint fail after rolling the
-      // edit log.
+      // Make the checkpoint fail after rolling the edit log.
       //
       SecondaryNameNode secondary = new SecondaryNameNode(conf);
-      secondary.initializeErrorSimulationEvent(2);
       secondary.setErrorSimulation(0);
 
       try {
@@ -144,6 +145,7 @@
         assertTrue(false);      
       } catch (IOException e) {
       }
+      secondary.clearErrorSimulation(0);
       secondary.shutdown();
 
       //
@@ -173,15 +175,7 @@
       assertFalse(image.getEditNewFile(idx).exists());
       File edits = image.getEditFile(idx);
       assertTrue(edits.exists()); // edits should exist and be empty
-      long editsLen = -1;
-      RandomAccessFile eF = null;
-      try {
-        eF = new RandomAccessFile(edits, "r");
-        editsLen = eF.length();
-      } finally {
-        if(eF != null)
-          eF.close();
-      }
+      long editsLen = edits.length();
       assertTrue(editsLen == Integer.SIZE/Byte.SIZE);
     }
     
@@ -192,6 +186,9 @@
       SecondaryNameNode secondary = new SecondaryNameNode(conf);
       secondary.doCheckpoint();
       secondary.shutdown();
+    } catch(IOException e) {
+      System.out.println("testSecondaryNamenodeError1: faild.");
+      throw e;
     } finally {
       fileSys.close();
       cluster.shutdown();
@@ -215,7 +212,6 @@
       // Make the checkpoint fail after uploading the new fsimage.
       //
       SecondaryNameNode secondary = new SecondaryNameNode(conf);
-      secondary.initializeErrorSimulationEvent(2);
       secondary.setErrorSimulation(1);
 
       try {
@@ -223,6 +219,7 @@
         assertTrue(false);      
       } catch (IOException e) {
       }
+      secondary.clearErrorSimulation(1);
       secondary.shutdown();
 
       //
@@ -273,7 +270,6 @@
       // Make the checkpoint fail after rolling the edit log.
       //
       SecondaryNameNode secondary = new SecondaryNameNode(conf);
-      secondary.initializeErrorSimulationEvent(2);
       secondary.setErrorSimulation(0);
 
       try {
@@ -281,6 +277,7 @@
         assertTrue(false);      
       } catch (IOException e) {
       }
+      secondary.clearErrorSimulation(0);
       secondary.shutdown(); // secondary namenode crash!
 
       // start new instance of secondary and verify that 
@@ -323,6 +320,205 @@
   }
 
   /**
+   * Simulate a secondary node failure to transfer image
+   * back to the name-node.
+   * Used to truncate primary fsimage file.
+   */
+  void testSecondaryFailsToReturnImage(Configuration conf)
+    throws IOException {
+    System.out.println("Starting testSecondaryFailsToReturnImage");
+    Path file1 = new Path("checkpointRI.dat");
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
+                                                false, null);
+    cluster.waitActive();
+    FileSystem fileSys = cluster.getFileSystem();
+    FSImage image = cluster.getNameNode().getFSImage();
+    try {
+      assertTrue(!fileSys.exists(file1));
+      long fsimageLength = image.getImageFile(0, NameNodeFile.IMAGE).length();
+      //
+      // Make the checkpoint
+      //
+      SecondaryNameNode secondary = new SecondaryNameNode(conf);
+      secondary.setErrorSimulation(2);
+
+      try {
+        secondary.doCheckpoint();  // this should fail
+        assertTrue(false);
+      } catch (IOException e) {
+        System.out.println("testSecondaryFailsToReturnImage: doCheckpoint() " +
+            "failed predictably - " + e);
+      }
+      secondary.clearErrorSimulation(2);
+
+      // Verify that image file sizes did not change.
+      int nrDirs = image.getNumStorageDirs();
+      for(int idx = 0; idx < nrDirs; idx++) {
+        assertTrue(image.getImageFile(idx, 
+                                NameNodeFile.IMAGE).length() == fsimageLength);
+      }
+
+      secondary.shutdown();
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test different startup scenarios.
+   * <p><ol>
+   * <li> Start of primary name-node in secondary directory must succeed. 
+   * <li> Start of secondary node when the primary is already running in 
+   *      this directory must fail.
+   * <li> Start of primary name-node if secondary node is already running in 
+   *      this directory must fail.
+   * <li> Start of two secondary nodes in the same directory must fail.
+   * <li> Import of a checkpoint must fail if primary 
+   * directory contains a valid image.
+   * <li> Import of the secondary image directory must succeed if primary 
+   * directory does not exist.
+   * <li> Recover failed checkpoint for secondary node.
+   * <li> Complete failed checkpoint for secondary node.
+   * </ol>
+   */
+  void testStartup(Configuration conf) throws IOException {
+    System.out.println("Startup of the name-node in the checkpoint directory.");
+    String primaryDirs = conf.get("dfs.name.dir");
+    String checkpointDirs = conf.get("fs.checkpoint.dir");
+    conf.set("dfs.http.address", "0.0.0.0:0");  
+    conf.set("dfs.name.dir", checkpointDirs);
+    String[] args = new String[]{};
+    NameNode nn = NameNode.createNameNode(args, conf);
+    assertTrue(nn.isInSafeMode());
+
+    // Starting secondary node in the same directory as the primary
+    System.out.println("Startup of secondary in the same dir as the primary.");
+    SecondaryNameNode secondary = null;
+    try {
+      conf.set("dfs.secondary.http.address", "0.0.0.0:0");
+      secondary = new SecondaryNameNode(conf);
+      assertTrue(false);      
+    } catch (IOException e) { // expected to fail
+      assertTrue(secondary == null);
+    }
+    nn.stop(); nn = null;
+
+    // Starting primary node in the same directory as the secondary
+    System.out.println("Startup of primary in the same dir as the secondary.");
+    conf.set("dfs.http.address", "0.0.0.0:0");  
+    conf.set("dfs.name.dir", primaryDirs);
+    nn = NameNode.createNameNode(args, conf);
+    conf.set("dfs.secondary.http.address", "0.0.0.0:0");
+    boolean succeed = false;
+    do {
+      try {
+        secondary = new SecondaryNameNode(conf);
+        succeed = true;
+      } catch(IOException ie) { // keep trying
+        System.out.println("Try again: " + ie.getLocalizedMessage());
+      }
+    } while(!succeed);
+    nn.stop(); nn = null;
+    try {
+      conf.set("dfs.http.address", "0.0.0.0:0");  
+      conf.set("dfs.name.dir", checkpointDirs);
+      nn = NameNode.createNameNode(args, conf);
+      assertTrue(false);      
+    } catch (IOException e) { // expected to fail
+      assertTrue(nn == null);
+    }
+
+    // Try another secondary in the same directory
+    System.out.println("Startup of two secondaries in the same dir.");
+    conf.set("dfs.http.address", "0.0.0.0:0");  
+    conf.set("dfs.name.dir", primaryDirs);
+    // secondary won't start without primary.
+    nn = NameNode.createNameNode(args, conf);
+    SecondaryNameNode secondary2 = null;
+    try {
+      conf.set("dfs.secondary.http.address", "0.0.0.0:0");
+      secondary2 = new SecondaryNameNode(conf);
+      assertTrue(false);      
+    } catch (IOException e) { // expected to fail
+      assertTrue(secondary2 == null);
+    }
+    nn.stop(); nn = null;
+    secondary.shutdown();
+
+    // Import a checkpoint with existing primary image.
+    System.out.println("Import a checkpoint with existing primary image.");
+    args = new String[]{StartupOption.IMPORT.getName()};
+    try {
+      conf.set("dfs.http.address", "0.0.0.0:0");  
+      conf.set("dfs.name.dir", primaryDirs);
+      nn = NameNode.createNameNode(args, conf);
+      assertTrue(false);      
+    } catch (IOException e) { // expected to fail
+      assertTrue(nn == null);
+    }
+
+    // Remove current image and import a checkpoint.
+    System.out.println("Import a checkpoint with existing primary image.");
+    List<File> nameDirs = (List<File>)FSNamesystem.getNamespaceDirs(conf);
+    long fsimageLength = new File(new File(nameDirs.get(0), "current"), 
+                                        NameNodeFile.IMAGE.getName()).length();
+    for(File dir : nameDirs) {
+      if(dir.exists())
+        if(!(FileUtil.fullyDelete(dir)))
+          throw new IOException("Cannot remove directory: " + dir);
+      if (!dir.mkdirs())
+        throw new IOException("Cannot create directory " + dir);
+    }
+    conf.set("dfs.http.address", "0.0.0.0:0");  
+    conf.set("dfs.name.dir", primaryDirs);
+    nn = NameNode.createNameNode(args, conf);
+    // Verify that image file sizes did not change.
+    FSImage image = nn.getFSImage();
+    int nrDirs = image.getNumStorageDirs();
+    for(int idx = 0; idx < nrDirs; idx++) {
+      assertTrue(image.getImageFile(idx, 
+                              NameNodeFile.IMAGE).length() == fsimageLength);
+    }
+    nn.stop();
+
+    // recover failed checkpoint
+    conf.set("dfs.name.dir", primaryDirs);
+    args = new String[]{};
+    nn = NameNode.createNameNode(args, conf);
+    Collection<File> secondaryDirs = FSImage.getCheckpointDirs(conf, null);
+    for(File dir : secondaryDirs) {
+      Storage.rename(new File(dir, "current"), 
+                     new File(dir, "lastcheckpoint.tmp"));
+    }
+    secondary = new SecondaryNameNode(conf);
+    secondary.shutdown();
+    for(File dir : secondaryDirs) {
+      assertTrue(new File(dir, "current").exists()); 
+      assertFalse(new File(dir, "lastcheckpoint.tmp").exists());
+    }
+    
+    // complete failed checkpoint
+    for(File dir : secondaryDirs) {
+      Storage.rename(new File(dir, "previous.checkpoint"), 
+                     new File(dir, "lastcheckpoint.tmp"));
+    }
+    secondary = new SecondaryNameNode(conf);
+    secondary.shutdown();
+    for(File dir : secondaryDirs) {
+      assertTrue(new File(dir, "current").exists()); 
+      assertTrue(new File(dir, "previous.checkpoint").exists()); 
+      assertFalse(new File(dir, "lastcheckpoint.tmp").exists());
+    }
+    nn.stop(); nn = null;
+    
+    // Check that everything starts ok now.
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
+    cluster.waitActive();
+    cluster.shutdown();
+  }
+
+  /**
    * Tests checkpoint in DFS.
    */
   public void testCheckpoint() throws IOException {
@@ -355,6 +551,7 @@
       // Take a checkpoint
       //
       SecondaryNameNode secondary = new SecondaryNameNode(conf);
+      secondary.initializeErrorSimulationEvent(3);
       secondary.doCheckpoint();
       secondary.shutdown();
     } finally {
@@ -412,5 +609,7 @@
     testSecondaryNamenodeError2(conf);
     testSecondaryNamenodeError3(conf);
     testNamedirError(conf, namedirs);
+    testSecondaryFailsToReturnImage(conf);
+    testStartup(conf);
   }
 }
