Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml	(revision 439052)
+++ conf/hadoop-default.xml	(working copy)
@@ -239,6 +239,14 @@
 </property>
 
 <property>
+  <name>mapred.cache.size</name>
+  <value>10737418240</value>
+  <description>The limit on the size of cache you want to keep, set by default
+  to 10GB. This will act as a soft limit on the cache directory for out of band data.
+  </description>
+</property>
+            
+<property>
   <name>mapred.system.dir</name>
   <value>${hadoop.tmp.dir}/mapred/system</value>
   <description>The shared directory where MapReduce stores control files.
Index: src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(revision 439052)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(working copy)
@@ -36,6 +36,14 @@
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+          JobConf jconf = new JobConf();
+          // run the wordcount example with caching
+          boolean ret = MRCaching.launchMRCache("localhost:60030", "/tmp/wc/input",
+                                                "/tmp/wc/output", "local", jconf,
+                                                "The quick brown fox\nhas many silly\n"
+                                                    + "red fox sox\n");
+          // assert the number of lines read during caching
+          assertTrue("Failed test archives not matching", ret);
       } finally {
           if (mr != null) { mr.shutdown(); }
       }
Index: src/test/org/apache/hadoop/mapred/test.zip
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: src/test/org/apache/hadoop/mapred/test.zip
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Index: src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java	(revision 0)
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+
+/**
+ * A JUnit test to test caching with DFS
+ * 
+ * @author Mahadev Konar
+ */
+public class TestMiniMRDFSCaching extends TestCase {
+
+  public void testWithDFS() throws IOException {
+    MiniMRCluster mr = null;
+    MiniDFSCluster dfs = null;
+    String namenode = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(65314, conf, true);
+      fileSys = dfs.getFileSystem();
+      namenode = fileSys.getName();
+      mr = new MiniMRCluster(50050, 50060, 2, namenode, true);
+      JobConf jconf = new JobConf();
+      // run the wordcount example with caching
+      boolean ret = MRCaching.launchMRCache("localhost:50050",
+                                            "/testing/wc/input",
+                                            "/testing/wc/output", namenode,
+                                            jconf,
+                                            "The quick brown fox\nhas many silly\n"
+                                                + "red fox sox\n");
+      assertTrue("Archives not matching", ret);
+    } finally {
+      if (fileSys != null) {
+        fileSys.close();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  public static void main(String[] argv) throws Exception {
+    TestMiniMRDFSCaching td = new TestMiniMRDFSCaching();
+    td.testWithDFS();
+  }
+}
Index: src/test/org/apache/hadoop/mapred/test.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: src/test/org/apache/hadoop/mapred/test.jar
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Index: src/test/org/apache/hadoop/mapred/test.txt
===================================================================
--- src/test/org/apache/hadoop/mapred/test.txt	(revision 0)
+++ src/test/org/apache/hadoop/mapred/test.txt	(revision 0)
@@ -0,0 +1 @@
+This is a test file used for testing caching jars, zip and normal files.
Index: src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java	(revision 439052)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java	(working copy)
@@ -94,12 +94,14 @@
    * @param taskDirs the task ids that should be present
    */
   private static void checkTaskDirectories(MiniMRCluster mr,
+                                           String[] jobIds,
                                            String[] taskDirs) {
     mr.waitUntilIdle();
     int trackers = mr.getNumTaskTrackers();
     List neededDirs = new ArrayList(Arrays.asList(taskDirs));
     boolean[] found = new boolean[taskDirs.length];
     for(int i=0; i < trackers; ++i) {
+      int numNotDel = 0;
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
       File trackerDir = new File(localDir, "taskTracker");
       assertTrue("local dir " + localDir + " does not exist.", 
@@ -112,7 +114,7 @@
         System.out.println("Local " + localDir + ": " + contents[j]);
       }
       for(int j=0; j < trackerContents.length; ++j) {
-        System.out.println("Local " + trackerDir + ": " + trackerContents[j]);
+        System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
       }
       for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
         String name = contents[fileIdx];
@@ -122,13 +124,11 @@
                      localDir, idx != -1);
           assertTrue("Matching output directory not found " + name +
                      " in " + trackerDir, 
-                     new File(trackerDir, name).isDirectory());
+                     new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory());
           found[idx] = true;
+          numNotDel++;
         }  
       }
-      assertTrue("The local directory had " + contents.length + 
-                 " and task tracker directory had " + trackerContents.length +
-                 " items.", contents.length == trackerContents.length + 1);
     }
     for(int i=0; i< found.length; i++) {
       assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
@@ -154,7 +154,7 @@
                                                jobTrackerName, namenode);
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-          checkTaskDirectories(mr, new String[]{});
+          checkTaskDirectories(mr, new String[]{}, new String[]{});
           
           // Run a word count example
           JobConf jobConf = new JobConf();
@@ -167,7 +167,7 @@
                                    3, 1);
           assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                        "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
