Index: src/test/org/apache/hadoop/fs/TestHarFileSystem.java
===================================================================
--- src/test/org/apache/hadoop/fs/TestHarFileSystem.java	(revision 664154)
+++ src/test/org/apache/hadoop/fs/TestHarFileSystem.java	(working copy)
@@ -41,7 +41,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.util.HadoopArchives;
+import org.apache.hadoop.tools.HadoopArchives;
 import org.apache.hadoop.util.ToolRunner;
 
 import junit.framework.TestCase;
@@ -196,4 +196,4 @@
     assertTrue("number of bytes left should be -1", reduceIn.read(b) == -1);
     reduceIn.close();
   }
-}
\ No newline at end of file
+}
Index: src/test/org/apache/hadoop/fs/TestCopyFiles.java
===================================================================
--- src/test/org/apache/hadoop/fs/TestCopyFiles.java	(revision 664154)
+++ src/test/org/apache/hadoop/fs/TestCopyFiles.java	(working copy)
@@ -29,7 +29,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.util.CopyFiles;
+import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.util.ToolRunner;
 
 
@@ -224,7 +224,7 @@
   /** copy files from local file system to local file system */
   public void testCopyFromLocalToLocal() throws Exception {
     MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
-    ToolRunner.run(new CopyFiles(new Configuration()),
+    ToolRunner.run(new DistCp(new Configuration()),
                            new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
                                          "file:///"+TEST_ROOT_DIR+"/destdat"});
     assertTrue("Source and destination directories do not match.",
@@ -243,7 +243,7 @@
       namenode = FileSystem.getDefaultUri(conf).toString();
       if (namenode.startsWith("hdfs://")) {
         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {
+        ToolRunner.run(new DistCp(conf), new String[] {
                                          "-log",
                                          namenode+"/logs",
                                          namenode+"/srcdat",
@@ -272,7 +272,7 @@
       namenode = FileSystem.getDefaultUri(conf).toString();
       if (namenode.startsWith("hdfs://")) {
         MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {
+        ToolRunner.run(new DistCp(conf), new String[] {
                                          "-log",
                                          namenode+"/logs",
                                          "file:///"+TEST_ROOT_DIR+"/srcdat",
@@ -301,7 +301,7 @@
       namenode = FileSystem.getDefaultUri(conf).toString();
       if (namenode.startsWith("hdfs://")) {
         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {
+        ToolRunner.run(new DistCp(conf), new String[] {
                                          "-log",
                                          "/logs",
                                          namenode+"/srcdat",
@@ -329,7 +329,7 @@
       namenode = FileSystem.getDefaultUri(conf).toString();
       if (namenode.startsWith("hdfs://")) {
         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {
+        ToolRunner.run(new DistCp(conf), new String[] {
                                          "-p",
                                          "-log",
                                          namenode+"/logs",
@@ -346,7 +346,7 @@
         updateFiles(namenode, "/srcdat", files, nupdate);
         deldir(namenode, "/logs");
 
-        ToolRunner.run(new CopyFiles(conf), new String[] {
+        ToolRunner.run(new DistCp(conf), new String[] {
                                          "-p",
                                          "-update",
                                          "-log",
@@ -359,7 +359,7 @@
                  checkUpdate(dchkpoint, namenode, "/destdat", files, nupdate));
 
         deldir(namenode, "/logs");
-        ToolRunner.run(new CopyFiles(conf), new String[] {
+        ToolRunner.run(new DistCp(conf), new String[] {
                                          "-p",
                                          "-overwrite",
                                          "-log",
@@ -383,14 +383,14 @@
   public void testCopyDuplication() throws Exception {
     try {    
       MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
-      ToolRunner.run(new CopyFiles(new Configuration()),
+      ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
                         "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
       assertTrue("Source and destination directories do not match.",
                  checkFiles("file:///", TEST_ROOT_DIR+"/src2/srcdat", files));
   
-      assertEquals(CopyFiles.DuplicationException.ERROR_CODE,
-          ToolRunner.run(new CopyFiles(new Configuration()),
+      assertEquals(DistCp.DuplicationException.ERROR_CODE,
+          ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
                         "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
                         "file:///"+TEST_ROOT_DIR+"/destdat",}));
@@ -408,7 +408,7 @@
     try {    
       MyFile[] files = {createFile(root, fs)};
       //copy a dir with a single file
-      ToolRunner.run(new CopyFiles(new Configuration()),
+      ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
                         "file:///"+TEST_ROOT_DIR+"/destdat"});
       assertTrue("Source and destination directories do not match.",
@@ -418,7 +418,7 @@
       String fname = files[0].getName();
       Path p = new Path(root, fname);
       FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
-      ToolRunner.run(new CopyFiles(new Configuration()),
+      ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname,
                         "file:///"+TEST_ROOT_DIR+"/dest2/"+fname});
       assertTrue("Source and destination directories do not match.",
@@ -428,7 +428,7 @@
       fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2"));
       MyFile[] files2 = {createFile(root, fs, 0)};
       String sname = files2[0].getName();
-      ToolRunner.run(new CopyFiles(new Configuration()),
+      ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"-update",
                         "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
                         "file:///"+TEST_ROOT_DIR+"/dest2/"});
@@ -436,7 +436,7 @@
           checkFiles("file:///", TEST_ROOT_DIR+"/dest2", files2));     
       updateFiles("file:///", TEST_ROOT_DIR+"/srcdat", files2, 1);
       //copy single file to existing dir w/ dst name conflict
-      ToolRunner.run(new CopyFiles(new Configuration()),
+      ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"-update",
                         "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
                         "file:///"+TEST_ROOT_DIR+"/dest2/"});
@@ -464,7 +464,7 @@
         for(int i = 0; i < srcstat.length; i++) {
           fs.setOwner(srcstat[i].getPath(), "u" + i, null);
         }
-        ToolRunner.run(new CopyFiles(conf),
+        ToolRunner.run(new DistCp(conf),
             new String[]{"-pu", nnUri+"/srcdat", nnUri+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(nnUri, "/destdat", files));
@@ -483,7 +483,7 @@
         for(int i = 0; i < srcstat.length; i++) {
           fs.setOwner(srcstat[i].getPath(), null, "g" + i);
         }
-        ToolRunner.run(new CopyFiles(conf),
+        ToolRunner.run(new DistCp(conf),
             new String[]{"-pg", nnUri+"/srcdat", nnUri+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(nnUri, "/destdat", files));
@@ -505,7 +505,7 @@
           fs.setPermission(srcstat[i].getPath(), permissions[i]);
         }
 
-        ToolRunner.run(new CopyFiles(conf),
+        ToolRunner.run(new DistCp(conf),
             new String[]{"-pp", nnUri+"/srcdat", nnUri+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(nnUri, "/destdat", files));
@@ -539,7 +539,7 @@
       }
       JobConf job = mr.createJobConf();
       job.setLong("distcp.bytes.per.map", totsize / 3);
-      ToolRunner.run(new CopyFiles(job),
+      ToolRunner.run(new DistCp(job),
           new String[] {"-m", "100",
                         "-log",
                         namenode+"/logs",
@@ -553,7 +553,7 @@
 
       deldir(namenode, "/destdat");
       deldir(namenode, "/logs");
-      ToolRunner.run(new CopyFiles(job),
+      ToolRunner.run(new DistCp(job),
           new String[] {"-m", "1",
                         "-log",
                         namenode+"/logs",
Index: src/tools/org/apache/hadoop/tools/HadoopArchives.java
===================================================================
--- src/tools/org/apache/hadoop/tools/HadoopArchives.java	(revision 0)
+++ src/tools/org/apache/hadoop/tools/HadoopArchives.java	(working copy)
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.util;
+package org.apache.hadoop.tools;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -56,8 +56,9 @@
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
-
 /**
  * a archive creation utility.
  * This class provides methods that can be used 
@@ -310,7 +311,7 @@
     Path outputPath = new Path(dest, archiveName);
     FileOutputFormat.setOutputPath(conf, outputPath);
     conf.set(DST_DIR_LABEL, outputPath.toString());
-    final String randomId = CopyFiles.getRandomId();
+    final String randomId = DistCp.getRandomId();
     Path jobDirectory = new Path(new JobClient().getSystemDir(),
                           NAME + "_" + randomId);
     conf.set(JOB_DIR_LABEL, jobDirectory.toString());
Index: src/tools/org/apache/hadoop/tools/DistCp.java
===================================================================
--- src/tools/org/apache/hadoop/tools/DistCp.java	(revision 0)
+++ src/tools/org/apache/hadoop/tools/DistCp.java	(working copy)
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.util;
+package org.apache.hadoop.tools;
 
 import java.io.BufferedReader;
 import java.io.DataInput;
@@ -57,13 +57,16 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * A Map-reduce program to recursively copy directories between
  * different file-systems.
  */
-public class CopyFiles implements Tool {
-  private static final Log LOG = LogFactory.getLog(CopyFiles.class);
+public class DistCp implements Tool {
+  private static final Log LOG = LogFactory.getLog(DistCp.class);
 
   private static final String NAME = "distcp";
 
@@ -168,7 +171,7 @@
     return conf;
   }
 
-  public CopyFiles(Configuration conf) {
+  public DistCp(Configuration conf) {
     setConf(conf);
   }
 
@@ -452,7 +455,7 @@
     private void updatePermissions(FileStatus src, FileStatus dst
         ) throws IOException {
       if (preserve_status) {
-        CopyFiles.updatePermissions(src, dst, preseved, destFileSys);
+        DistCp.updatePermissions(src, dst, preseved, destFileSys);
       }
     }
 
@@ -776,8 +779,8 @@
   }
 
   public static void main(String[] args) throws Exception {
-    JobConf job = new JobConf(CopyFiles.class);
-    CopyFiles distcp = new CopyFiles(job);
+    JobConf job = new JobConf(DistCp.class);
+    DistCp distcp = new DistCp(job);
     int res = ToolRunner.run(distcp, args);
     System.exit(res);
   }
@@ -839,7 +842,7 @@
 
   //Job configuration
   private static JobConf createJobConf(Configuration conf) {
-    JobConf jobconf = new JobConf(conf, CopyFiles.class);
+    JobConf jobconf = new JobConf(conf, DistCp.class);
     jobconf.setJobName(NAME);
 
     // turn off speculative execution, because DFS doesn't handle
Index: src/tools/org/apache/hadoop/tools/Logalyzer.java
===================================================================
--- src/tools/org/apache/hadoop/tools/Logalyzer.java	(revision 0)
+++ src/tools/org/apache/hadoop/tools/Logalyzer.java	(working copy)
@@ -46,7 +46,6 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
-import org.apache.hadoop.util.CopyFiles;
 
 /**
  * Logalyzer: A utility tool for archiving and analyzing hadoop logs.
@@ -184,7 +183,7 @@
     throws IOException
   {
     String destURL = FileSystem.getDefaultUri(fsConfig) + archiveDirectory;
-    CopyFiles.copy(fsConfig, logListURI, destURL, null, true, false);
+    DistCp.copy(new JobConf(fsConfig), logListURI, destURL, null, true, false);
   }
   
   /**
Index: src/java/org/apache/hadoop/tools/Logalyzer.java
===================================================================
--- src/java/org/apache/hadoop/tools/Logalyzer.java	(revision 664154)
+++ src/java/org/apache/hadoop/tools/Logalyzer.java	(working copy)
@@ -1,313 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.tools;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.LongSumReducer;
-import org.apache.hadoop.util.CopyFiles;
-
-/**
- * Logalyzer: A utility tool for archiving and analyzing hadoop logs.
- * <p>
- * This tool supports archiving and anaylzing (sort/grep) of log-files.
- * It takes as input
- *  a) Input uri which will serve uris of the logs to be archived.
- *  b) Output directory (not mandatory).
- *  b) Directory on dfs to archive the logs. 
- *  c) The sort/grep patterns for analyzing the files and separator for boundaries.
- * Usage: 
- * Logalyzer -archive -archiveDir <directory to archive logs> -analysis <directory> -logs <log-list uri> -grep <pattern> -sort <col1, col2> -separator <separator>   
- * <p>
- */
-
-public class Logalyzer {
-  // Constants
-  private static Configuration fsConfig = new Configuration();
-  
-  /** A {@link Mapper} that extracts text matching a regular expression. */
-  public static class LogRegexMapper<K extends WritableComparable>
-    extends MapReduceBase
-    implements Mapper<K, Text, Text, LongWritable> {
-    
-    private Pattern pattern;
-    
-    public void configure(JobConf job) {
-      pattern = Pattern.compile(job.get("mapred.mapper.regex"));
-    }
-    
-    public void map(K key, Text value,
-                    OutputCollector<Text, LongWritable> output,
-                    Reporter reporter)
-      throws IOException {
-      String text = value.toString();
-      Matcher matcher = pattern.matcher(text);
-      while (matcher.find()) {
-        output.collect(value, new LongWritable(1));
-      }
-    }
-    
-  }
-  
-  /** A WritableComparator optimized for UTF8 keys of the logs. */
-  public static class LogComparator extends Text.Comparator implements Configurable {
-    
-    private static Log LOG = LogFactory.getLog("org.apache.hadoop.tools.Logalyzer");
-    private JobConf conf = null;
-    private String[] sortSpec = null;
-    private String columnSeparator = null;
-    
-    public void setConf(Configuration conf) {
-      if (conf instanceof JobConf) {
-        this.conf = (JobConf) conf;
-      } else {
-        this.conf = new JobConf(conf);
-      }
-      
-      //Initialize the specification for *comparision*
-      String sortColumns = this.conf.get("mapred.reducer.sort", null);
-      if (sortColumns != null) {
-        sortSpec = sortColumns.split(",");
-      }
-      
-      //Column-separator
-      columnSeparator = this.conf.get("mapred.reducer.separator", "");
-    }
-    
-    public Configuration getConf() {
-      return conf;
-    }
-    
-    public int compare(byte[] b1, int s1, int l1,
-                       byte[] b2, int s2, int l2) {
-      
-      if (sortSpec == null) {
-        return super.compare(b1, s1, l1, b2, s2, l2);
-      }
-      
-      try {
-        Text logline1 = new Text(); 
-        logline1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
-        String line1 = logline1.toString();
-        String[] logColumns1 = line1.split(columnSeparator);
-        
-        Text logline2 = new Text(); 
-        logline2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
-        String line2 = logline2.toString();
-        String[] logColumns2 = line2.split(columnSeparator);
-        
-        if (logColumns1 == null || logColumns2 == null) {
-          return super.compare(b1, s1, l1, b2, s2, l2);
-        }
-        
-        //Compare column-wise according to *sortSpec*
-        for(int i=0; i < sortSpec.length; ++i) {
-          int column = (Integer.valueOf(sortSpec[i]).intValue());
-          String c1 = logColumns1[column]; 
-          String c2 = logColumns2[column];
-          
-          //Compare columns
-          int comparision = super.compareBytes(
-                                               c1.getBytes(), 0, c1.length(),
-                                               c2.getBytes(), 0, c2.length()
-                                               );
-          
-          //They differ!
-          if (comparision != 0) {
-            return comparision;
-          }
-        }
-        
-      } catch (IOException ioe) {
-        LOG.fatal("Caught " + ioe);
-        return 0;
-      }
-      
-      return 0;
-    }
-    
-    static {                                        
-      // register this comparator
-      WritableComparator.define(Text.class, new LogComparator());
-    }
-  }
-  
-  /**
-   * doArchive: Workhorse function to archive log-files.
-   * @param logListURI : The uri which will serve list of log-files to archive.
-   * @param archiveDirectory : The directory to store archived logfiles.
-   * @throws IOException
-   */
-  public void	
-    doArchive(String logListURI, String archiveDirectory)
-    throws IOException
-  {
-    String destURL = FileSystem.getDefaultUri(fsConfig) + archiveDirectory;
-    CopyFiles.copy(fsConfig, logListURI, destURL, null, true, false);
-  }
-  
-  /**
-   * doAnalyze: 
-   * @param inputFilesDirectory : Directory containing the files to be analyzed.
-   * @param outputDirectory : Directory to store analysis (output).
-   * @param grepPattern : Pattern to *grep* for.
-   * @param sortColumns : Sort specification for output.
-   * @param columnSeparator : Column separator.
-   * @throws IOException
-   */
-  public void
-    doAnalyze(String inputFilesDirectory, String outputDirectory,
-              String grepPattern, String sortColumns, String columnSeparator)
-    throws IOException
-  {		
-    Path grepInput = new Path(inputFilesDirectory);
-    
-    Path analysisOutput = null;
-    if (outputDirectory.equals("")) {
-      analysisOutput =  new Path(inputFilesDirectory, "logalyzer_" + 
-                                 Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-    } else {
-      analysisOutput = new Path(outputDirectory);
-    }
-    
-    JobConf grepJob = new JobConf(fsConfig);
-    grepJob.setJobName("logalyzer-grep-sort");
-    
-    FileInputFormat.setInputPaths(grepJob, grepInput);
-    grepJob.setInputFormat(TextInputFormat.class);
-    
-    grepJob.setMapperClass(LogRegexMapper.class);
-    grepJob.set("mapred.mapper.regex", grepPattern);
-    grepJob.set("mapred.reducer.sort", sortColumns);
-    grepJob.set("mapred.reducer.separator", columnSeparator);
-    
-    grepJob.setCombinerClass(LongSumReducer.class);
-    grepJob.setReducerClass(LongSumReducer.class);
-    
-    FileOutputFormat.setOutputPath(grepJob, analysisOutput);
-    grepJob.setOutputFormat(TextOutputFormat.class);
-    grepJob.setOutputKeyClass(Text.class);
-    grepJob.setOutputValueClass(LongWritable.class);
-    grepJob.setOutputKeyComparatorClass(LogComparator.class);
-    
-    grepJob.setNumReduceTasks(1);                 // write a single file
-    
-    JobClient.runJob(grepJob);
-  }
-  
-  public static void main(String[] args) {
-    
-    Log LOG = LogFactory.getLog("org.apache.hadoop.tools.Logalyzer");
-    
-    String version = "Logalyzer.0.0.1";
-    String usage = "Usage: Logalyzer [-archive -logs <urlsFile>] " +
-      "-archiveDir <archiveDirectory> " +
-      "-grep <pattern> -sort <column1,column2,...> -separator <separator> " +
-      "-analysis <outputDirectory>";
-    
-    System.out.println(version);
-    if (args.length == 0) {
-      System.err.println(usage);
-      System.exit(-1);
-    }
-    
-    //Command line arguments
-    boolean archive = false;
-    boolean grep = false;
-    boolean sort = false;
-    
-    String archiveDir = "";
-    String logListURI = "";
-    String grepPattern = ".*";
-    String sortColumns = "";
-    String columnSeparator = " ";
-    String outputDirectory = "";
-    
-    for (int i = 0; i < args.length; i++) { // parse command line
-      if (args[i].equals("-archive")) {
-        archive = true;
-      } else if (args[i].equals("-archiveDir")) {
-        archiveDir = args[++i];
-      } else if (args[i].equals("-grep")) {
-        grep = true;
-        grepPattern = args[++i];
-      } else if (args[i].equals("-logs")) {
-        logListURI = args[++i];
-      } else if (args[i].equals("-sort")) {
-        sort = true;
-        sortColumns = args[++i];
-      } else if (args[i].equals("-separator")) {
-        columnSeparator = args[++i];
-      } else if (args[i].equals("-analysis")) {
-        outputDirectory = args[++i];
-      }
-    }
-    
-    LOG.info("analysisDir = " + outputDirectory);
-    LOG.info("archiveDir = " + archiveDir);
-    LOG.info("logListURI = " + logListURI);
-    LOG.info("grepPattern = " + grepPattern);
-    LOG.info("sortColumns = " + sortColumns);
-    LOG.info("separator = " + columnSeparator);
-    
-    try {
-      Logalyzer logalyzer = new Logalyzer();
-      
-      // Archive?
-      if (archive) {
-        logalyzer.doArchive(logListURI, archiveDir);
-      }
-      
-      // Analyze?
-      if (grep || sort) {
-        logalyzer.doAnalyze(archiveDir, outputDirectory, grepPattern, sortColumns, columnSeparator);
-      }
-    } catch (IOException ioe) {
-      ioe.printStackTrace();
-      System.exit(-1);
-    }
-    
-  } //main
-  
-} //class Logalyzer
Index: src/java/org/apache/hadoop/util/HadoopArchives.java
===================================================================
--- src/java/org/apache/hadoop/util/HadoopArchives.java	(revision 664154)
+++ src/java/org/apache/hadoop/util/HadoopArchives.java	(working copy)
@@ -1,667 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.util;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.HarFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
-
-/**
- * a archive creation utility.
- * This class provides methods that can be used 
- * to create hadoop archives. For understanding of 
- * Hadoop archives look at {@link HarFileSystem}.
- */
-public class HadoopArchives implements Tool {
-  private static final Log LOG = LogFactory.getLog(HadoopArchives.class);
-  
-  private static final String NAME = "har"; 
-  static final String SRC_LIST_LABEL = NAME + ".src.list";
-  static final String DST_DIR_LABEL = NAME + ".dest.path";
-  static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
-  static final String JOB_DIR_LABEL = NAME + ".job.dir";
-  static final String SRC_COUNT_LABEL = NAME + ".src.count";
-  static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
-  static final String DST_HAR_LABEL = NAME + ".archive.name";
-  // size of each part file
-  // its fixed for now.
-  static final long partSize = 2 * 1024 * 1024 * 1024;
-
-  private static final String usage = "archive"
-  + " -archiveName NAME <src>* <dest>" +
-  "\n";
-  
- 
-  private JobConf conf;
-
-  public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
-  }
-
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  public HadoopArchives(Configuration conf) {
-    setConf(conf);
-  }
-
-  // check the src paths
-  private static void checkPaths(Configuration conf, List<Path> paths) throws
-  IOException {
-    for (Path p : paths) {
-      FileSystem fs = p.getFileSystem(conf);
-      if (!fs.exists(p)) {
-        throw new FileNotFoundException("Source " + p + " does not exist.");
-      }
-    }
-  }
-
-  /**
-   * this assumes that there are two types of files file/dir
-   * @param fs the input filesystem
-   * @param p the top level path 
-   * @param out the list of paths output of recursive ls
-   * @throws IOException
-   */
-  private void recursivels(FileSystem fs, Path p, List<FileStatus> out) 
-  throws IOException {
-    FileStatus fstatus = fs.getFileStatus(p);
-    if (!fstatus.isDir()) {
-      out.add(fstatus);
-      return;
-    }
-    else {
-      out.add(fstatus);
-      FileStatus[] listStatus = fs.listStatus(p);
-      for (FileStatus stat: listStatus) {
-        recursivels(fs, stat.getPath(), out);
-      }
-    }
-  }
-
-  /**
-   * Input format of a hadoop archive job responsible for 
-   * generating splits of the file list
-   */
-
-  static class HArchiveInputFormat implements InputFormat<LongWritable, Text> {
-    public void validateInput(JobConf jconf) throws IOException{};
-
-    //generate input splits from the src file lists
-    public InputSplit[] getSplits(JobConf jconf, int numSplits)
-    throws IOException {
-      String srcfilelist = jconf.get(SRC_LIST_LABEL, "");
-      if ("".equals(srcfilelist)) {
-          throw new IOException("Unable to get the " +
-              "src file for archive generation.");
-      }
-      long totalSize = jconf.getLong(TOTAL_SIZE_LABEL, -1);
-      if (totalSize == -1) {
-        throw new IOException("Invalid size of files to archive");
-      }
-      //we should be safe since this is set by our own code
-      Path src = new Path(srcfilelist);
-      FileSystem fs = src.getFileSystem(jconf);
-      FileStatus fstatus = fs.getFileStatus(src);
-      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
-      LongWritable key = new LongWritable();
-      Text value = new Text();
-      SequenceFile.Reader reader = null;
-      // the remaining bytes in the file split
-      long remaining = fstatus.getLen();
-      // the count of sizes calculated till now
-      long currentCount = 0L;
-      // the endposition of the split
-      long lastPos = 0L;
-      // the start position of the split
-      long startPos = 0L;
-      long targetSize = totalSize/numSplits;
-      // create splits of size target size so that all the maps 
-      // have equals sized data to read and write to.
-      try {
-        reader = new SequenceFile.Reader(fs, src, jconf);
-        while(reader.next(key, value)) {
-          if (currentCount + key.get() > targetSize && currentCount != 0){
-            long size = lastPos - startPos;
-            splits.add(new FileSplit(src, startPos, size, (String[]) null));
-            remaining = remaining - size;
-            startPos = lastPos;
-            currentCount = 0L;
-          }
-          currentCount += key.get();
-          lastPos = reader.getPosition();
-        }
-        // the remaining not equal to the target size.
-        if (remaining != 0) {
-          splits.add(new FileSplit(src, startPos, remaining, (String[])null));
-        }
-      }
-      finally { 
-        reader.close();
-      }
-      return splits.toArray(new FileSplit[splits.size()]);
-    }
-
-    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
-        JobConf job, Reporter reporter) throws IOException {
-      return new SequenceFileRecordReader<LongWritable, Text>(job,
-                 (FileSplit)split);
-    }
-  }
-
-  private boolean checkValidName(String name) {
-    if (name.endsWith(".har")) 
-      return true;
-    return false;
-  }
-  
-
-  private Path largestDepth(List<Path> paths) {
-    Path deepest = paths.get(0);
-    for (Path p: paths) {
-      if (p.depth() > deepest.depth()) {
-        deepest = p;
-      }
-    }
-    return deepest;
-  }
-  
-  // this method is tricky. This method writes 
-  // the top level directories in such a way so that 
-  // the output only contains valid directoreis in archives.
-  // so for an input path specified by the user 
-  // as /user/hadoop
-  // we need to index 
-  // / as the root 
-  // /user as a directory
-  // /user/hadoop as a directory
-  // so for multiple input paths it makes sure that it
-  // does the right thing.
-  // so if the user specifies the input directories as 
-  // /user/harry and /user/hadoop
-  // we need to write / and user as its child
-  // and /user and harry and hadoop as its children
-  private void writeTopLevelDirs(SequenceFile.Writer srcWriter, 
-      List<Path> paths) throws IOException {
-    //these are qualified paths 
-    List<Path> justDirs = new ArrayList<Path>();
-    for (Path p: paths) {
-      if (!p.getFileSystem(getConf()).isFile(p)) {
-        justDirs.add(new Path(p.toUri().getPath()));
-      }
-      else {
-        justDirs.add(new Path(p.getParent().toUri().getPath()));
-      }
-    }
-    
-    //get the largest depth path
-    // this is tricky
-    TreeMap<String, HashSet<String>> allpaths = new TreeMap<String, HashSet<String>>();
-    Path deepest = largestDepth(paths);
-    Path root = new Path(Path.SEPARATOR);
-    for (int i = 0; i < deepest.depth(); i++) {
-      List<Path> parents = new ArrayList<Path>();
-      for (Path p: justDirs) {
-        if (p.compareTo(root) == 0){
-          //don nothing
-        }
-        else {
-          Path parent = p.getParent();
-          if (allpaths.containsKey(parent.toString())) {
-            HashSet<String> children = allpaths.get(parent.toString());
-            children.add(p.getName());
-          }
-          else {
-            HashSet<String> children = new HashSet<String>();
-            children.add(p.getName());
-            allpaths.put(parent.toString(), children);
-          }
-          parents.add(parent);
-        }
-      }
-      justDirs = parents;
-    }
-    Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
-    for (Map.Entry<String, HashSet<String>> entry : keyVals) {
-      HashSet<String> children = entry.getValue();
-      String toWrite = entry.getKey() + " dir ";
-      StringBuffer sbuff = new StringBuffer();
-      sbuff.append(toWrite);
-      for (String child: children) {
-        sbuff.append(child + " ");
-      }
-      toWrite = sbuff.toString();
-      srcWriter.append(new LongWritable(0L), new Text(toWrite));
-    }
-  }
-  
-  /**archive the given source paths into
-   * the dest
-   * @param srcPaths the src paths to be archived
-   * @param dest the dest dir that will contain the archive
-   */
-  public void archive(List<Path> srcPaths, String archiveName, Path dest) 
-  throws IOException {
-    boolean isValid = checkValidName(archiveName);
-    if (!isValid) { 
-      throw new IOException("Invalid archiveName " + archiveName);
-    }
-    checkPaths(conf, srcPaths);
-    int numFiles = 0;
-    long totalSize = 0;
-    conf.set(DST_HAR_LABEL, archiveName);
-    Path outputPath = new Path(dest, archiveName);
-    FileOutputFormat.setOutputPath(conf, outputPath);
-    conf.set(DST_DIR_LABEL, outputPath.toString());
-    final String randomId = CopyFiles.getRandomId();
-    Path jobDirectory = new Path(new JobClient().getSystemDir(),
-                          NAME + "_" + randomId);
-    conf.set(JOB_DIR_LABEL, jobDirectory.toString());
-    //get a tmp directory for input splits
-    FileSystem jobfs = jobDirectory.getFileSystem(conf);
-    jobfs.mkdirs(jobDirectory);
-    Path srcFiles = new Path(jobDirectory, "_har_src_files");
-    conf.set(SRC_LIST_LABEL, srcFiles.toString());
-    SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
-        srcFiles, LongWritable.class, Text.class, 
-        SequenceFile.CompressionType.NONE);
-    // get the list of files 
-    // create single list of files and dirs
-    try {
-      // write the top level dirs in first 
-      writeTopLevelDirs(srcWriter, srcPaths);
-      // these are the input paths passed 
-      // from the command line
-      // we do a recursive ls on these paths 
-      // and then write them to the input file 
-      // one at a time
-      for (Path src: srcPaths) {
-        FileSystem fs = src.getFileSystem(conf);
-        ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
-        recursivels(fs, src, allFiles);
-        for (FileStatus stat: allFiles) {
-          String toWrite = "";
-          long len = stat.isDir()? 0:stat.getLen();
-          if (stat.isDir()) {
-            toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
-            //get the children 
-            FileStatus[] list = fs.listStatus(stat.getPath());
-            StringBuffer sbuff = new StringBuffer();
-            sbuff.append(toWrite);
-            for (FileStatus stats: list) {
-              sbuff.append(stats.getPath().getName() + " ");
-            }
-            toWrite = sbuff.toString();
-          }
-          else {
-            toWrite +=  fs.makeQualified(stat.getPath()) + " file ";
-          }
-          srcWriter.append(new LongWritable(len), new 
-              Text(toWrite));
-          numFiles++;
-          totalSize += len;
-        }
-      }
-    } finally {
-      srcWriter.close();
-    }
-    //increase the replication of src files
-    jobfs.setReplication(srcFiles, (short) 10);
-    conf.setInt(SRC_COUNT_LABEL, numFiles);
-    conf.setLong(TOTAL_SIZE_LABEL, totalSize);
-    int numMaps = (int)(totalSize/partSize);
-    //run atleast one map.
-    conf.setNumMapTasks(numMaps == 0? 1:numMaps);
-    conf.setNumReduceTasks(1);
-    conf.setInputFormat(HArchiveInputFormat.class);
-    conf.setOutputFormat(NullOutputFormat.class);
-    conf.setMapperClass(HArchivesMapper.class);
-    conf.setReducerClass(HArchivesReducer.class);
-    conf.setMapOutputKeyClass(IntWritable.class);
-    conf.setMapOutputValueClass(Text.class);
-    FileInputFormat.addInputPath(conf, jobDirectory);
-    //make sure no speculative execution is done
-    conf.setSpeculativeExecution(false);
-    JobClient.runJob(conf);
-    //delete the tmp job directory
-    try {
-      jobfs.delete(jobDirectory, true);
-    } catch(IOException ie) {
-      LOG.info("Unable to clean tmp directory " + jobDirectory);
-    }
-  }
-
-  static class HArchivesMapper 
-  implements Mapper<LongWritable, Text, IntWritable, Text> {
-    private JobConf conf = null;
-    int partId = -1 ; 
-    Path tmpOutputDir = null;
-    Path tmpOutput = null;
-    String partname = null;
-    FSDataOutputStream partStream = null;
-    FileSystem destFs = null;
-    byte[] buffer;
-    int buf_size = 128 * 1024;
-    
-    // configure the mapper and create 
-    // the part file.
-    // use map reduce framework to write into
-    // tmp files. 
-    public void configure(JobConf conf) {
-      this.conf = conf;
-      // this is tightly tied to map reduce
-      // since it does not expose an api 
-      // to get the partition
-      partId = conf.getInt("mapred.task.partition", -1);
-      // create a file name using the partition
-      // we need to write to this directory
-      tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
-      // get the output path and write to the tmp 
-      // directory 
-      partname = "part-" + partId;
-      tmpOutput = new Path(tmpOutputDir, partname);
-      try {
-        destFs = tmpOutput.getFileSystem(conf);
-        //this was a stale copy
-        if (destFs.exists(tmpOutput)) {
-          destFs.delete(tmpOutput, false);
-        }
-        partStream = destFs.create(tmpOutput);
-      } catch(IOException ie) {
-        throw new RuntimeException("Unable to open output file " + tmpOutput);
-      }
-      buffer = new byte[buf_size];
-    }
-
-    // copy raw data.
-    public void copyData(Path input, FSDataInputStream fsin, 
-        FSDataOutputStream fout, Reporter reporter) throws IOException {
-      try {
-        for (int cbread=0; (cbread = fsin.read(buffer))>= 0;) {
-          fout.write(buffer, 0,cbread);
-          reporter.progress();
-        }
-      } finally {
-        fsin.close();
-      }
-    }
-    
-    // the relative path of p. basically 
-    // getting rid of schema. Parsing and doing 
-    // string manipulation is not good - so
-    // just use the path api to do it.
-    private Path makeRelative(Path p) {
-      Path retPath = new Path(p.toUri().getPath());
-      return retPath;
-    }
-    
-    static class MapStat {
-      private String pathname;
-      private boolean isDir;
-      private List<String> children;
-      public MapStat(String line) {
-        String[] splits = line.split(" ");
-        pathname = splits[0];
-        if ("dir".equals(splits[1])) {
-          isDir = true;
-        }
-        else {
-          isDir = false;
-        }
-        if (isDir) {
-          children = new ArrayList<String>();
-          for (int i = 2; i < splits.length; i++) {
-            children.add(splits[i]);
-          }
-        }
-      }
-    }
-    // read files from the split input 
-    // and write it onto the part files.
-    // also output hash(name) and string 
-    // for reducer to create index 
-    // and masterindex files.
-    public void map(LongWritable key, Text value,
-        OutputCollector<IntWritable, Text> out,
-        Reporter reporter) throws IOException {
-      String line  = value.toString();
-      MapStat mstat = new MapStat(line);
-      Path srcPath = new Path(mstat.pathname);
-      String towrite = null;
-      Path relPath = makeRelative(srcPath);
-      int hash = HarFileSystem.getHarHash(relPath);
-      long startPos = partStream.getPos();
-      if (mstat.isDir) { 
-        towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
-        StringBuffer sbuff = new StringBuffer();
-        sbuff.append(towrite);
-        for (String child: mstat.children) {
-          sbuff.append(child + " ");
-        }
-        towrite = sbuff.toString();
-        //reading directories is also progress
-        reporter.progress();
-      }
-      else {
-        FileSystem srcFs = srcPath.getFileSystem(conf);
-        FileStatus srcStatus = srcFs.getFileStatus(srcPath);
-        FSDataInputStream input = srcFs.open(srcStatus.getPath());
-        reporter.setStatus("Copying file " + srcStatus.getPath() + 
-            " to archive.");
-        copyData(srcStatus.getPath(), input, partStream, reporter);
-        towrite = relPath.toString() + " file " + partname + " " + startPos
-        + " " + srcStatus.getLen() + " ";
-      }
-      out.collect(new IntWritable(hash), new Text(towrite));
-    }
-    
-    public void close() throws IOException {
-      // close the part files.
-      partStream.close();
-    }
-  }
-  
-  /** the reduce for creating the index and the master index 
-   * 
-   */
-  static class HArchivesReducer implements Reducer<IntWritable, 
-  Text, Text, Text> {
-    private JobConf conf = null;
-    private long startIndex = 0;
-    private long endIndex = 0;
-    private long startPos = 0;
-    private Path masterIndex = null;
-    private Path index = null;
-    private FileSystem fs = null;
-    private FSDataOutputStream outStream = null;
-    private FSDataOutputStream indexStream = null;
-    private int numIndexes = 1000;
-    private Path tmpOutputDir = null;
-    private int written = 0;
-    private int keyVal = 0;
-    
-    // configure 
-    public void configure(JobConf conf) {
-      this.conf = conf;
-      tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
-      masterIndex = new Path(tmpOutputDir, "_masterindex");
-      index = new Path(tmpOutputDir, "_index");
-      try {
-        fs = masterIndex.getFileSystem(conf);
-        if (fs.exists(masterIndex)) {
-          fs.delete(masterIndex, false);
-        }
-        if (fs.exists(index)) {
-          fs.delete(index, false);
-        }
-        indexStream = fs.create(index);
-        outStream = fs.create(masterIndex);
-        String version = HarFileSystem.VERSION + " \n";
-        outStream.write(version.getBytes());
-        
-      } catch(IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    
-    // create the index and master index. The input to 
-    // the reduce is already sorted by the hash of the 
-    // files. SO we just need to write it to the index. 
-    // We update the masterindex as soon as we update 
-    // numIndex entries.
-    public void reduce(IntWritable key, Iterator<Text> values,
-        OutputCollector<Text, Text> out,
-        Reporter reporter) throws IOException {
-      keyVal = key.get();
-      while(values.hasNext()) {
-        Text value = values.next();
-        String towrite = value.toString() + "\n";
-        indexStream.write(towrite.getBytes());
-        written++;
-        if (written > numIndexes -1) {
-          // every 1000 indexes we report status
-          reporter.setStatus("Creating index for archives");
-          reporter.progress();
-          endIndex = keyVal;
-          String masterWrite = startIndex + " " + endIndex + " " + startPos 
-                              +  " " + indexStream.getPos() + " \n" ;
-          outStream.write(masterWrite.getBytes());
-          startPos = indexStream.getPos();
-          startIndex = endIndex;
-          written = 0;
-        }
-      }
-    }
-    
-    public void close() throws IOException {
-      //write the last part of the master index.
-      if (written > 0) {
-        String masterWrite = startIndex + " " + keyVal + " " + startPos  +
-                             " " + indexStream.getPos() + " \n";
-        outStream.write(masterWrite.getBytes());
-      }
-      // close the streams
-      outStream.close();
-      indexStream.close();
-      // try increasing the replication 
-      fs.setReplication(index, (short) 10);
-      fs.setReplication(masterIndex, (short) 10);
-    }
-    
-  }
-  
-  /** the main driver for creating the archives
-   *  it takes at least two command line parameters. The src and the 
-   *  dest. It does an lsr on the source paths.
-   *  The mapper created archuves and the reducer creates 
-   *  the archive index.
-   */
-
-  public int run(String[] args) throws Exception {
-    List<Path> srcPaths = new ArrayList<Path>();
-    Path destPath = null;
-    // check we were supposed to archive or 
-    // unarchive
-    String archiveName = null;
-    if (args.length < 2) {
-      System.out.println(usage);
-      throw new IOException("Invalid usage.");
-    }
-    if (!"-archiveName".equals(args[0])) {
-      System.out.println(usage);
-      throw new IOException("Archive Name not specified.");
-    }
-    archiveName = args[1];
-    if (!checkValidName(archiveName)) {
-      throw new IOException("Invalid name for archives. " + archiveName);
-    }
-    for (int i = 2; i < args.length; i++) {
-      if (i == (args.length - 1)) {
-        destPath = new Path(args[i]);
-      }
-      else {
-        srcPaths.add(new Path(args[i]));
-      }
-    }
-    // do a glob on the srcPaths and then pass it on
-    List<Path> globPaths = new ArrayList<Path>();
-    for (Path p: srcPaths) {
-      FileSystem fs = p.getFileSystem(getConf());
-      FileStatus[] statuses = fs.globStatus(p);
-      for (FileStatus status: statuses) {
-        globPaths.add(fs.makeQualified(status.getPath()));
-      }
-    }
-    archive(globPaths, archiveName, destPath);
-    return 0;
-  }
-
-  /** the main functions **/
-  public static void main(String[] args) {
-    JobConf job = new JobConf(HadoopArchives.class);
-    HadoopArchives harchives = new HadoopArchives(job);
-    try {
-      int res = harchives.run(args);
-      System.exit(res);
-    } catch(Exception e) {
-      System.err.println(e.getLocalizedMessage());
-    }
-  }
-}
Index: src/java/org/apache/hadoop/util/CopyFiles_Counter.properties
===================================================================
--- src/java/org/apache/hadoop/util/CopyFiles_Counter.properties	(revision 664154)
+++ src/java/org/apache/hadoop/util/CopyFiles_Counter.properties	(working copy)
@@ -1,9 +0,0 @@
-# ResourceBundle properties file for distcp counters
-
-CounterGroupName=       distcp
-
-COPY.name=              Files copied
-SKIP.name=              Files skipped
-FAIL.name=              Files failed
-BYTESCOPIED.name=       Bytes copied
-BYTESEXPECTED.name=     Bytes expected
Index: src/java/org/apache/hadoop/util/CopyFiles.java
===================================================================
--- src/java/org/apache/hadoop/util/CopyFiles.java	(revision 664154)
+++ src/java/org/apache/hadoop/util/CopyFiles.java	(working copy)
@@ -1,1067 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.util;
-
-import java.io.BufferedReader;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Stack;
-import java.util.StringTokenizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InvalidInputException;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-
-/**
- * A Map-reduce program to recursively copy directories between
- * different file-systems.
- */
-public class CopyFiles implements Tool {
-  private static final Log LOG = LogFactory.getLog(CopyFiles.class);
-
-  private static final String NAME = "distcp";
-
-  private static final String usage = NAME
-    + " [OPTIONS] <srcurl>* <desturl>" +
-    "\n\nOPTIONS:" +
-    "\n-p[rbugp]              Preserve status" +
-    "\n                       r: replication number" +
-    "\n                       b: block size" +
-    "\n                       u: user" + 
-    "\n                       g: group" +
-    "\n                       p: permission" +
-    "\n                       -p alone is equivalent to -prbugp" +
-    "\n-i                     Ignore failures" +
-    "\n-log <logdir>          Write logs to <logdir>" +
-    "\n-m <num_maps>          Maximum number of simultaneous copies" +
-    "\n-overwrite             Overwrite destination" +
-    "\n-update                Overwrite if src size different from dst size" +
-    "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
-    "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
-    "\n      interpreted as an isomorphic update to an existing directory." +
-    "\nFor example:" +
-    "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
-    "\"hdfs://B:8020/user/foo/baz\"\n" +
-    "\n     would update all descendants of 'baz' also in 'bar'; it would " +
-    "\n     *not* update /user/foo/baz/bar\n";
-
-  private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
-  private static final int MAX_MAPS_PER_NODE = 20;
-  private static final int SYNC_FILE_MAX = 10;
-
-  static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
-  static enum Options {
-    IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
-    PRESERVE_STATUS("-p", NAME + ".preserve.status"),
-    OVERWRITE("-overwrite", NAME + ".overwrite.always"),
-    UPDATE("-update", NAME + ".overwrite.ifnewer");
-
-    final String cmd, propertyname;
-
-    private Options(String cmd, String propertyname) {
-      this.cmd = cmd;
-      this.propertyname = propertyname;
-    }
-  }
-  static enum FileAttribute {
-    BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;
-
-    final char symbol;
-
-    private FileAttribute() {symbol = toString().toLowerCase().charAt(0);}
-    
-    static EnumSet<FileAttribute> parse(String s) {
-      if (s == null || s.length() == 0) {
-        return EnumSet.allOf(FileAttribute.class);
-      }
-
-      EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class);
-      FileAttribute[] attributes = values();
-      for(char c : s.toCharArray()) {
-        int i = 0;
-        for(; i < attributes.length && c != attributes[i].symbol; i++);
-        if (i < attributes.length) {
-          if (!set.contains(attributes[i])) {
-            set.add(attributes[i]);
-          } else {
-            throw new IllegalArgumentException("There are more than one '"
-                + attributes[i].symbol + "' in " + s); 
-          }
-        } else {
-          throw new IllegalArgumentException("'" + c + "' in " + s
-              + " is undefined.");
-        }
-      }
-      return set;
-    }
-  }
-
-  static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
-  static final String DST_DIR_LABEL = NAME + ".dest.path";
-  static final String JOB_DIR_LABEL = NAME + ".job.dir";
-  static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks";
-  static final String SRC_LIST_LABEL = NAME + ".src.list";
-  static final String SRC_COUNT_LABEL = NAME + ".src.count";
-  static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
-  static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
-  static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
-  static final String PRESERVE_STATUS_LABEL
-      = Options.PRESERVE_STATUS.propertyname + ".value";
-
-  private JobConf conf;
-
-  public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public CopyFiles(Configuration conf) {
-    setConf(conf);
-  }
-
-  /**
-   * An input/output pair of filenames.
-   */
-  static class FilePair implements Writable {
-    FileStatus input = new FileStatus();
-    String output;
-    FilePair() { }
-    FilePair(FileStatus input, String output) {
-      this.input = input;
-      this.output = output;
-    }
-    public void readFields(DataInput in) throws IOException {
-      input.readFields(in);
-      output = Text.readString(in);
-    }
-    public void write(DataOutput out) throws IOException {
-      input.write(out);
-      Text.writeString(out, output);
-    }
-    public String toString() {
-      return input + " : " + output;
-    }
-  }
-
-  /**
-   * InputFormat of a distcp job responsible for generating splits of the src
-   * file list.
-   */
-  static class CopyInputFormat implements InputFormat<Text, Text> {
-
-    /**
-     * Does nothing.
-     */
-    public void validateInput(JobConf job) throws IOException { }
-
-    /**
-     * Produce splits such that each is no greater than the quotient of the
-     * total size and the number of splits requested.
-     * @param job The handle to the JobConf object
-     * @param numSplits Number of splits requested
-     */
-    public InputSplit[] getSplits(JobConf job, int numSplits)
-        throws IOException {
-      int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
-      long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
-      String srcfilelist = job.get(SRC_LIST_LABEL, "");
-      if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
-        throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
-                                   ") total_size(" + cbsize + ") listuri(" +
-                                   srcfilelist + ")");
-      }
-      Path src = new Path(srcfilelist);
-      FileSystem fs = src.getFileSystem(job);
-      FileStatus srcst = fs.getFileStatus(src);
-
-      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
-      LongWritable key = new LongWritable();
-      FilePair value = new FilePair();
-      final long targetsize = cbsize / numSplits;
-      long pos = 0L;
-      long last = 0L;
-      long acc = 0L;
-      long cbrem = srcst.getLen();
-      SequenceFile.Reader sl = null;
-      try {
-        sl = new SequenceFile.Reader(fs, src, job);
-        for (; sl.next(key, value); last = sl.getPosition()) {
-          // if adding this split would put this split past the target size,
-          // cut the last split and put this next file in the next split.
-          if (acc + key.get() > targetsize && acc != 0) {
-            long splitsize = last - pos;
-            splits.add(new FileSplit(src, pos, splitsize, (String[])null));
-            cbrem -= splitsize;
-            pos = last;
-            acc = 0L;
-          }
-          acc += key.get();
-        }
-      }
-      finally {
-        checkAndClose(sl);
-      }
-      if (cbrem != 0) {
-        splits.add(new FileSplit(src, pos, cbrem, (String[])null));
-      }
-
-      return splits.toArray(new FileSplit[splits.size()]);
-    }
-
-    /**
-     * Returns a reader for this split of the src file list.
-     */
-    public RecordReader<Text, Text> getRecordReader(InputSplit split,
-        JobConf job, Reporter reporter) throws IOException {
-      return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split);
-    }
-  }
-
-  /**
-   * FSCopyFilesMapper: The mapper for copying files between FileSystems.
-   */
-  static class CopyFilesMapper
-      implements Mapper<LongWritable, FilePair, WritableComparable, Text> {
-    // config
-    private int sizeBuf = 128 * 1024;
-    private FileSystem destFileSys = null;
-    private boolean ignoreReadFailures;
-    private boolean preserve_status;
-    private EnumSet<FileAttribute> preseved;
-    private boolean overwrite;
-    private boolean update;
-    private Path destPath = null;
-    private byte[] buffer = null;
-    private JobConf job;
-
-    // stats
-    private int failcount = 0;
-    private int skipcount = 0;
-    private int copycount = 0;
-
-    private String getCountString() {
-      return "Copied: " + copycount + " Skipped: " + skipcount
-          + " Failed: " + failcount;
-    }
-    private void updateStatus(Reporter reporter) {
-      reporter.setStatus(getCountString());
-    }
-
-    /**
-     * Return true if dst should be replaced by src and the update flag is set.
-     * Right now, this merely checks that the src and dst len are not equal. 
-     * This should be improved on once modification times, CRCs, etc. can
-     * be meaningful in this context.
-     */
-    private boolean needsUpdate(FileStatus src, FileStatus dst) {
-      return update && src.getLen() != dst.getLen();
-    }
-    
-    private FSDataOutputStream create(Path f, Reporter reporter,
-        FileStatus srcstat) throws IOException {
-      if (!preserve_status) {
-        return destFileSys.create(f, reporter);
-      }
-
-      FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
-          srcstat.getPermission(): null;
-      short replication = preseved.contains(FileAttribute.REPLICATION)?
-          srcstat.getReplication(): destFileSys.getDefaultReplication();
-      long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
-          srcstat.getBlockSize(): destFileSys.getDefaultBlockSize();
-      return destFileSys.create(f, permission, true, sizeBuf, replication,
-          blockSize, reporter);
-    }
-
-    /**
-     * Copy a file to a destination.
-     * @param srcstat src path and metadata
-     * @param dstpath dst path
-     * @param reporter
-     */
-    private void copy(FileStatus srcstat, Path relativedst,
-        OutputCollector<WritableComparable, Text> outc, Reporter reporter)
-        throws IOException {
-      Path absdst = new Path(destPath, relativedst);
-      int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
-      assert totfiles >= 0 : "Invalid file count " + totfiles;
-
-      // if a directory, ensure created even if empty
-      if (srcstat.isDir()) {
-        if (destFileSys.exists(absdst)) {
-          if (!destFileSys.getFileStatus(absdst).isDir()) {
-            throw new IOException("Failed to mkdirs: " + absdst+" is a file.");
-          }
-        }
-        else if (!destFileSys.mkdirs(absdst)) {
-          throw new IOException("Failed to mkdirs " + absdst);
-        }
-        // TODO: when modification times can be set, directories should be
-        // emitted to reducers so they might be preserved. Also, mkdirs does
-        // not currently return an error when the directory already exists;
-        // if this changes, all directory work might as well be done in reduce
-        return;
-      }
-
-      if (destFileSys.exists(absdst) && !overwrite
-          && !needsUpdate(srcstat, destFileSys.getFileStatus(absdst))) {
-        outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
-        ++skipcount;
-        reporter.incrCounter(Counter.SKIP, 1);
-        updateStatus(reporter);
-        return;
-      }
-
-      Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
-      long cbcopied = 0L;
-      FSDataInputStream in = null;
-      FSDataOutputStream out = null;
-      try {
-        // open src file
-        in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
-        reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
-        // open tmp file
-        out = create(tmpfile, reporter, srcstat);
-        // copy file
-        for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
-          out.write(buffer, 0, cbread);
-          cbcopied += cbread;
-          reporter.setStatus(
-              String.format("%.2f ", cbcopied*100.0/srcstat.getLen())
-              + absdst + " [ " +
-              StringUtils.humanReadableInt(cbcopied) + " / " +
-              StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
-        }
-      } finally {
-        checkAndClose(in);
-        checkAndClose(out);
-      }
-
-      if (cbcopied != srcstat.getLen()) {
-        throw new IOException("File size not matched: copied "
-            + bytesString(cbcopied) + " to tmpfile (=" + tmpfile
-            + ") but expected " + bytesString(srcstat.getLen()) 
-            + " from " + srcstat.getPath());        
-      }
-      else {
-        if (totfiles == 1) {
-          // Copying a single file; use dst path provided by user as destination
-          // rather than destination directory, if a file
-          Path dstparent = absdst.getParent();
-          if (!(destFileSys.exists(dstparent) &&
-                destFileSys.getFileStatus(dstparent).isDir())) {
-            absdst = dstparent;
-          }
-        }
-        if (destFileSys.exists(absdst) &&
-            destFileSys.getFileStatus(absdst).isDir()) {
-          throw new IOException(absdst + " is a directory");
-        }
-        if (!destFileSys.mkdirs(absdst.getParent())) {
-          throw new IOException("Failed to craete parent dir: " + absdst.getParent());
-        }
-        rename(tmpfile, absdst);
-
-        FileStatus dststat = destFileSys.getFileStatus(absdst);
-        if (dststat.getLen() != srcstat.getLen()) {
-          destFileSys.delete(absdst, false);
-          throw new IOException("File size not matched: copied "
-              + bytesString(dststat.getLen()) + " to dst (=" + absdst
-              + ") but expected " + bytesString(srcstat.getLen()) 
-              + " from " + srcstat.getPath());        
-        } 
-        updatePermissions(srcstat, dststat);
-      }
-
-      // report at least once for each file
-      ++copycount;
-      reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
-      reporter.incrCounter(Counter.COPY, 1);
-      updateStatus(reporter);
-    }
-    
-    /** rename tmp to dst, delete dst if already exists */
-    private void rename(Path tmp, Path dst) throws IOException {
-      try {
-        if (destFileSys.exists(dst)) {
-          destFileSys.delete(dst, true);
-        }
-        if (!destFileSys.rename(tmp, dst)) {
-          throw new IOException();
-        }
-      }
-      catch(IOException cause) {
-        throw (IOException)new IOException("Fail to rename tmp file (=" + tmp 
-            + ") to destination file (=" + dst + ")").initCause(cause);
-      }
-    }
-
-    private void updatePermissions(FileStatus src, FileStatus dst
-        ) throws IOException {
-      if (preserve_status) {
-        CopyFiles.updatePermissions(src, dst, preseved, destFileSys);
-      }
-    }
-
-    static String bytesString(long b) {
-      return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
-    }
-
-    /** Mapper configuration.
-     * Extracts source and destination file system, as well as
-     * top-level paths on source and destination directories.
-     * Gets the named file systems, to be used later in map.
-     */
-    public void configure(JobConf job)
-    {
-      destPath = new Path(job.get(DST_DIR_LABEL, "/"));
-      try {
-        destFileSys = destPath.getFileSystem(job);
-      } catch (IOException ex) {
-        throw new RuntimeException("Unable to get the named file system.", ex);
-      }
-      sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
-      buffer = new byte[sizeBuf];
-      ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
-      preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
-      if (preserve_status) {
-        preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL));
-      }
-      update = job.getBoolean(Options.UPDATE.propertyname, false);
-      overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
-      this.job = job;
-    }
-
-    /** Map method. Copies one file from source file system to destination.
-     * @param key src len
-     * @param value FilePair (FileStatus src, Path dst)
-     * @param out Log of failed copies
-     * @param reporter
-     */
-    public void map(LongWritable key,
-                    FilePair value,
-                    OutputCollector<WritableComparable, Text> out,
-                    Reporter reporter) throws IOException {
-      FileStatus srcstat = value.input;
-      Path dstpath = new Path(value.output);
-      try {
-        copy(srcstat, dstpath, out, reporter);
-      } catch (IOException e) {
-        ++failcount;
-        reporter.incrCounter(Counter.FAIL, 1);
-        updateStatus(reporter);
-        final String sfailure = "FAIL " + dstpath + " : " +
-                          StringUtils.stringifyException(e);
-        out.collect(null, new Text(sfailure));
-        LOG.info(sfailure);
-        try {
-          for (int i = 0; i < 3; ++i) {
-            try {
-              if (destFileSys.delete(dstpath, true))
-                break;
-            } catch (Throwable ex) {
-              // ignore, we are just cleaning up
-              LOG.debug("Ignoring cleanup exception", ex);
-            }
-            // update status, so we don't get timed out
-            updateStatus(reporter);
-            Thread.sleep(3 * 1000);
-          }
-        } catch (InterruptedException inte) {
-          throw (IOException)new IOException().initCause(inte);
-        }
-      } finally {
-        updateStatus(reporter);
-      }
-    }
-
-    public void close() throws IOException {
-      if (0 == failcount || ignoreReadFailures) {
-        return;
-      }
-      throw new IOException(getCountString());
-    }
-  }
-
-  private static List<Path> fetchFileList(Configuration conf, Path srcList)
-      throws IOException {
-    List<Path> result = new ArrayList<Path>();
-    FileSystem fs = srcList.getFileSystem(conf);
-    BufferedReader input = null;
-    try {
-      input = new BufferedReader(new InputStreamReader(fs.open(srcList)));
-      String line = input.readLine();
-      while (line != null) {
-        result.add(new Path(line));
-        line = input.readLine();
-      }
-    } finally {
-      checkAndClose(input);
-    }
-    return result;
-  }
-
-  @Deprecated
-  public static void copy(Configuration conf, String srcPath,
-                          String destPath, Path logPath,
-                          boolean srcAsList, boolean ignoreReadFailures)
-      throws IOException {
-    final Path src = new Path(srcPath);
-    List<Path> tmp = new ArrayList<Path>();
-    if (srcAsList) {
-      tmp.addAll(fetchFileList(conf, src));
-    } else {
-      tmp.add(src);
-    }
-    EnumSet<Options> flags = ignoreReadFailures
-      ? EnumSet.of(Options.IGNORE_READ_FAILURES)
-      : EnumSet.noneOf(Options.class);
-    copy(conf, tmp, new Path(destPath), logPath, flags, null);
-  }
-
-  /** Sanity check for srcPath */
-  private static void checkSrcPath(Configuration conf, List<Path> srcPaths
-      ) throws IOException {
-    List<IOException> rslt = new ArrayList<IOException>();
-    for (Path p : srcPaths) {
-      FileSystem fs = p.getFileSystem(conf);
-      if (!fs.exists(p)) {
-        rslt.add(new IOException("Input source " + p + " does not exist."));
-      }
-    }
-    if (!rslt.isEmpty()) {
-      throw new InvalidInputException(rslt);
-    }
-  }
-
-  /**
-   * Driver to copy srcPath to destPath depending on required protocol.
-   * @param srcPaths list of source paths
-   * @param destPath Destination path
-   * @param logPath Log output directory
-   * @param flags Command-line flags
-   */
-  static void copy(Configuration conf, List<Path> srcPaths,
-      Path destPath, Path logPath, EnumSet<Options> flags,
-      String presevedAttributes) throws IOException {
-    LOG.info("srcPaths=" + srcPaths);
-    LOG.info("destPath=" + destPath);
-    checkSrcPath(conf, srcPaths);
-
-    JobConf job = createJobConf(conf);
-    if (presevedAttributes != null) {
-      job.set(PRESERVE_STATUS_LABEL, presevedAttributes);
-    }
-    
-    //Initialize the mapper
-    try {
-      setup(conf, job, srcPaths, destPath, logPath, flags);
-      JobClient.runJob(job);
-      finalize(conf, job, destPath, presevedAttributes);
-    } finally {
-      //delete tmp
-      fullyDelete(job.get(TMP_DIR_LABEL), job);
-      //delete jobDirectory
-      fullyDelete(job.get(JOB_DIR_LABEL), job);
-    }
-  }
-
-  private static void updatePermissions(FileStatus src, FileStatus dst,
-      EnumSet<FileAttribute> preseved, FileSystem destFileSys
-      ) throws IOException {
-    String owner = null;
-    String group = null;
-    if (preseved.contains(FileAttribute.USER)
-        && !src.getOwner().equals(dst.getOwner())) {
-      owner = src.getOwner();
-    }
-    if (preseved.contains(FileAttribute.GROUP)
-        && !src.getGroup().equals(dst.getGroup())) {
-      group = src.getGroup();
-    }
-    if (owner != null || group != null) {
-      destFileSys.setOwner(dst.getPath(), owner, group);
-    }
-    if (preseved.contains(FileAttribute.PERMISSION)
-        && !src.getPermission().equals(dst.getPermission())) {
-      destFileSys.setPermission(dst.getPath(), src.getPermission());
-    }
-  }
-
-  static private void finalize(Configuration conf, JobConf jobconf,
-      final Path destPath, String presevedAttributes) throws IOException {
-    if (presevedAttributes == null) {
-      return;
-    }
-    EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
-    if (!preseved.contains(FileAttribute.USER)
-        && !preseved.contains(FileAttribute.GROUP)
-        && !preseved.contains(FileAttribute.PERMISSION)) {
-      return;
-    }
-
-    FileSystem dstfs = destPath.getFileSystem(conf);
-    Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
-    SequenceFile.Reader in = null;
-    try {
-      in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf),
-          dstdirlist, jobconf);
-      Text dsttext = new Text();
-      FilePair pair = new FilePair(); 
-      for(; in.next(dsttext, pair); ) {
-        Path absdst = new Path(destPath, pair.output);
-        updatePermissions(pair.input, dstfs.getFileStatus(absdst),
-            preseved, dstfs);
-      }
-    } finally {
-      checkAndClose(in);
-    }
-  }
-
-  static private class CommandArgument {
-    final List<Path> srcs;
-    final Path dst;
-    final Path log;
-    final EnumSet<Options> flags;
-    final String presevedAttributes;
-    
-    CommandArgument(List<Path> srcs, Path dst, Path log,
-        EnumSet<Options> flags, String presevedAttributes) {
-      this.srcs = srcs;
-      this.dst = dst;
-      this.log = log;
-      this.flags = flags;
-      this.presevedAttributes = presevedAttributes;      
-    }
-
-    static CommandArgument valueOf(String[] args, Configuration conf
-        ) throws IOException {
-      List<Path> srcs = new ArrayList<Path>();
-      Path dst = null;
-      Path log = null;
-      EnumSet<Options> flags = EnumSet.noneOf(Options.class);
-      String presevedAttributes = null;
-
-      for (int idx = 0; idx < args.length; idx++) {
-        Options[] opt = Options.values();
-        int i = 0;
-        for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++);
-
-        if (i < opt.length) {
-          flags.add(opt[i]);
-          if (opt[i] == Options.PRESERVE_STATUS) {
-            presevedAttributes =  args[idx].substring(2);         
-            FileAttribute.parse(presevedAttributes); //validation
-          }
-        } else if ("-f".equals(args[idx])) {
-          if (++idx ==  args.length) {
-            throw new IllegalArgumentException("urilist_uri not specified in -f");
-          }
-          srcs.addAll(fetchFileList(conf, new Path(args[idx])));
-        } else if ("-log".equals(args[idx])) {
-          if (++idx ==  args.length) {
-            throw new IllegalArgumentException("logdir not specified in -log");
-          }
-          log = new Path(args[idx]);
-        } else if ("-m".equals(args[idx])) {
-          if (++idx == args.length) {
-            throw new IllegalArgumentException("num_maps not specified in -m");
-          }
-          try {
-            conf.setInt(MAX_MAPS_LABEL, Integer.valueOf(args[idx]));
-          } catch (NumberFormatException e) {
-            throw new IllegalArgumentException("Invalid argument to -m: " +
-                                               args[idx]);
-          }
-        } else if ('-' == args[idx].codePointAt(0)) {
-          throw new IllegalArgumentException("Invalid switch " + args[idx]);
-        } else if (idx == args.length -1) {
-          dst = new Path(args[idx]);
-        } else {
-          srcs.add(new Path(args[idx]));
-        }
-      }
-      // mandatory command-line parameters
-      if (srcs.isEmpty() || dst == null) {
-        throw new IllegalArgumentException("Missing "
-            + (dst == null ? "dst path" : "src"));
-      }
-      // incompatible command-line flags
-      if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
-        throw new IllegalArgumentException("Conflicting overwrite policies");
-      }
-      return new CommandArgument(srcs, dst, log, flags, presevedAttributes);
-    }
-  }
-
-  /**
-   * This is the main driver for recursively copying directories
-   * across file systems. It takes at least two cmdline parameters. A source
-   * URL and a destination URL. It then essentially does an "ls -lR" on the
-   * source URL, and writes the output in a round-robin manner to all the map
-   * input files. The mapper actually copies the files allotted to it. The
-   * reduce is empty.
-   */
-  public int run(String[] args) throws Exception {
-    try {
-      CommandArgument p = CommandArgument.valueOf(args, conf);
-      copy(conf, p.srcs, p.dst, p.log, p.flags, p.presevedAttributes);
-      return 0;
-    } catch (IllegalArgumentException e) {
-      System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
-      ToolRunner.printGenericCommandUsage(System.err);
-      return -1;
-    } catch (DuplicationException e) {
-      System.err.println(StringUtils.stringifyException(e));
-      return DuplicationException.ERROR_CODE;
-    } catch (Exception e) {
-      System.err.println("With failures, global counters are inaccurate; " +
-          "consider running with -i");
-      System.err.println("Copy failed: " + StringUtils.stringifyException(e));
-      return -999;
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    JobConf job = new JobConf(CopyFiles.class);
-    CopyFiles distcp = new CopyFiles(job);
-    int res = ToolRunner.run(distcp, args);
-    System.exit(res);
-  }
-
-  /**
-   * Make a path relative with respect to a root path.
-   * absPath is always assumed to descend from root.
-   * Otherwise returned path is null.
-   */
-  static String makeRelative(Path root, Path absPath) {
-    if (!absPath.isAbsolute()) {
-      throw new IllegalArgumentException("!absPath.isAbsolute(), absPath="
-          + absPath);
-    }
-    String p = absPath.toUri().getPath();
-
-    StringTokenizer pathTokens = new StringTokenizer(p, "/");
-    for(StringTokenizer rootTokens = new StringTokenizer(
-        root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) {
-      if (!rootTokens.nextToken().equals(pathTokens.nextToken())) {
-        return null;
-      }
-    }
-    StringBuilder sb = new StringBuilder();
-    for(; pathTokens.hasMoreTokens(); ) {
-      sb.append(pathTokens.nextToken());
-      if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); }
-    }
-    return sb.length() == 0? ".": sb.toString();
-  }
-
-  /**
-   * Calculate how many maps to run.
-   * Number of maps is bounded by a minimum of the cumulative size of the
-   * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the
-   * command line) and at most (distcp.max.map.tasks, default
-   * MAX_MAPS_PER_NODE * nodes in the cluster).
-   * @param totalBytes Count of total bytes for job
-   * @param job The job to configure
-   * @return Count of maps to run.
-   */
-  private static void setMapCount(long totalBytes, JobConf job) 
-      throws IOException {
-    int numMaps =
-      (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP));
-    numMaps = Math.min(numMaps, 
-        job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE *
-          new JobClient(job).getClusterStatus().getTaskTrackers()));
-    job.setNumMapTasks(Math.max(numMaps, 1));
-  }
-
-  /** Fully delete dir */
-  static void fullyDelete(String dir, Configuration conf) throws IOException {
-    if (dir != null) {
-      Path tmp = new Path(dir);
-      FileUtil.fullyDelete(tmp.getFileSystem(conf), tmp);
-    }
-  }
-
-  //Job configuration
-  private static JobConf createJobConf(Configuration conf) {
-    JobConf jobconf = new JobConf(conf, CopyFiles.class);
-    jobconf.setJobName(NAME);
-
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobconf.setMapSpeculativeExecution(false);
-
-    jobconf.setInputFormat(CopyInputFormat.class);
-    jobconf.setOutputKeyClass(Text.class);
-    jobconf.setOutputValueClass(Text.class);
-
-    jobconf.setMapperClass(CopyFilesMapper.class);
-    jobconf.setNumReduceTasks(0);
-    return jobconf;
-  }
-
-  private static final Random RANDOM = new Random();
-  public static String getRandomId() {
-    return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
-  }
-
-  private static boolean setBooleans(JobConf jobConf, EnumSet<Options> flags) {
-    boolean update = flags.contains(Options.UPDATE);
-    boolean overwrite = !update && flags.contains(Options.OVERWRITE);
-    jobConf.setBoolean(Options.UPDATE.propertyname, update);
-    jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
-    jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
-        flags.contains(Options.IGNORE_READ_FAILURES));
-    jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
-        flags.contains(Options.PRESERVE_STATUS));
-    return update || overwrite;
-  }
-
-  /**
-   * Initialize DFSCopyFileMapper specific job-configuration.
-   * @param conf : The dfs/mapred configuration.
-   * @param jobConf : The handle to the jobConf object to be initialized.
-   * @param srcPaths : The source URIs.
-   * @param destPath : The destination URI.
-   * @param logPath : Log output directory
-   * @param flags : Command-line flags
-   */
-  private static void setup(Configuration conf, JobConf jobConf,
-                            List<Path> srcPaths, final Path destPath,
-                            Path logPath, EnumSet<Options> flags)
-      throws IOException {
-    jobConf.set(DST_DIR_LABEL, destPath.toUri().toString());
-    final boolean updateORoverwrite = setBooleans(jobConf, flags);
-
-    final String randomId = getRandomId();
-    JobClient jClient = new JobClient(jobConf);
-    Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
-    jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
-
-    FileSystem dstfs = destPath.getFileSystem(conf);
-    boolean dstExists = dstfs.exists(destPath);
-    boolean dstIsDir = false;
-    if (dstExists) {
-      dstIsDir = dstfs.getFileStatus(destPath).isDir();
-    }
-
-    // default logPath
-    if (logPath == null) {
-      String filename = "_distcp_logs_" + randomId;
-      if (!dstExists || !dstIsDir) {
-        Path parent = destPath.getParent();
-        if (!dstfs.exists(parent)) {
-          dstfs.mkdirs(parent);
-        }
-        logPath = new Path(parent, filename);
-      } else {
-        logPath = new Path(destPath, filename);
-      }
-    }
-    FileOutputFormat.setOutputPath(jobConf, logPath);
-    
-    // create src list, dst list
-    FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
-
-    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
-    jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
-    SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
-        srcfilelist, LongWritable.class, FilePair.class,
-        SequenceFile.CompressionType.NONE);
-
-    Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
-    SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
-        dstfilelist, Text.class, Text.class,
-        SequenceFile.CompressionType.NONE);
-
-    Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
-    jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
-    SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf,
-        dstdirlist, Text.class, FilePair.class,
-        SequenceFile.CompressionType.NONE);
-
-    // handle the case where the destination directory doesn't exist
-    // and we've only a single src directory OR we're updating/overwriting
-    // the contents of the destination directory.
-    final boolean special =
-      (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
-    int srcCount = 0, cnsyncf = 0, dirsyn = 0;
-    long cbsize = 0L, cbsyncs = 0L;
-    try {
-      for (Path src : srcPaths) {
-        FileSystem fs = src.getFileSystem(conf);
-        FileStatus srcfilestat = fs.getFileStatus(src);
-        Path root = special && srcfilestat.isDir()? src: src.getParent();
-        if (srcfilestat.isDir()) {
-          ++srcCount;
-        }
-
-        Stack<FileStatus> pathstack = new Stack<FileStatus>();
-        for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
-          FileStatus cur = pathstack.pop();
-          for(FileStatus child : fs.listStatus(cur.getPath())) {
-            ++srcCount;
-
-            if (child.isDir()) {
-              pathstack.push(child);
-            }
-            else {
-              ++cnsyncf;
-              cbsyncs += child.getLen();
-              cbsize += child.getLen();
-
-              if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
-                src_writer.sync();
-                dst_writer.sync();
-                cnsyncf = 0;
-                cbsyncs = 0L;
-              }
-            }
-
-            String dst = makeRelative(root, child.getPath());
-            src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
-                new FilePair(child, dst));
-            dst_writer.append(new Text(dst),
-                new Text(child.getPath().toString()));
-          }
-
-          if (cur.isDir()) {
-            String dst = makeRelative(root, cur.getPath());
-            dir_writer.append(new Text(dst), new FilePair(cur, dst));
-            if (++dirsyn > SYNC_FILE_MAX) {
-              dirsyn = 0;
-              dir_writer.sync();                
-            }
-          }
-        }
-      }
-    } finally {
-      checkAndClose(src_writer);
-      checkAndClose(dst_writer);
-      checkAndClose(dir_writer);
-    }
-
-    // create dest path dir if copying > 1 file
-    if (!dstfs.exists(destPath)) {
-      if (srcCount > 1 && !dstfs.mkdirs(destPath)) {
-        throw new IOException("Failed to create" + destPath);
-      }
-    }
-
-    checkDuplication(jobfs, dstfilelist,
-        new Path(jobDirectory, "_distcp_sorted"), conf);
-
-    Path tmpDir = new Path(
-        (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
-        destPath.getParent(): destPath, "_distcp_tmp_" + randomId);
-    jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
-    LOG.info("srcCount=" + srcCount);
-    jobConf.setInt(SRC_COUNT_LABEL, srcCount);
-    jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
-    setMapCount(cbsize, jobConf);
-  }
-
-  static private void checkDuplication(FileSystem fs, Path file, Path sorted,
-    Configuration conf) throws IOException {
-    SequenceFile.Reader in = null;
-    try {
-      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
-        new Text.Comparator(), Text.class, Text.class, conf);
-      sorter.sort(file, sorted);
-      in = new SequenceFile.Reader(fs, sorted, conf);
-
-      Text prevdst = null, curdst = new Text();
-      Text prevsrc = null, cursrc = new Text(); 
-      for(; in.next(curdst, cursrc); ) {
-        if (prevdst != null && curdst.equals(prevdst)) {
-          throw new DuplicationException(
-            "Invalid input, there are duplicated files in the sources: "
-            + prevsrc + ", " + cursrc);
-        }
-        prevdst = curdst;
-        curdst = new Text();
-        prevsrc = cursrc;
-        cursrc = new Text();
-      }
-    }
-    finally {
-      checkAndClose(in);
-    }
-  } 
-
-  static boolean checkAndClose(java.io.Closeable io) {
-    if (io != null) {
-      try {
-        io.close();
-      }
-      catch(IOException ioe) {
-        LOG.warn(StringUtils.stringifyException(ioe));
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /** An exception class for duplicated source files. */
-  public static class DuplicationException extends IOException {
-    private static final long serialVersionUID = 1L;
-    /** Error code for this exception */
-    public static final int ERROR_CODE = -2;
-    DuplicationException(String message) {super(message);}
-  }
-}
Index: bin/hadoop
===================================================================
--- bin/hadoop	(revision 664154)
+++ bin/hadoop	(working copy)
@@ -141,6 +141,13 @@
   CLASSPATH=${CLASSPATH}:$f;
 done
 
+for f in $HADOOP_HOME/hadoop-*-tools.jar; do
+  TOOL_PATH=${TOOL_PATH}:$f;
+done
+for f in $HADOOP_HOME/build/hadoop-*-tools.jar; do
+  TOOL_PATH=${TOOL_PATH}:$f;
+done
+
 # add user-specified CLASSPATH last
 if [ "$HADOOP_CLASSPATH" != "" ]; then
   CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
@@ -159,6 +166,7 @@
   CLASSPATH=`cygpath -p -w "$CLASSPATH"`
   HADOOP_HOME=`cygpath -d "$HADOOP_HOME"`
   HADOOP_LOG_DIR=`cygpath -d "$HADOOP_LOG_DIR"`
+  TOOL_PATH=`cygpath -p -w "$TOOL_PATH"`
 fi
 # setup 'java.library.path' for native-hadoop code if necessary
 JAVA_LIBRARY_PATH=''
@@ -228,13 +236,15 @@
 elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hadoop.util.RunJar
 elif [ "$COMMAND" = "distcp" ] ; then
-  CLASS=org.apache.hadoop.util.CopyFiles
+  CLASS=org.apache.hadoop.tools.DistCp
+  CLASSPATH=${CLASSPATH}:${TOOL_PATH}
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
 elif [ "$COMMAND" = "daemonlog" ] ; then
   CLASS=org.apache.hadoop.log.LogLevel
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
 elif [ "$COMMAND" = "archive" ] ; then
-  CLASS=org.apache.hadoop.util.HadoopArchives
+  CLASS=org.apache.hadoop.tools.HadoopArchives
+  CLASSPATH=${CLASSPATH}:${TOOL_PATH}
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
 else
   CLASS=$COMMAND
Index: build.xml
===================================================================
--- build.xml	(revision 664154)
+++ build.xml	(working copy)
@@ -46,10 +46,12 @@
   <property name="c++.pipes.src" value="${c++.src}/pipes"/>
   <property name="c++.examples.pipes.src" value="${examples.dir}/pipes"/>
   <property name="libhdfs.src" value="${c++.src}/libhdfs"/>
+  <property name="tools.src" value="${basedir}/src/tools"/>
 
   <property name="build.dir" value="${basedir}/build"/>
   <property name="build.classes" value="${build.dir}/classes"/>
   <property name="build.src" value="${build.dir}/src"/>
+  <property name="build.tools" value="${build.dir}/tools"/>
   <property name="build.webapps" value="${build.dir}/webapps"/>
   <property name="build.examples" value="${build.dir}/examples"/>
   <property name="build.anttasks" value="${build.dir}/ant"/>
@@ -149,6 +151,7 @@
     <pathelement location="${test.src.dir}"/>
     <pathelement location="${build.dir}"/>
     <pathelement location="${build.examples}"/>
+    <pathelement location="${build.tools}"/>
     <fileset dir="${test.lib.dir}">
       <include name="**/*.jar" />
       <exclude name="**/excluded/" />
@@ -187,6 +190,7 @@
   <target name="init">
     <mkdir dir="${build.dir}"/>
     <mkdir dir="${build.classes}"/>
+    <mkdir dir="${build.tools}"/>
     <mkdir dir="${build.src}"/>
     <mkdir dir="${build.webapps}/task/WEB-INF"/>
     <mkdir dir="${build.webapps}/job/WEB-INF"/>
@@ -320,6 +324,29 @@
 
     </target>
 
+  <target name="compile-tools" depends="init">
+    <javac 
+     encoding="${build.encoding}" 
+     srcdir="${tools.src}"
+     includes="org/apache/hadoop/**/*.java"
+     destdir="${build.tools}"
+     debug="${javac.debug}"
+     optimize="${javac.optimize}"
+     target="${javac.version}"
+     source="${javac.version}"
+     deprecation="${javac.deprecation}">
+      <compilerarg line="${javac.args} ${javac.args.warnings}" />
+      <classpath refid="classpath"/>
+    </javac>   
+  	
+    <copy todir="${build.tools}">
+      <fileset 
+        dir="${tools.src}" 
+        includes="**/*.properties"
+      />
+    </copy>
+  </target>
+
   <target name="compile-core-native" depends="compile-core-classes"
           if="compile.native">
   	
@@ -379,7 +406,7 @@
      </subant>  	
   </target>
   
-  <target name="compile" depends="compile-core, compile-contrib, compile-ant-tasks" description="Compile core, contrib">
+  <target name="compile" depends="compile-core, compile-contrib, compile-ant-tasks, compile-tools" description="Compile core, contrib">
   </target>
 
   <target name="compile-examples" 
@@ -440,6 +467,17 @@
     </jar>
   </target>
 
+  <target name="tools-jar" depends="jar, compile-tools"
+          description="Make the Hadoop tools jar.">
+    <!-- Deliberately no Main-Class manifest entry. The jar is packaged
+         only from build.tools and holds several independent tools that
+         bin/hadoop starts by class name (distcp, archive), so naming the
+         examples driver here would point at a class outside this jar. -->
+    <jar jarfile="${build.dir}/${final.name}-tools.jar"
+         basedir="${build.tools}">
+    </jar>
+  </target>
+
   <!-- ================================================================== -->
   <!-- Make the Hadoop metrics jar. (for use outside Hadoop)              -->
   <!-- ================================================================== -->
@@ -463,7 +501,7 @@
   <!-- ================================================================== -->
   <!-- Compile test code                                                  --> 
   <!-- ================================================================== -->
-  <target name="compile-core-test" depends="compile-examples, generate-test-records">
+  <target name="compile-core-test" depends="compile-examples, compile-tools, generate-test-records">
     <javac 
      encoding="${build.encoding}" 
      srcdir="${test.generated.dir}"
@@ -764,7 +802,7 @@
   <!-- ================================================================== -->
   <!--                                                                    -->
   <!-- ================================================================== -->
-  <target name="package" depends="compile, jar, javadoc, examples, jar-test, ant-tasks, package-libhdfs"
+  <target name="package" depends="compile, jar, javadoc, examples, tools-jar, jar-test, ant-tasks, package-libhdfs"
 	  description="Build distribution">
     <mkdir dir="${dist.dir}"/>
     <mkdir dir="${dist.dir}/lib"/>