-          checkTaskDirectories(mr, new String[]{"task_0002_m_000001_0"});
+          checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
           
       } finally {
           if (fileSys != null) { fileSys.close(); }
Index: src/test/org/apache/hadoop/mapred/MRCaching.java
===================================================================
--- src/test/org/apache/hadoop/mapred/MRCaching.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/MRCaching.java	(revision 0)
@@ -0,0 +1,190 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.mapred.MapReduceBase;
+import java.io.*;
+
+public class MRCaching {
+  static String testStr = "This is a test file " + "used for testing caching "
+      + "jars, zip and normal files.";
+
+  /**
+   * Using the wordcount example and adding caching to it. The cache
+   * archives/files are set and then are checked in the map if they have been
+   * localized or not.
+   */
+  public static class MapClass extends MapReduceBase implements Mapper {
+    JobConf conf;
+
+    private final static IntWritable one = new IntWritable(1);
+
+    private Text word = new Text();
+
+    public void configure(JobConf jconf) {
+      conf = jconf;
+      try {
+        String[] localArchives = conf.getLocalCacheArchives();
+        String[] localFiles = conf.getLocalCacheFiles();
+        FileSystem fs = FileSystem.get(conf);
+        // read the cached files (unzipped, unjarred and text)
+        // and put it into a single file /tmp/test.txt
+        Path file = new Path("/tmp");
+        fs.mkdirs(file);
+        Path fileOut = new Path(file, "test.txt");
+        fs.delete(file);
+        DataOutputStream out = fs.create(fileOut);
+
+        for (int i = 0; i < localArchives.length; i++) {
+          // read out the files from these archives
+          File f = new File(localArchives[i]);
+          File txt = new File(f, "test.txt");
+          FileInputStream fin = new FileInputStream(txt);
+          DataInputStream din = new DataInputStream(fin);
+          String str = din.readLine();
+          din.close();
+          out.writeBytes(str);
+          out.writeBytes("\n");
+        }
+        for (int i = 0; i < localFiles.length; i++) {
+          // read out the files from these archives
+          File txt = new File(localFiles[i]);
+          FileInputStream fin = new FileInputStream(txt);
+          DataInputStream din = new DataInputStream(fin);
+          String str = din.readLine();
+          out.writeBytes(str);
+          out.writeBytes("\n");
+        }
+        out.close();
+      } catch (IOException ie) {
+        System.out.println(StringUtils.stringifyException(ie));
+      }
+    }
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector output, Reporter reporter) throws IOException {
+      String line = ((Text) value).toString();
+      StringTokenizer itr = new StringTokenizer(line);
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, one);
+      }
+
+    }
+  }
+
+  /**
+   * A reducer class that just emits the sum of the input values.
+   */
+  public static class ReduceClass extends MapReduceBase implements Reducer {
+
+    public void reduce(WritableComparable key, Iterator values,
+        OutputCollector output, Reporter reporter) throws IOException {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += ((IntWritable) values.next()).get();
+      }
+      output.collect(key, new IntWritable(sum));
+    }
+  }
+
+  public static boolean launchMRCache(String jobTracker, String indir,
+      String outdir, String fileSys, JobConf conf, String input)
+      throws IOException {
+    final Path inDir = new Path(indir);
+    final Path outDir = new Path(outdir);
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    fs.delete(outDir);
+    fs.mkdirs(inDir);
+
+    {
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+    conf.set("fs.default.name", fileSys);
+    conf.set("mapred.job.tracker", jobTracker);
+    conf.setJobName("cachetest");
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(MRCaching.MapClass.class);
+    conf.setCombinerClass(MRCaching.ReduceClass.class);
+    conf.setReducerClass(MRCaching.ReduceClass.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setSpeculativeExecution(false);
+    Path localPath = new Path("build/test/cache");
+    Path txtPath = new Path(localPath, new Path("test.txt"));
+    Path jarPath = new Path(localPath, new Path("test.jar"));
+    Path zipPath = new Path(localPath, new Path("test.zip"));
+    Path cacheTest = new Path("/tmp/cachedir");
+    fs.delete(cacheTest);
+    fs.mkdirs(cacheTest);
+    fs.copyFromLocalFile(txtPath, cacheTest);
+    fs.copyFromLocalFile(jarPath, cacheTest);
+    fs.copyFromLocalFile(zipPath, cacheTest);
+    // setting the cached archives to zip, jar and simple text files
+    conf.setCacheArchives("dfs://" + fileSys + "/tmp/cachedir/test.jar,"
+        + "dfs://" + fileSys + "/tmp/cachedir/test.zip");
+    conf.setCacheFiles("dfs://" + fileSys + "/tmp/cachedir/test.txt");
+    JobClient.runJob(conf);
+    int count = 0;
+    // after the job ran check to see if the the input from the localized cache
+    // match the real string. check if there are 3 instances or not.
+    Path result = new Path("/tmp/test.txt");
+    {
+      BufferedReader file = new BufferedReader(new InputStreamReader(fs
+          .open(result)));
+      String line = file.readLine();
+      while (line != null) {
+        if (!testStr.equals(line))
+          return false;
+        count++;
+        line = file.readLine();
+
+      }
+      file.close();
+    }
+    if (count != 3)
+      return false;
+
+    return true;
+
+  }
+}
Index: src/java/org/apache/hadoop/fs/FileUtil.java
===================================================================
--- src/java/org/apache/hadoop/fs/FileUtil.java	(revision 439052)
+++ src/java/org/apache/hadoop/fs/FileUtil.java	(working copy)
@@ -17,6 +17,9 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -230,5 +233,68 @@
     }
     return dst;
   }
-
+  
+  /**
+   * Takes an input dir and returns the du on that local directory. Very basic
+   * implementation.
+   * 
+   * @param dir
+   *          The input dir to get the disk space of this local dir
+   * @return The total disk space of the input local directory
+   */
+  public static long getDU(File dir) {
+    long size = 0;
+    if (!dir.exists())
+      return 0;
+    if (!dir.isDirectory()) {
+      return dir.length();
+    } else {
+      size = dir.length();
+      File[] allFiles = dir.listFiles();
+      for (int i = 0; i < allFiles.length; i++) {
+        size = size + getDU(allFiles[i]);
+      }
+      return size;
+    }
+  }
+    
+	/**
+   * Given a File input it will unzip the file in a the unzip directory
+   * passed as the second parameter
+   * @param inFile The zip file as input
+   * @param unzipDir The unzip directory where to unzip the zip file.
+   * @throws IOException
+   */
+  public static void unZip(File inFile, File unzipDir) throws IOException {
+    Enumeration entries;
+    ZipFile zipFile = new ZipFile(inFile);
+    ;
+    try {
+      entries = zipFile.entries();
+      while (entries.hasMoreElements()) {
+        ZipEntry entry = (ZipEntry) entries.nextElement();
+        if (!entry.isDirectory()) {
+          InputStream in = zipFile.getInputStream(entry);
+          try {
+            File file = new File(unzipDir, entry.getName());
+            file.getParentFile().mkdirs();
+            OutputStream out = new FileOutputStream(file);
+            try {
+              byte[] buffer = new byte[8192];
+              int i;
+              while ((i = in.read(buffer)) != -1) {
+                out.write(buffer, 0, i);
+              }
+            } finally {
+              out.close();
+            }
+          } finally {
+            in.close();
+          }
+        }
+      }
+    } finally {
+      zipFile.close();
+    }
+  }
 }
Index: src/java/org/apache/hadoop/mapred/DistributedCache.java
===================================================================
--- src/java/org/apache/hadoop/mapred/DistributedCache.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/DistributedCache.java	(revision 0)
@@ -0,0 +1,417 @@
+/* Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**************************************************
+ * The DistributedCache maintains all the caching
+ * information of cached archives and unarchives
+ * all the files as well and returns the path
+ *  
+ * @author Mahadev Konar
+ **************************************************/
+class DistributedCache {
+  //cacheID to cacheStatus mapping
+  private static TreeMap cachedArchives = new TreeMap();
+
+  //jobId to cacheID's mapping
+  private static TreeMap cachedJobs = new TreeMap();
+
+  private static final int CRC_BUFFER_SIZE = 64 * 1024;
+
+  private static final Log LOG = LogFactory
+      .getLog("org.apache.hadoop.mapred.DistributedCache");
+
+  /**
+   * Create local cache archives for archives for a given array of cache archives
+   * @param t task for which to localize the archives
+   * @param archives which are to be localized
+   * @param jconf  the JobConf for the job
+   * @param dfs the FileSystem mapred is running on
+   * @param isArchive if the cache is an archive or just a file
+   * @param md5s the md5's of the cache archives .crc files
+   * @return an array of paths where the localized caches exist. 
+   * In case of just file caches, a Path to the file is returned. 
+   * In case of archives, a Path to the directory where the files are
+   * unjarred/unzipped is returned.
+   * @throws IOException
+   */
+
+  static Path[] getLocalCaches(Task t, String[] tarchives, JobConf jconf,
+      FileSystem dfs, boolean isArchive, String[] md5s) throws IOException {
+    String[] archives = justPaths(tarchives);
+    Path cacheDir = jconf.getLocalPath(TaskTracker.getCacheSubdir());
+    synchronized (cachedJobs) {
+      if (!cachedJobs.containsKey(t.getJobId())) {
+        // doing localizing for this job for the first time
+        JobArchives jarchives = new JobArchives();
+        if (isArchive)
+          jarchives.archives = archives;
+        else
+          jarchives.files = archives;
+
+        cachedJobs.put(t.getJobId(), jarchives);
+        LOG.info("Localizing cache for job " + t.getJobId() + " "
+            + t.getTaskId());
+      } else {
+        JobArchives allArchives = (JobArchives) cachedJobs.get(t.getJobId());
+        if (isArchive)
+          allArchives.archives = archives;
+        else
+          allArchives.files = archives;
+
+      }
+      for (int i = 0; i < archives.length; i++) {
+        synchronized (cachedArchives) {
+          if (!cachedArchives.containsKey(archives[i])) {
+            // was never localized
+            CacheStatus lcacheStatus = new CacheStatus();
+            lcacheStatus.currentStatus = false;
+            lcacheStatus.refcount = 1;
+            lcacheStatus.localLoadPath = new Path(
+                                                  cacheDir,
+                                                  new Path(
+                                                           makeRelative(
+                                                                        archives[i],
+                                                                        dfs)));
+            cachedArchives.put(archives[i], lcacheStatus);
+          } else {
+            CacheStatus lcacheStatus = (CacheStatus) cachedArchives
+                .get(archives[i]);
+            synchronized (lcacheStatus) {
+              lcacheStatus.refcount++;
+            }
+          }
+        }
+      }
+
+    }
+
+    Path[] localizedPath = new Path[archives.length];
+    for (int i = 0; i < archives.length; i++) {
+      CacheStatus lcacheStatus = null;
+      synchronized (cachedArchives) {
+        lcacheStatus = (CacheStatus) cachedArchives.get(archives[i]);
+      }
+      synchronized (lcacheStatus) {
+        localizedPath[i] = localizeCache(t, lcacheStatus, archives[i], dfs,
+                                         jconf, isArchive, md5s[i]);
+        if (localizedPath[i] == null)
+          return null;
+      }
+    }
+    // try deleting stuff if you can
+    long size = FileUtil.getDU(new File(cacheDir.toString()));
+    // setting the cache size to a default of 1MB
+    long allowedSize = jconf.getLong("mapred.cache.size", 1048576L);
+    if (allowedSize < size) {
+      // try some cache deletions
+      deleteCache(jconf);
+    }
+    return localizedPath;
+  }
+
+  /**
+   * Gives the list of urls truncating x= from each of the url
+   * @param tarchives The comma seperated string of archives/files
+   * @return an array of the archives/files truncating "x=" from x=archive/filename
+   */
+  static String[] justPaths(String[] tarchives) {
+    String[] archives = new String[tarchives.length];
+    for (int i = 0; i < tarchives.length; i++) {
+      String[] split = tarchives[i].split("=");
+      if (split.length > 1) {
+        archives[i] = split[1];
+      } else
+        archives[i] = split[0];
+    }
+    return archives;
+  }
+
+  private static void deleteCache(JobConf jconf) throws IOException {
+    //try deleting cache Status with refcount of zero
+    synchronized (cachedArchives) {
+      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
+        String cacheId = (String) it.next();
+        CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+        if (lcacheStatus.refcount == 0) {
+          //delete this cache entry
+          jconf.deleteLocalFiles(lcacheStatus.localLoadPath.toString());
+          cachedArchives.remove(cacheId);
+        }
+      }
+    }
+  }
+
+  private static Path cacheFilePath(Path p) {
+    return new Path(p, p.getName());
+  }
+
+  private static Path localizeCache(Task t, CacheStatus cacheStatus,
+      String urlcacheId, FileSystem tdfs, JobConf jconf, boolean isArchive,
+      String md5) throws IOException {
+    boolean b = true;
+    URI srcURI = null;
+    try {
+      srcURI = new URI(urlcacheId);
+    } catch (URISyntaxException ex) {
+      throw new IOException("URL syntax error." + ex.toString());
+    }
+
+    FileSystem dfs;
+    String cacheId;
+    String fileSysName = getFileSysName(srcURI);
+    if (fileSysName == null) {
+      dfs = tdfs;
+      cacheId = urlcacheId;
+    } else {
+      dfs = FileSystem.getNamed(fileSysName, new Configuration());
+      cacheId = srcURI.getPath();
+    }
+
+    b = ifExistsAndFresh(t, cacheStatus, cacheId, dfs, md5);
+
+    if (b) {
+      if (isArchive)
+        return cacheStatus.localLoadPath;
+      else
+        return cacheFilePath(cacheStatus.localLoadPath);
+    } else {
+      //remove the old archive
+      // if the old archive cannot be removed since it is being used by another job
+      // return null
+      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+            + " is in use and cannot be refreshed");
+      byte[] checkSum = createMD5(cacheId, dfs);
+      FileSystem localFs = FileSystem.getNamed("local", jconf);
+      localFs.delete(cacheStatus.localLoadPath);
+      Path parchive = new Path(cacheStatus.localLoadPath,
+                               new Path(cacheStatus.localLoadPath.getName()));
+
+      localFs.mkdirs(cacheStatus.localLoadPath);
+      dfs.copyToLocalFile(new Path(cacheId), parchive);
+      dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive
+          .toString()
+          + "_md5"));
+      if (isArchive) {
+        String tmpArchive = parchive.toString().toLowerCase();
+        if (tmpArchive.endsWith(".jar")) {
+          RunJar.unJar(new File(parchive.toString()), new File(parchive
+              .getParent().toString()));
+        } else if (tmpArchive.endsWith(".zip")) {
+          FileUtil.unZip(new File(parchive.toString()), new File(parchive
+              .getParent().toString()));
+
+        }
+        // else will not do anyhting 
+        // and copy the file into the dir as it is
+      }
+      cacheStatus.currentStatus = true;
+      cacheStatus.md5 = checkSum;
+    }
+    if (isArchive)
+      return cacheStatus.localLoadPath;
+    else
+      return cacheFilePath(cacheStatus.localLoadPath);
+  }
+
+  private static boolean ifExistsAndFresh(Task t, CacheStatus lcacheStatus,
+      String cacheId, FileSystem dfs, String confMD5) throws IOException {
+    //compute the md5 of the crc
+    byte[] digest = null;
+    byte[] fsDigest = createMD5(cacheId, dfs);
+    byte[] confDigest = StringUtils.hexStringToByte(confMD5);
+    //check for existence of the cache
+    if (lcacheStatus.currentStatus == false) {
+      return false;
+    } else {
+      digest = lcacheStatus.md5;
+      if (!MessageDigest.isEqual(confDigest, fsDigest)) {
+        throw new IOException("Inconsistencty in data caching, "
+            + "Cache archives have been changed");
+      } else {
+        if (!MessageDigest.isEqual(confDigest, digest)) {
+          //needs refreshing
+          return false;
+        } else {
+          //does not need any refreshing
+          return true;
+        }
+      }
+    }
+  }
+
+  public static String getFileSysName(URI url) {
+    String fsname = url.getScheme();
+    if ("dfs".equals(fsname)) {
+      String host = url.getHost();
+      int port = url.getPort();
+      return (port == (-1)) ? host : (host + ":" + port);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Creates the md5 checksum of a files .crc in file_md5 in the same
+   * directory
+   * @param filename The filename to compute the checksum for
+   * @param fileSystem The filesystem which contains the file
+   */
+  static byte[] createMD5(String urlfilename, FileSystem frameworkFileSystem)
+      throws IOException {
+    byte[] b = new byte[CRC_BUFFER_SIZE];
+    byte[] digest = null;
+    URI srcURI = null;
+    try {
+      srcURI = new URI(urlfilename);
+    } catch (URISyntaxException ex) {
+      throw new IOException("URL syntax error." + ex.toString());
+    }
+
+    FileSystem fileSystem;
+    String filename;
+    String fileSysName = getFileSysName(srcURI);
+    if (fileSysName == null) {
+      fileSystem = frameworkFileSystem;
+      filename = urlfilename;
+    } else {
+      fileSystem = FileSystem.getNamed(fileSysName, new Configuration());
+      filename = srcURI.getPath();
+    }
+    Path filePath = new Path(filename);
+    Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR
+        + filePath.getName() + "_md5");
+    MessageDigest md5 = null;
+    try {
+      md5 = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException na) {
+      // do nothing
+    }
+    if (!fileSystem.exists(md5File)) {
+      FSInputStream fsStream = fileSystem.openRaw(FileSystem
+          .getChecksumFile(filePath));
+      int read = fsStream.read(b);
+      while (read != -1) {
+        md5.update(b, 0, read);
+        read = fsStream.read(b);
+      }
+      fsStream.close();
+      digest = md5.digest();
+
+      FSDataOutputStream out = fileSystem.create(md5File);
+      out.write(digest);
+      out.close();
+    } else {
+      FSInputStream fsStream = fileSystem.openRaw(md5File);
+      digest = new byte[md5.getDigestLength()];
+      // assuming reading 16 bytes once is not a problem
+      // though it should be checked if 16 bytes have been read or not
+      int read = fsStream.read(digest);
+    }
+
+    return digest;
+  }
+
+  /**
+   * Update the distributed cache when a job finishes
+   * @param jobId The jobid of the job that finished
+   * @param jconf The jobconf of the job that finished
+   */
+  static void jobFinished(String jobId) {
+    //is called by the task tracker whenever it sees that 
+    // a job is finished
+    synchronized (cachedJobs) {
+      JobArchives jarchives = (JobArchives) cachedJobs.get(jobId);
+      if (jarchives != null) {
+        String[] cacheIds = jarchives.archives;
+        for (int i = 0; i < cacheIds.length; i++) {
+          synchronized (cachedArchives) {
+            CacheStatus lcacheStatus = (CacheStatus) cachedArchives
+                .get(cacheIds[i]);
+            synchronized (lcacheStatus) {
+              if (lcacheStatus.refcount > 0)
+                lcacheStatus.refcount--;
+            }
+          }
+        }
+        String[] fcacheIds = jarchives.files;
+        for (int i = 0; i < fcacheIds.length; i++) {
+          synchronized (cachedArchives) {
+            CacheStatus lcacheStatus = (CacheStatus) cachedArchives
+                .get(fcacheIds[i]);
+            synchronized (lcacheStatus) {
+              if (lcacheStatus.refcount > 0)
+                lcacheStatus.refcount--;
+            }
+          }
+        }
+        cachedJobs.remove(jobId);
+      }
+    }
+
+  }
+
+  //make the absolute path relative so as to 
+  // create a dir similar to dfsdir in tasktracker
+  //cache dir
+  private static String makeRelative(String dfsDir, FileSystem dfs)
+      throws IOException {
+    URI srcURI = null;
+    try {
+      srcURI = new URI(dfsDir);
+    } catch (URISyntaxException ex) {
+      throw new IOException(ex.toString());
+    }
+    String fsname = srcURI.getScheme();
+    String path;
+    if ("dfs".equals(fsname)) {
+      path = srcURI.getHost() + srcURI.getPath();
+    } else {
+      String[] split = dfs.getName().split(":");
+      path = split[0] + dfsDir;
+    }
+    return path;
+  }
+
+  static class JobArchives {
+    String[] archives;
+
+    String[] files;
+  }
+
+  static class CacheStatus {
+    // false, not loaded yet, true is loaded
+    public boolean currentStatus;
+
+    public Path localLoadPath;
+
+    public int refcount;
+
+    public byte[] md5;
+  }
+}
Index: src/java/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskTracker.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -61,7 +61,6 @@
     Server taskReportServer = null;
     Server mapOutputServer = null;
     InterTrackerProtocol jobClient;
-
     StatusHttpServer server = null;
     
     boolean shuttingDown = false;
@@ -71,6 +70,7 @@
      * Map from taskId -> TaskInProgress.
      */
     TreeMap runningTasks = null;
+    Map runningJobs = null;
     int mapTotal = 0;
     int reduceTotal = 0;
     boolean justStarted = true;
@@ -89,11 +89,11 @@
 
     static Random r = new Random();
     FileSystem fs = null;
-    static final String SUBDIR = "taskTracker";
-
+    private static final String SUBDIR = "taskTracker";
+    private static final String CACHEDIR = "archive";
+    private static final String JOBCACHE = "jobcache";
     private JobConf fConf;
     private MapOutputFile mapOutputFile;
-
     private int maxCurrentTasks;
     private int failures;
     private int finishedCount[] = new int[1];
@@ -145,6 +145,14 @@
       taskCleanupThread.start();
     }
     
+    static String getCacheSubdir() {
+      return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+    }
+
+    static String getJobCacheSubdir() {
+      return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
+    }
+    
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
     }
@@ -163,6 +171,7 @@
         // Clear out state tables
         this.tasks = new TreeMap();
         this.runningTasks = new TreeMap();
+        this.runningJobs = new TreeMap();
         this.mapTotal = 0;
         this.reduceTotal = 0;
         this.acceptNewTasks = true;
@@ -203,13 +212,91 @@
         
         this.running = true;
     }
+        
+    // intialize the job directory
+    private void localizeJob(TaskInProgress tip) throws IOException {
+      Path localJarFile = null;
+      Task t = tip.getTask();
+      Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
+          .getJobId()
+          + Path.SEPARATOR + "job.xml"));
+      RunningJob rjob = null;
+      synchronized (runningJobs) {
+        if (!runningJobs.containsKey(t.getJobId())) {
+          rjob = new RunningJob();
+          rjob.localized = false;
+          rjob.tasks = new ArrayList();
+          rjob.jobFile = localJobFile;
+          rjob.tasks.add(tip);
+          runningJobs.put(t.getJobId(), rjob);
+        } else {
+          rjob = (RunningJob) runningJobs.get(t.getJobId());
+          // keep this for later use when we just get a jobid to delete
+          // the data for
+          rjob.tasks.add(tip);
+        }
+      }
+      synchronized (rjob) {
+        if (!rjob.localized) {
+          localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
+              .getJobId())
+              + Path.SEPARATOR + "job.jar");
+  
+          String jobFile = t.getJobFile();
+          fs.copyToLocalFile(new Path(jobFile), localJobFile);
+          JobConf localJobConf = new JobConf(localJobFile);
+          String jarFile = localJobConf.getJar();
+          if (jarFile != null) {
+            fs.copyToLocalFile(new Path(jarFile), localJarFile);
+            localJobConf.setJar(localJarFile.toString());
+            FileSystem localFs = FileSystem.getNamed("local", fConf);
+            OutputStream out = localFs.create(localJobFile);
+            try {
+              localJobConf.write(out);
+            } finally {
+              out.close();
+            }
 
-      public synchronized void shutdown() throws IOException {
+            // also unjar the job.jar files in workdir
+            File workDir = new File(
+                                    new File(localJobFile.toString()).getParent(),
+                                    "work");
+            workDir.mkdirs();
+            RunJar.unJar(new File(localJarFile.toString()), workDir);
+          }
+          rjob.localized = true;
+        }
+      }
+
+      launchTaskForJob(tip, new JobConf(rjob.jobFile));
+
+    }
+    
+    private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
+      synchronized (tip) {
+      try {
+        tip.setJobConf(jobConf);
+        tip.launchTask();
+      } catch (Throwable ie) {
+        tip.runstate = TaskStatus.FAILED;
+        try {
+          tip.cleanup();
+        } catch (Throwable ie2) {
+          // Ignore it, we are just trying to cleanup.
+        }
+        String error = StringUtils.stringifyException(ie);
+        tip.reportDiagnosticInfo(error);
+        LOG.info(error);
+      }
+      }
+    }
+    
+    public synchronized void shutdown() throws IOException {
           shuttingDown = true;
           close();
           if (this.server != null) {
             try {
-                LOG.info("Shttting down StatusHttpServer");
+                LOG.info("Shutting down StatusHttpServer");
                 this.server.stop();
             } catch (InterruptedException ex) {
                 ex.printStackTrace();
@@ -321,6 +408,12 @@
         } catch (InterruptedException ie) {}
       }
     }
+    /**Return the DFS filesystem
+     * @return
+     */
+    public FileSystem getFileSystem(){
+      return fs;
+    }
     
     /**
      * Main service loop.  Will stay in this loop forever.
@@ -457,6 +550,10 @@
               synchronized (this) {
                 for (int i = 0; i < toCloseIds.length; i++) {
                   Object tip = tasks.get(toCloseIds[i]);
+                  synchronized(runningJobs){
+                    runningJobs.remove(((TaskInProgress)
+                	 	  tasks.get(toCloseIds[i])).getTask().getJobId());
+                  }
                   if (tip != null) {
                     tasksToCleanup.put(tip);
                   } else {
@@ -560,8 +657,8 @@
 
       return true;
     }
-
-	/**
+    
+    /**
      * Start a new task.
      * All exceptions are handled locally, so that we don't mess up the
      * task tracker.
@@ -578,20 +675,10 @@
           reduceTotal++;
         }
       }
-      synchronized (tip) {
-        try {
-          tip.launchTask();
-        } catch (Throwable ie) {
-          tip.runstate = TaskStatus.FAILED;
-          try {
-            tip.cleanup();
-          } catch (Throwable ie2) {
-            // Ignore it, we are just trying to cleanup.
-          }
-          String error = StringUtils.stringifyException(ie);
-          tip.reportDiagnosticInfo(error);
-          LOG.info(error);
-        }
+      try{
+    	  localizeJob(tip);
+      }catch(IOException ie){
+    	  LOG.warn("Error initializing Job " + tip.getTask().getJobId());
       }
     }
     
@@ -700,6 +787,7 @@
         private JobConf localJobConf;
         private boolean keepFailedTaskFiles;
         private boolean alwaysKeepTaskFiles;
+        private boolean keepJobFiles;
 
         /**
          */
@@ -711,60 +799,52 @@
             this.lastProgressReport = System.currentTimeMillis();
             this.defaultJobConf = conf;
             localJobConf = null;
+            keepJobFiles = false;
         }
-
-        /**
-         * Some fields in the Task object need to be made machine-specific.
-         * So here, edit the Task's fields appropriately.
-         */
-        private void localizeTask(Task t) throws IOException {
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + 
-                                                 task.getTaskId());
-            Path localJobFile =
-              this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
-            Path localJarFile =
-              this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
-
-            String jobFile = t.getJobFile();
-            fs.copyToLocalFile(new Path(jobFile), localJobFile);
-            t.setJobFile(localJobFile.toString());
-
-            localJobConf = new JobConf(localJobFile);
-            localJobConf.set("mapred.local.dir",
-                    this.defaultJobConf.get("mapred.local.dir"));
-            String jarFile = localJobConf.getJar();
-            if (jarFile != null) {
-              fs.copyToLocalFile(new Path(jarFile), localJarFile);
-              localJobConf.setJar(localJarFile.toString());
-            }
-            task.localizeConfiguration(localJobConf);
-
-            FileSystem localFs = FileSystem.getNamed("local", fConf);
-            OutputStream out = localFs.create(localJobFile);
-            try {
-              localJobConf.write(out);
-            } finally {
-              out.close();
-            }
-            // set the task's configuration to the local job conf
-            // rather than the default.
-            t.setConf(localJobConf);
-            keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+        
+        private void localizeTask(Task task) throws IOException{
+            Path localTaskDir =
+              new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR
+                    + JOBCACHE + Path.SEPARATOR
+                    + task.getJobId()), task.getTaskId());
+           FileSystem localFs = FileSystem.getNamed("local", fConf);
+           localFs.mkdirs(localTaskDir);
+           Path localTaskFile = new Path(localTaskDir, "job.xml");
+           task.setJobFile(localTaskFile.toString());
+           localJobConf.set("mapred.local.dir",
+                    fConf.get("mapred.local.dir"));
+            
+           localJobConf.set("mapred.task.id", task.getTaskId());
+           keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+           task.localizeConfiguration(localJobConf);
+           OutputStream out = localFs.create(localTaskFile);
+           try {
+             localJobConf.write(out);
+           } finally {
+             out.close();
+           }
+            task.setConf(localJobConf);
             String keepPattern = localJobConf.getKeepTaskFilesPattern();
             if (keepPattern != null) {
-              alwaysKeepTaskFiles = 
+                keepJobFiles = true;
+                alwaysKeepTaskFiles = 
                 Pattern.matches(keepPattern, task.getTaskId());
             } else {
               alwaysKeepTaskFiles = false;
             }
         }
-
+        
         /**
          */
         public Task getTask() {
             return task;
         }
 
+        public void setJobConf(JobConf lconf){
+            this.localJobConf = lconf;
+            keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+        }
+        
         /**
          */
         public synchronized TaskStatus createStatus() {
@@ -885,11 +965,19 @@
          * finished.  If the task is still running, kill it (and clean up
          */
         public synchronized void jobHasFinished() throws IOException {
+        	 
             if (getRunState() == TaskStatus.RUNNING) {
                 killAndCleanup(false);
             } else {
                 cleanup();
             }
+            DistributedCache.jobFinished(task.getJobId());
+            if (keepJobFiles)
+              return;
+            // delete the job diretory for this task 
+            // since the job is done/failed
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                    JOBCACHE + Path.SEPARATOR +  task.getJobId());
         }
 
         /**
@@ -943,10 +1031,13 @@
                  }
                }
             }
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId);
-        }
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                    JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
+                    taskId);
+            }
     }
 
+    
     // ///////////////////////////////////////////////////////////////
     // TaskUmbilicalProtocol
     /////////////////////////////////////////////////////////////////
@@ -1044,6 +1135,16 @@
           LOG.warn("Unknown child with bad map output: "+taskid+". Ignored.");
         }
     }
+    
+    /**
+     *  The datastructure for initializing a job
+     */
+    static class RunningJob{
+      Path jobFile;
+      // keep this for later use
+      ArrayList tasks;
+      boolean localized;
+    }
 
     /** 
      * The main() for child processes. 
Index: src/java/org/apache/hadoop/mapred/JobConf.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobConf.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/JobConf.java	(working copy)
@@ -170,6 +170,7 @@
     String dirs = get("mapred.input.dir");
     set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir);
   }
+
   public Path[] getInputPaths() {
     String dirs = get("mapred.input.dir", "");
     ArrayList list = Collections.list(new StringTokenizer(dirs, ","));
@@ -197,7 +198,82 @@
   }
 
   /**
-   * Set whether the framework shoul keep the intermediate files for 
+   * Set the job archives
+   * @param archive the comma seperated archive directories to set the archives to
+   */
+  public void setCacheArchives(String archive) {
+    set("mapred.cache.archives", archive);
+  }
+
+  /**
+   * Set cache files
+   * @param files the files that are comma seperated
+   */
+  public void setCacheFiles(String files) {
+    set("mapred.cache.files", files);
+  }
+
+  /**
+   * Get cache archives
+   * @return The mapred cache archives requested by the job
+   */
+  public String[] getCacheArchives() throws IOException {
+    return getStrings("mapred.cache.archives");
+  }
+
+  /**
+   * Get cache files
+   * @return The mapred cache files requested by the job
+   */
+  public String[] getCacheFiles() throws IOException {
+    return getStrings("mapred.cache.files");
+  }
+
+  /**
+   * Get the local paths for the cached archives
+   * @return string array of local cached archives
+   */
+  public String[] getLocalCacheArchives() throws IOException {
+    return getStrings("mapred.cache.localArchives");
+  }
+
+  /**
+   * Get the local paths for the cached files
+   * @return string array of local cached files
+   */
+  public String[] getLocalCacheFiles() throws IOException {
+    return getStrings("mapred.cache.localFiles");
+  }
+
+  /**
+   * Get the local path for a named cache
+   * @param name the name of the cache you specified in the setcachearchives/files
+   * @return The localized path of the named cache file
+   * @throws IOException
+   */
+  public String getNamedCache(String name) throws IOException {
+    String[] localCacheFiles = getLocalCacheFiles();
+    String[] cacheFiles = getCacheFiles();
+    String[] localCacheArchives = getLocalCacheArchives();
+    String[] cacheArchives = getCacheArchives();
+    for (int i = 0; i < cacheFiles.length; i++) {
+      String[] split = cacheFiles[i].split("=");
+      if (split[0].equals(name)) {
+        return localCacheFiles[i];
+      }
+    }
+    for (int i = 0; i < cacheArchives.length; i++) {
+      String[] split = cacheArchives[i].split("=");
+      if (split[0].equals(name)) {
+        return localCacheArchives[i];
+      }
+    }
+    // no matches for this cache name
+    return null;
+  }
+  
+  /**
+   * Set whether the framework should keep the intermediate files for 
    * failed tasks.
    */
   public void setKeepFailedTaskFiles(boolean keep) {
Index: src/java/org/apache/hadoop/mapred/JobClient.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobClient.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/JobClient.java	(working copy)
@@ -227,7 +227,8 @@
         JobConf job = new JobConf(jobFile);
         return submitJob(job);
     }
-
+    
+   
     /**
      * Submit a job to the MR system
      */
@@ -244,11 +245,39 @@
         Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
         Path submitJobFile = new Path(submitJobDir, "job.xml");
         Path submitJarFile = new Path(submitJobDir, "job.jar");
-
+        FileSystem fs = getFs();
+        // try getting the md5 of the archives
+        String[] tarchives = job.getCacheArchives();
+        String[] tfiles = job.getCacheFiles();
+        if ((tarchives != null) || (tfiles != null)) {
+          // prepare these archives for md5 checksums
+          if (tarchives != null) {
+            String[] archives = DistributedCache.justPaths(tarchives);
+            String md5Archives = StringUtils.byteToHexString(DistributedCache
+                .createMD5(archives[0], fs));
+            for (int i = 1; i < archives.length; i++) {
+              md5Archives = md5Archives
+                  + ","
+                  + StringUtils.byteToHexString(DistributedCache
+                      .createMD5(archives[i], fs));
+            }
+            job.set("mapred.cache.archivemd5", md5Archives);
+          }
+          if (tfiles != null) {
+            String[] files = DistributedCache.justPaths(tfiles);
+            String md5Files = StringUtils.byteToHexString(DistributedCache
+                .createMD5(files[0], fs));
+            for (int i = 1; i < files.length; i++) {
+              md5Files = md5Files
+                  + ","
+                  + StringUtils.byteToHexString(DistributedCache
+                      .createMD5(files[i], fs));
+            }
+            job.set("mapred.cache.filemd5", md5Files);
+          }
+        }
+       
         String originalJarPath = job.getJar();
-
-        FileSystem fs = getFs();
-
         short replication = (short)job.getInt("mapred.submit.replication", 10);
 
         if (originalJarPath != null) {           // copy jar to JobTracker's fs
Index: src/java/org/apache/hadoop/mapred/TaskRunner.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskRunner.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/TaskRunner.java	(working copy)
@@ -61,25 +61,65 @@
   */
   public void close() throws IOException {}
 
+  private String stringifyPathArray(Path[] p){
+	  if (p == null){
+      return null;
+    }
+    String str = p[0].toString();
+    for (int i = 1; i < p.length; i++){
+      str = str + "," + p[i].toString();
+    }
+    return str;
+  }
+  
   public final void run() {
     try {
-
+      
+      //before preparing the job localize 
+      //all the archives
+      
+      String[] archives = conf.getCacheArchives();
+      String[] files = conf.getCacheFiles();
+      if ((archives != null) || (files != null)) {
+        if (archives != null) {
+          String[] md5 = conf.getStrings("mapred.cache.archivemd5");
+          Path[] p = DistributedCache.getLocalCaches(t, archives, conf, tracker
+              .getFileSystem(), true, md5);
+          conf.set("mapred.cache.localArchives", stringifyPathArray(p));
+        }
+        if ((files != null)) {
+          String[] md5 = conf.getStrings("mapred.cache.filemd5");
+          Path[] p = DistributedCache.getLocalCaches(t, files, conf, tracker
+              .getFileSystem(), false, md5);
+          conf.set("mapred.cache.localFiles", stringifyPathArray(p));
+        }
+        // sets the paths to local archives and paths
+        Path localTaskFile = new Path(t.getJobFile());
+        FileSystem localFs = FileSystem.getNamed("local", conf);
+        localFs.delete(localTaskFile);
+        OutputStream out = localFs.create(localTaskFile);
+        try {
+          conf.write(out);
+        } finally {
+          out.close();
+        }
+      }
+      
       if (! prepare()) {
         return;
       }
 
       String sep = System.getProperty("path.separator");
-      File workDir = new File(new File(t.getJobFile()).getParent(), "work");
-      workDir.mkdirs();
-               
       StringBuffer classPath = new StringBuffer();
       // start with same classpath as parent process
       classPath.append(System.getProperty("java.class.path"));
       classPath.append(sep);
-
+      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
+      workDir.mkdirs();
+	  
       String jar = conf.getJar();
-      if (jar != null) {                      // if jar exists, it into workDir
-        RunJar.unJar(new File(jar), workDir);
+      if (jar != null) {       
+    	  // if jar exists, it into workDir
         File[] libs = new File(workDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
@@ -164,6 +204,7 @@
     }
   }
 
+  
   /**
    * Handle deprecated mapred.child.heap.size.
    * If present, interpolate into mapred.child.java.opts value with
Index: src/java/org/apache/hadoop/util/StringUtils.java
===================================================================
--- src/java/org/apache/hadoop/util/StringUtils.java	(revision 439052)
+++ src/java/org/apache/hadoop/util/StringUtils.java	(working copy)
@@ -79,4 +79,34 @@
     }
     return numFormat.format(result) + suffix;
   }
-}
+  
+  /**
+   * Given an array of bytes it will convert the bytes to a hex string
+   * representation of the bytes
+   * @param bytes
+   * @return hex string representation of the byte array
+   */
+  public static String byteToHexString(byte bytes[]) {
+    StringBuffer retString = new StringBuffer();
+    for (int i = 0; i < bytes.length; ++i) {
+      retString.append(Integer.toHexString(0x0100 + (bytes[i] & 0x00FF))
+          .substring(1));
+    }
+    return retString.toString();
+  }
+
+  /**
+   * Given a hexstring this will return the byte array corresponding to the
+   * string
+   * @param hex the hex String array
+   * @return a byte array that is a hex string representation of the given
+   *         string. The size of the byte array is therefore hex.length/2
+   */
+  public static byte[] hexStringToByte(String hex) {
+    byte[] bts = new byte[hex.length() / 2];
+    for (int i = 0; i < bts.length; i++) {
+      bts[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
+    }
+    return bts;
+  }
+}
\ No newline at end of file
Index: build.xml
===================================================================
--- build.xml	(revision 439052)
+++ build.xml	(working copy)
@@ -35,6 +35,7 @@
   <property name="test.src.dir" value="${basedir}/src/test"/>
   <property name="test.build.dir" value="${build.dir}/test"/>
   <property name="test.build.data" value="${test.build.dir}/data"/>
+  <property name="test.cache.data" value="${test.build.dir}/cache"/>
   <property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
@@ -273,6 +274,11 @@
                       value="org/apache/hadoop/test/AllTestDriver"/>
          </manifest>
     </jar>
+    <delete dir="${test.cache.data}"/>
+    <mkdir dir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.txt" todir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.jar" todir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>    
   </target>
 
   <!-- ================================================================== -->
@@ -284,7 +290,6 @@
     <mkdir dir="${test.build.data}"/>
     <delete dir="${hadoop.log.dir}"/>
     <mkdir dir="${hadoop.log.dir}"/>
-
     <junit printsummary="yes" haltonfailure="no" fork="yes" dir="${basedir}"
       errorProperty="tests.failed" failureProperty="tests.failed">
       <sysproperty key="test.build.data" value="${test.build.data}"/>
