Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml	(revision 439052)
+++ conf/hadoop-default.xml	(working copy)
@@ -239,6 +239,14 @@
 </property>
 
 <property>
+  <name>mapred.cache.size</name>
+  <value>10737418240</value>
+  <description>The limit on the size of cache you want to keep, set by default
+  to 10GB. This will act as a soft limit on the cache directory for out of band data.
+  </description>
+</property>
+            
+<property>
   <name>mapred.system.dir</name>
   <value>${hadoop.tmp.dir}/mapred/system</value>
   <description>The shared directory where MapReduce stores control files.
Index: src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(revision 439052)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(working copy)
@@ -36,6 +36,14 @@
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+           JobConf jconf = new JobConf();
+          // run the wordcount example with caching
+          boolean ret = MRCaching.launchMRCache("localhost:60030", "/tmp/wc/input", "/tmp/wc/output", "local", jconf,
+                                                "The quick brown fox\nhas many silly\n" +
+                                                "red fox sox\n");
+                    //assert the number of lines read during caching
+          assertTrue("Failed test archives not matching", ret);
+                    
       } finally {
           if (mr != null) { mr.shutdown(); }
       }
Index: src/test/org/apache/hadoop/mapred/test.zip
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: src/test/org/apache/hadoop/mapred/test.zip
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Index: src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java	(revision 0)
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+
+/**
+ * A JUnit test to test caching with DFS 
+ *
+ * @author Mahadev Konar
+ */
+public class TestMiniMRDFSCaching extends TestCase {
+
+  public void testWithDFS() throws IOException {
+      MiniMRCluster mr = null;
+      MiniDFSCluster dfs = null;
+      String namenode = null;
+      FileSystem fileSys = null;
+      try{
+	  Configuration conf = new Configuration();
+	  dfs = new MiniDFSCluster(65314, conf, true);
+	  fileSys = dfs.getFileSystem();
+	  namenode = fileSys.getName();
+	  mr = new MiniMRCluster(50050, 50060, 2, namenode, true);
+          JobConf jconf = new JobConf();
+          // run the wordcount example with caching
+	  boolean ret = MRCaching.launchMRCache("localhost:50050", "/testing/wc/input", "/testing/wc/output", 
+                                  namenode, jconf, 
+				  "The quick brown fox\nhas many silly\n" +
+				  "red fox sox\n"); 
+	  assertTrue("Archives not matching", ret);  
+      } finally{
+	  if (fileSys != null) {fileSys.close();}
+	  if (dfs != null) {dfs.shutdown();}
+	  if (mr != null) {mr.shutdown();}
+      }
+  }
+    
+    public static void main(String[] argv) throws Exception {
+	TestMiniMRDFSCaching td = new TestMiniMRDFSCaching();
+	td.testWithDFS();
+    }
+}
+  
Index: src/test/org/apache/hadoop/mapred/test.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: src/test/org/apache/hadoop/mapred/test.jar
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Index: src/test/org/apache/hadoop/mapred/test.txt
===================================================================
--- src/test/org/apache/hadoop/mapred/test.txt	(revision 0)
+++ src/test/org/apache/hadoop/mapred/test.txt	(revision 0)
@@ -0,0 +1 @@
+This is a test file used for testing caching jars, zip and normal files.
Index: src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java	(revision 439052)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java	(working copy)
@@ -94,12 +94,14 @@
    * @param taskDirs the task ids that should be present
    */
   private static void checkTaskDirectories(MiniMRCluster mr,
+                                           String[] jobIds,
                                            String[] taskDirs) {
     mr.waitUntilIdle();
     int trackers = mr.getNumTaskTrackers();
     List neededDirs = new ArrayList(Arrays.asList(taskDirs));
     boolean[] found = new boolean[taskDirs.length];
     for(int i=0; i < trackers; ++i) {
+      int numNotDel = 0;
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
       File trackerDir = new File(localDir, "taskTracker");
       assertTrue("local dir " + localDir + " does not exist.", 
@@ -112,7 +114,7 @@
         System.out.println("Local " + localDir + ": " + contents[j]);
       }
       for(int j=0; j < trackerContents.length; ++j) {
-        System.out.println("Local " + trackerDir + ": " + trackerContents[j]);
+        System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
       }
       for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
         String name = contents[fileIdx];
@@ -122,13 +124,11 @@
                      localDir, idx != -1);
           assertTrue("Matching output directory not found " + name +
                      " in " + trackerDir, 
-                     new File(trackerDir, name).isDirectory());
+                     new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory());
           found[idx] = true;
+          numNotDel++;
         }  
       }
-      assertTrue("The local directory had " + contents.length + 
-                 " and task tracker directory had " + trackerContents.length +
-                 " items.", contents.length == trackerContents.length + 1);
     }
     for(int i=0; i< found.length; i++) {
       assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
@@ -154,7 +154,7 @@
                                                jobTrackerName, namenode);
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-          checkTaskDirectories(mr, new String[]{});
+          checkTaskDirectories(mr, new String[]{}, new String[]{});
           
           // Run a word count example
           JobConf jobConf = new JobConf();
@@ -167,7 +167,7 @@
                                    3, 1);
           assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                        "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
-          checkTaskDirectories(mr, new String[]{"task_0002_m_000001_0"});
+          checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
           
       } finally {
           if (fileSys != null) { fileSys.close(); }
Index: src/test/org/apache/hadoop/mapred/MRCaching.java
===================================================================
--- src/test/org/apache/hadoop/mapred/MRCaching.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/MRCaching.java	(revision 0)
@@ -0,0 +1,193 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.mapred.MapReduceBase;
+import java.io.*;
+
+public class MRCaching {
+    static String testStr = "This is a test file " +
+	"used for testing caching " +
+	"jars, zip and normal files.";
+  /**
+   * Using the wordcount example and adding caching to it. The cache archives/files 
+   * are set and then are checked in the map if they have been localized or not.
+   */
+  public static class MapClass extends MapReduceBase implements Mapper {
+    JobConf conf;
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+    // Copies the first line of every localized cache archive/file into /tmp/test.txt for later checking.
+    public void configure(JobConf jconf){
+	conf = jconf;
+	try{
+	    String[] localArchives = conf.getLocalCacheArchives();
+	    String[] localFiles = conf.getLocalCacheFiles();
+	    FileSystem fs = FileSystem.get(conf);
+            // read the cached files (unzipped, unjarred and text)
+            // and put it into a single file /tmp/test.txt
+	    Path file = new Path("/tmp");
+	    fs.mkdirs(file);
+	    Path fileOut = new Path(file, "test.txt");
+	    fs.delete(fileOut);   // remove only the stale result file, never the whole /tmp directory
+	    DataOutputStream out = fs.create(fileOut);
+	    for(int i = 0; i < localArchives.length; i++){
+		//read out the files from these archives
+		File f = new File(localArchives[i]);
+		File txt = new File(f, "test.txt");
+		FileInputStream fin = new FileInputStream(txt);
+		DataInputStream din = new DataInputStream(fin);
+		String str = din.readLine();
+		din.close();
+		out.writeBytes(str);
+		out.writeBytes("\n");
+	    }
+	    for(int i = 0; i < localFiles.length; i++){
+		//read the first line of each plain (non-archive) cached file
+		File txt = new File(localFiles[i]);
+		DataInputStream din = new DataInputStream(new FileInputStream(txt));
+		String str = din.readLine();
+		din.close();
+		out.writeBytes(str);
+		out.writeBytes("\n");
+	    }
+	    out.close();
+	}catch(IOException ie){
+	    System.out.println(StringUtils.stringifyException(ie));
+	}
+    }
+
+    public void map(WritableComparable key, Writable value, 
+        OutputCollector output, 
+        Reporter reporter) throws IOException {
+      String line = ((Text)value).toString();
+      StringTokenizer itr = new StringTokenizer(line);
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, one);
+      }
+
+            
+    }
+  }
+  
+  /**
+   * A reducer class that just emits the sum of the input values.
+   */
+  public static class ReduceClass extends MapReduceBase implements Reducer {
+    
+    public void reduce(WritableComparable key, Iterator values,
+        OutputCollector output, 
+        Reporter reporter) throws IOException {
+      int sum = 0; 
+      while (values.hasNext()) {
+        sum += ((IntWritable) values.next()).get();
+      }
+      output.collect(key, new IntWritable(sum));
+    }
+  }
+  
+
+    public static boolean launchMRCache(String jobTracker,
+				     String indir,
+				     String outdir,
+                                     String fileSys,
+				     JobConf conf,
+				     String input) throws IOException {
+    final Path inDir = new Path(indir);
+    final Path outDir = new Path(outdir);
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    fs.delete(outDir);
+    fs.mkdirs(inDir);
+    
+    {
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+    conf.set("fs.default.name", fileSys);
+    conf.set("mapred.job.tracker", jobTracker);
+    conf.setJobName("cachetest");
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(MRCaching.MapClass.class);
+    conf.setCombinerClass(MRCaching.ReduceClass.class);
+    conf.setReducerClass(MRCaching.ReduceClass.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setSpeculativeExecution(false);
+    Path localPath = new Path("build/test/cache");
+    Path txtPath = new Path(localPath, new Path("test.txt"));
+    Path jarPath = new Path(localPath, new Path("test.jar"));
+    Path zipPath = new Path(localPath, new Path("test.zip"));
+    Path cacheTest = new Path("/tmp/cachedir");
+    fs.delete(cacheTest);
+    fs.mkdirs(cacheTest);
+    fs.copyFromLocalFile(txtPath, cacheTest);
+    fs.copyFromLocalFile(jarPath, cacheTest);
+    fs.copyFromLocalFile(zipPath, cacheTest);
+    //setting the cached archives to zip, jar and simple text files
+    conf.setCacheArchives("dfs://" + fileSys + "/tmp/cachedir/test.jar," + "dfs://" + fileSys + "/tmp/cachedir/test.zip");
+    conf.setCacheFiles("dfs://" + fileSys + "/tmp/cachedir/test.txt");
+    JobClient.runJob(conf);
+    int count = 0;
+    // after the job ran check to see if the the input from the localized cache
+    // match the real string. check if there are 3 instances or not.
+    Path result = new Path("/tmp/test.txt");
+    {
+	BufferedReader file =
+	    new BufferedReader(new InputStreamReader(fs.open(result)));
+        String line = file.readLine();
+	while (line != null) {
+	    if (!testStr.equals(line))
+		return false;
+	    count++;
+	    line = file.readLine();
+	     
+        }
+        file.close();
+    }
+    if (count != 3)
+	return false;
+    
+    return true;
+    
+    }
+}
+
Index: src/java/org/apache/hadoop/fs/FileUtil.java
===================================================================
--- src/java/org/apache/hadoop/fs/FileUtil.java	(revision 439052)
+++ src/java/org/apache/hadoop/fs/FileUtil.java	(working copy)
@@ -17,6 +17,9 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -230,5 +233,65 @@
     }
     return dst;
   }
-
+  
+  /**
+	 * Takes an input dir and returns the du on that local directory.
+	 * Very basic implementation.
+	 * @param dir The input dir to get the disk space of this local dir
+	 * @return The total disk space of the input local directory
+	 */
+	public static long getDU(File dir) {
+		long size = 0;
+		if (!dir.exists())
+			return 0;
+		if (!dir.isDirectory()) {
+			return dir.length();
+		} else {
+			size = dir.length();
+			File[] allFiles = dir.listFiles();
+			for (int i = 0; i < allFiles.length; i++) {
+				size = size + getDU(allFiles[i]);
+			}
+			return size;
+		}
+	}
+    
+	/**
+	 * Given a File input it will unzip the file in a the unzip directory
+	 * passed as the second parameter
+	 * @param inFile The zip file as input
+	 * @param unzipDir The unzip directory where to unzip the zip file.
+	 * @throws IOException
+	 */  
+	public static void unZip(File inFile, File unzipDir) throws IOException {
+	    Enumeration entries;
+	    ZipFile zipFile = new ZipFile(inFile);;
+	    try{
+	      entries = zipFile.entries();
+	      while(entries.hasMoreElements()) {
+	        ZipEntry entry = (ZipEntry)entries.nextElement();
+	        if (!entry.isDirectory()) {
+	            InputStream in = zipFile.getInputStream(entry);
+	            try {
+	              File file = new File(unzipDir, entry.getName());
+	              file.getParentFile().mkdirs();
+	              OutputStream out = new FileOutputStream(file);
+	              try {
+	                byte[] buffer = new byte[8192];
+	                int i;
+	                while ((i = in.read(buffer)) != -1) {
+	                  out.write(buffer, 0, i);
+	                }
+	              } finally {
+	                out.close();
+	              }
+	            } finally {
+	              in.close();
+	            }
+	        }
+	      }
+	    }finally{
+	        zipFile.close();
+	    }
+	}
 }
Index: src/java/org/apache/hadoop/mapred/DistributedCache.java
===================================================================
--- src/java/org/apache/hadoop/mapred/DistributedCache.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/DistributedCache.java	(revision 0)
@@ -0,0 +1,413 @@
+/* Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**************************************************
+ * The DistributedCache maintains all the caching
+ * information of cached archives and unarchives
+ * all the files as well and returns the path
+ *  
+ * @author Mahadev Konar
+ **************************************************/
+class DistributedCache {
+	//cacheID to cacheStatus mapping
+	private static TreeMap cachedArchives = new TreeMap();
+
+	//jobId to cacheID's mapping
+	private static TreeMap cachedJobs = new TreeMap();
+
+	private static final int CRC_BUFFER_SIZE = 64 * 1024;
+
+	private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.DistributedCache");
+
+	/**
+	 * Create local cache archives for archives for a given array of cache archives
+	 * @param t task for which to localize the archives
+	 * @param archives which are to be localized
+	 * @param jconf  the JobConf for the job
+	 * @param dfs the FileSystem mapred is running on
+	 * @param isArchive if the cache is an archive or just a file
+	 * @param md5s the md5's of the cache archives .crc files
+	 * @return an array of paths where the localized caches exist. 
+	 * In case of just file caches, a Path to the file is returned. 
+	 * In case of archives, a Path to the directory where the files are
+	 * unjarred/unzipped is returned.
+	 * @throws IOException
+	 */
+
+	static Path[] getLocalCaches(Task t, String[] tarchives,
+			JobConf jconf, FileSystem dfs, boolean isArchive, String[] md5s)
+			throws IOException {
+		String[] archives = justPaths(tarchives);
+		Path cacheDir = jconf.getLocalPath(TaskTracker.getCacheSubdir());
+		synchronized (cachedJobs) {
+			if (!cachedJobs.containsKey(t.getJobId())) {
+				// doing localizing for this job for the first time
+				JobArchives jarchives = new JobArchives();
+				if (isArchive)
+					jarchives.archives = archives;
+				else
+					jarchives.files = archives;
+
+				cachedJobs.put(t.getJobId(), jarchives);
+				LOG.info("Localizing cache for job " + t.getJobId() + " "
+						+ t.getTaskId());
+			} else {
+				JobArchives allArchives = (JobArchives) cachedJobs.get(t
+						.getJobId());
+				if (isArchive)
+					allArchives.archives = archives;
+				else
+					allArchives.files = archives;
+
+			}
+			for (int i = 0; i < archives.length; i++) {
+				synchronized (cachedArchives) {
+					if (!cachedArchives.containsKey(archives[i])) {
+						// was never localized
+						CacheStatus lcacheStatus = new CacheStatus();
+						lcacheStatus.currentStatus = false;
+						lcacheStatus.refcount = 1;
+						lcacheStatus.localLoadPath = new Path(cacheDir,
+								new Path(makeRelative(archives[i], dfs)));
+						cachedArchives.put(archives[i], lcacheStatus);
+					} else {
+						CacheStatus lcacheStatus = (CacheStatus) cachedArchives
+								.get(archives[i]);
+						synchronized (lcacheStatus) {
+							lcacheStatus.refcount++;
+						}
+					}
+				}
+			}
+
+		}
+
+		Path[] localizedPath = new Path[archives.length];
+		for (int i = 0; i < archives.length; i++) {
+			CacheStatus lcacheStatus = null;
+			synchronized (cachedArchives) {
+				lcacheStatus = (CacheStatus) cachedArchives
+						.get(archives[i]);
+			}
+			synchronized (lcacheStatus) {
+				localizedPath[i] = localizeCache(t, lcacheStatus,
+						archives[i], dfs, jconf, isArchive, md5s[i]);
+				if (localizedPath[i] == null)
+					return null;
+			}
+		}
+		// try deleting stuff if you can
+		long size = FileUtil.getDU(new File(cacheDir.toString()));
+		// setting the cache size to a default of 1MB
+		long allowedSize = jconf.getLong("mapred.cache.size", 1048576L);
+		if (allowedSize < size) {
+			// try some cache deletions
+			deleteCache(jconf);
+		}
+		return localizedPath;
+	}
+
+	/**
+	 * Gives the list of urls truncating x= from each of the url
+	 * @param tarchives The comma seperated string of archives/files
+	 * @return an array of the archives/files truncating "x=" from x=archive/filename
+	 */
+	static String[] justPaths(String[] tarchives) {
+		String[] archives = new String[tarchives.length];
+		for (int i = 0; i < tarchives.length; i++) {
+			String[] split = tarchives[i].split("=");
+			if (split.length > 1) {
+				archives[i] = split[1];
+			} else
+				archives[i] = split[0];
+		}
+		return archives;
+	}
+
+	private static void deleteCache(JobConf jconf) throws IOException {
+		// evict every cache entry whose refcount is zero, deleting its local files
+		synchronized (cachedArchives) {
+			for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
+				String cacheId = (String) it.next();
+				CacheStatus lcacheStatus = (CacheStatus) cachedArchives
+						.get(cacheId);
+				if (lcacheStatus.refcount == 0) {
+					jconf.deleteLocalFiles(lcacheStatus.localLoadPath
+							.toString());
+					// remove via the iterator: map.remove() mid-iteration throws ConcurrentModificationException
+					it.remove();
+				}
+			}
+		}
+	}
+
+	private static Path cacheFilePath(Path p) {
+		return new Path(p, p.getName());
+	}
+
+	private static Path localizeCache(Task t, CacheStatus cacheStatus,
+			String urlcacheId, FileSystem tdfs, JobConf jconf,
+			boolean isArchive, String md5) throws IOException {
+		boolean b = true;
+		URI srcURI = null;
+		try {
+			srcURI = new URI(urlcacheId);
+		} catch (URISyntaxException ex) {
+			throw new IOException("URL syntax error." + ex.toString());
+		}
+
+		FileSystem dfs;
+		String cacheId;
+		String fileSysName = getFileSysName(srcURI);
+		if (fileSysName == null) {
+			dfs = tdfs;
+			cacheId = urlcacheId;
+		} else {
+			dfs = FileSystem.getNamed(fileSysName, new Configuration());
+			cacheId = srcURI.getPath();
+		}
+
+		b = ifExistsAndFresh(t, cacheStatus, cacheId, dfs, md5);
+
+		if (b) {
+			if (isArchive)
+				return cacheStatus.localLoadPath;
+			else
+				return cacheFilePath(cacheStatus.localLoadPath);
+		} else {
+			//remove the old archive
+			// if the old archive cannot be removed since it is being used by another job
+			// return null
+			if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+				throw new IOException("Cache "
+						+ cacheStatus.localLoadPath.toString()
+						+ " is in use and cannot be refreshed");
+			byte[] checkSum = createMD5(cacheId, dfs);
+			FileSystem localFs = FileSystem.getNamed("local", jconf);
+			localFs.delete(cacheStatus.localLoadPath);
+			Path parchive = new Path(cacheStatus.localLoadPath, new Path(
+					cacheStatus.localLoadPath.getName()));
+
+			localFs.mkdirs(cacheStatus.localLoadPath);
+			dfs.copyToLocalFile(new Path(cacheId), parchive);
+			dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive
+					.toString()
+					+ "_md5"));
+			if (isArchive) {
+				String tmpArchive = parchive.toString().toLowerCase();
+				if (tmpArchive.endsWith(".jar")) {
+					RunJar.unJar(new File(parchive.toString()), new File(
+							parchive.getParent().toString()));
+				} else if (tmpArchive.endsWith(".zip")) {
+					FileUtil.unZip(new File(parchive.toString()), new File(
+							parchive.getParent().toString()));
+
+				}
+				// else will not do anyhting 
+				// and copy the file into the dir as it is
+			}
+			cacheStatus.currentStatus = true;
+			cacheStatus.md5 = checkSum;
+		}
+		if (isArchive)
+			return cacheStatus.localLoadPath;
+		else
+			return cacheFilePath(cacheStatus.localLoadPath);
+	}
+
+	private static boolean ifExistsAndFresh(Task t, CacheStatus lcacheStatus,
+			String cacheId, FileSystem dfs, String confMD5) throws IOException {
+		//compute the md5 of the crc
+		byte[] digest = null;
+		byte[] fsDigest = createMD5(cacheId, dfs);
+		byte[] confDigest = StringUtils.hexStringToByte(confMD5);
+		//check for existence of the cache
+		if (lcacheStatus.currentStatus == false) {
+			return false;
+		} else {
+			digest = lcacheStatus.md5;
+			if (!MessageDigest.isEqual(confDigest, fsDigest)) {
+				throw new IOException("Inconsistencty in data caching, "
+						+ "Cache archives have been changed");
+			} else {
+				if (!MessageDigest.isEqual(confDigest, digest)) {
+					//needs refreshing
+					return false;
+				} else {
+					//does not need any refreshing
+					return true;
+				}
+			}
+		}
+	}
+   
+	public static String getFileSysName(URI url) {
+		String fsname = url.getScheme();
+		if ("dfs".equals(fsname)) {
+			String host = url.getHost();
+			int port = url.getPort();
+			return (port == (-1)) ? host : (host + ":" + port);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Returns the md5 of a file's .crc, read from file_md5 in the same
+	 * directory when that exists, else computed and stored there.
+	 * @param urlfilename The file (path or dfs:// url) to checksum
+	 * @param frameworkFileSystem The filesystem used for non-dfs:// names
+	 */
+	static byte[] createMD5(String urlfilename,
+			FileSystem frameworkFileSystem) throws IOException {
+		byte[] b = new byte[CRC_BUFFER_SIZE];
+		byte[] digest = null;
+		URI srcURI = null;
+		try {
+			srcURI = new URI(urlfilename);
+		} catch (URISyntaxException ex) {
+			throw new IOException("URL syntax error." +  ex.toString());
+		}
+
+		FileSystem fileSystem;
+		String filename;
+		String fileSysName = getFileSysName(srcURI);
+		if (fileSysName == null) {
+			fileSystem = frameworkFileSystem;
+			filename = urlfilename;
+		} else {
+			fileSystem = FileSystem.getNamed(fileSysName, new Configuration());
+			filename = srcURI.getPath();
+		}
+		Path filePath = new Path(filename);
+		Path md5File = new Path(filePath.getParent().toString()
+				+ Path.SEPARATOR + filePath.getName() + "_md5");
+		MessageDigest md5 = null;
+		try {
+			md5 = MessageDigest.getInstance("MD5");
+		} catch (NoSuchAlgorithmException na) {
+			throw new IOException("MD5 digest not available." + na.toString());
+		}
+		if (!fileSystem.exists(md5File)) {
+			FSInputStream fsStream = fileSystem.openRaw(FileSystem
+					.getChecksumFile(filePath));
+			int read = fsStream.read(b);
+			while (read != -1) {
+				md5.update(b, 0, read);
+				read = fsStream.read(b);
+			}
+			fsStream.close();
+			digest = md5.digest();
+
+			FSDataOutputStream out = fileSystem.create(md5File);
+			out.write(digest);
+			out.close();
+		} else {
+			FSInputStream fsStream = fileSystem.openRaw(md5File);
+			digest = new byte[md5.getDigestLength()];
+			// assumes a single read returns the whole digest (not verified here)
+			fsStream.read(digest);
+			fsStream.close();
+		}
+
+		return digest;
+	}
+
+	/**
+	 * Update the distributed cache when a job finishes: drops the job's
+	 * reference on each archive and file it localized and forgets the job.
+	 * Unknown job ids and ids without a cache entry are ignored.
+	 * @param jobId The jobid of the job that finished
+	 */
+	static void jobFinished(String jobId) {
+		//is called by the task tracker whenever it sees that 
+		// a job is finished
+		synchronized (cachedJobs) {
+			JobArchives jarchives = (JobArchives) cachedJobs.get(jobId);
+			if (jarchives != null) {
+				// a job that registered only archives (or only files) leaves
+				// the other array null, so each is checked before use
+				String[][] idSets = { jarchives.archives, jarchives.files };
+				for (int s = 0; s < idSets.length; s++) {
+					String[] cacheIds = idSets[s];
+					if (cacheIds == null)
+						continue;
+					for (int i = 0; i < cacheIds.length; i++) {
+						synchronized (cachedArchives) {
+							CacheStatus lcacheStatus = (CacheStatus) cachedArchives
+									.get(cacheIds[i]);
+							// skip a missing entry instead of failing with an NPE
+							if (lcacheStatus == null)
+								continue;
+							synchronized (lcacheStatus) {
+								if (lcacheStatus.refcount > 0)
+									lcacheStatus.refcount--;
+							}
+						}
+					}
+				}
+				cachedJobs.remove(jobId);
+			}
+		}
+
+	}
+
+	//make the absolute path relative so as to 
+	// create a dir similar to dfsdir in tasktracker
+	//cache dir
+	private static String makeRelative(String dfsDir, FileSystem dfs) throws IOException{
+		URI srcURI = null;
+		try {
+			srcURI = new URI(dfsDir);
+		} catch (URISyntaxException ex) {
+			throw new IOException(ex.toString());
+		}
+		String fsname = srcURI.getScheme();
+		String path;
+		if ("dfs".equals(fsname)) {
+			path = srcURI.getHost() + srcURI.getPath();
+		}
+		else{
+			String[] split = dfs.getName().split(":");
+			path = split[0] + dfsDir;
+		}
+		return path;
+	}
+	
+    static class JobArchives{
+    	String[] archives;
+    	String[] files;
+    }
+	static class CacheStatus {
+		// false, not loaded yet, true is loaded
+		public boolean currentStatus;
+		public Path localLoadPath;
+		public int refcount;
+		public byte[] md5;
+	}
+}
Index: src/java/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskTracker.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -61,7 +61,6 @@
     Server taskReportServer = null;
     Server mapOutputServer = null;
     InterTrackerProtocol jobClient;
-
     StatusHttpServer server = null;
     
     boolean shuttingDown = false;
@@ -71,6 +70,7 @@
      * Map from taskId -> TaskInProgress.
      */
     TreeMap runningTasks = null;
+    Map runningJobs = null;
     int mapTotal = 0;
     int reduceTotal = 0;
     boolean justStarted = true;
@@ -89,11 +89,11 @@
 
     static Random r = new Random();
     FileSystem fs = null;
-    static final String SUBDIR = "taskTracker";
-
+    private static final String SUBDIR = "taskTracker";
+    private static final String CACHEDIR = "archive";
+    private static final String JOBCACHE = "jobcache";
     private JobConf fConf;
     private MapOutputFile mapOutputFile;
-
     private int maxCurrentTasks;
     private int failures;
     private int finishedCount[] = new int[1];
@@ -145,6 +145,14 @@
       taskCleanupThread.start();
     }
     
+    static String getCacheSubdir(){ // relative name of the archive cache dir: SUBDIR/CACHEDIR
+    	return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+    }
+    
+    static String getJobCacheSubdir(){ // relative name of the per-job cache dir: SUBDIR/JOBCACHE
+    	return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
+    }
+    
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
     }
@@ -163,6 +171,7 @@
         // Clear out state tables
         this.tasks = new TreeMap();
         this.runningTasks = new TreeMap();
+        this.runningJobs = new TreeMap();
         this.mapTotal = 0;
         this.reduceTotal = 0;
         this.acceptNewTasks = true;
@@ -203,13 +212,90 @@
         
         this.running = true;
     }
-
-      public synchronized void shutdown() throws IOException {
+        
+    // Initialize (localize) the job directory for the task's job, then launch the task
+    private void localizeJob(TaskInProgress tip) throws IOException {
+        Path localJarFile = null;
+        Task t = tip.getTask();
+        Path localJobFile =
+            new Path(fConf.getLocalPath(getJobCacheSubdir())
+            , (t.getJobId() + Path.SEPARATOR + "job.xml"));
+        RunningJob rjob = null;
+        synchronized(runningJobs){ // look up or register the job's RunningJob record
+            if (!runningJobs.containsKey(t.getJobId())){
+            	rjob = new RunningJob();
+                rjob.localized = false;
+                rjob.tasks = new ArrayList();
+                rjob.jobFile = localJobFile;
+                rjob.tasks.add(tip);
+                 runningJobs.put(t.getJobId(), rjob);
+          }
+            else{
+            	rjob = (RunningJob) runningJobs.get(t.getJobId());
+            	// remember the task; kept for later use when we only get a job id
+				// to delete the data for
+            	rjob.tasks.add(tip);
+            }
+        }
+        synchronized(rjob){ // only the first task to get here copies the job files
+        	if (!rjob.localized){
+        		localJarFile =
+        			new Path(fConf.getLocalPath(getJobCacheSubdir())
+                    		, (t.getJobId()) + Path.SEPARATOR + "job.jar");
+            
+        		String jobFile = t.getJobFile();
+        		fs.copyToLocalFile(new Path(jobFile), localJobFile);
+        		JobConf localJobConf = new JobConf(localJobFile);
+        		String jarFile = localJobConf.getJar();
+        		if (jarFile != null) {
+        			fs.copyToLocalFile(new Path(jarFile), localJarFile);
+        			localJobConf.setJar(localJarFile.toString());
+        			FileSystem localFs = FileSystem.getNamed("local", fConf);
+        			OutputStream out = localFs.create(localJobFile); // rewrite job.xml so it names the local jar
+        			try {
+        				localJobConf.write(out);
+        			} finally {
+        				out.close();
+        			}
+        		
+        		// also unpack job.jar into the "work" dir next to the local job.xml
+        			File workDir = new File(new File(localJobFile.toString()).getParent(), "work");
+        			workDir.mkdirs();
+        			RunJar.unJar(new File(localJarFile.toString()), workDir);
+        		}
+        		rjob.localized = true;
+        	}
+        }
+        // each task gets its own JobConf instance, read from the localized job.xml
+        launchTaskForJob(tip, new JobConf(rjob.jobFile));
+      		
+    }
+    
+    private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
+    	synchronized (tip) { // hold the task's lock across conf install, launch and failure handling
+    		try {
+    			tip.setJobConf(jobConf);
+    	        tip.launchTask();
+    		} catch (Throwable ie) { // any launch failure marks the task FAILED instead of propagating
+    			tip.runstate = TaskStatus.FAILED;
+    			try {
+    				tip.cleanup();
+    			} catch (Throwable ie2) {
+    				// Ignore it, we are just trying to cleanup.
+    			}
+    			String error = StringUtils.stringifyException(ie);
+    			tip.reportDiagnosticInfo(error);
+    			LOG.info(error);
+    		}
+    	}
+    }
+    
+    public synchronized void shutdown() throws IOException {
           shuttingDown = true;
           close();
           if (this.server != null) {
             try {
-                LOG.info("Shttting down StatusHttpServer");
+                LOG.info("Shutting down StatusHttpServer");
                 this.server.stop();
             } catch (InterruptedException ex) {
                 ex.printStackTrace();
@@ -321,6 +407,12 @@
         } catch (InterruptedException ie) {}
       }
     }
+    /** Return the file system held in this tracker's {@code fs} field (presumably the DFS -- TODO confirm).
+     * @return the {@code fs} field as-is; it is initialised to null, so it may be null before it is assigned
+     */
+    public FileSystem getFileSystem(){
+    	return fs;
+    }
     
     /**
      * Main service loop.  Will stay in this loop forever.
@@ -457,6 +549,10 @@
               synchronized (this) {
                 for (int i = 0; i < toCloseIds.length; i++) {
                   Object tip = tasks.get(toCloseIds[i]);
+                  synchronized(runningJobs){
+                	  if (tip != null) // tasks.get() returned nothing for this id; avoid the NPE
+                			  runningJobs.remove(((TaskInProgress) tip).getTask().getJobId());
+                  }
                   if (tip != null) {
                     tasksToCleanup.put(tip);
                   } else {
@@ -560,8 +656,8 @@
 
       return true;
     }
-
-	/**
+    
+    /**
      * Start a new task.
      * All exceptions are handled locally, so that we don't mess up the
      * task tracker.
@@ -578,20 +674,10 @@
           reduceTotal++;
         }
       }
-      synchronized (tip) {
-        try {
-          tip.launchTask();
-        } catch (Throwable ie) {
-          tip.runstate = TaskStatus.FAILED;
-          try {
-            tip.cleanup();
-          } catch (Throwable ie2) {
-            // Ignore it, we are just trying to cleanup.
-          }
-          String error = StringUtils.stringifyException(ie);
-          tip.reportDiagnosticInfo(error);
-          LOG.info(error);
-        }
+      try{
+    	  localizeJob(tip);
+      }catch(IOException ie){ // NOTE(review): unlike launch failures, the task is not marked FAILED here
+    	  LOG.warn("Error initializing Job " + tip.getTask().getJobId() + ": " + StringUtils.stringifyException(ie));
       }
     }
     
@@ -700,6 +786,7 @@
         private JobConf localJobConf;
         private boolean keepFailedTaskFiles;
         private boolean alwaysKeepTaskFiles;
+        private boolean keepJobFiles;
 
         /**
          */
@@ -711,60 +798,52 @@
             this.lastProgressReport = System.currentTimeMillis();
             this.defaultJobConf = conf;
             localJobConf = null;
+            keepJobFiles = false;
         }
-
-        /**
-         * Some fields in the Task object need to be made machine-specific.
-         * So here, edit the Task's fields appropriately.
-         */
-        private void localizeTask(Task t) throws IOException {
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + 
-                                                 task.getTaskId());
-            Path localJobFile =
-              this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
-            Path localJarFile =
-              this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
-
-            String jobFile = t.getJobFile();
-            fs.copyToLocalFile(new Path(jobFile), localJobFile);
-            t.setJobFile(localJobFile.toString());
-
-            localJobConf = new JobConf(localJobFile);
-            localJobConf.set("mapred.local.dir",
-                    this.defaultJobConf.get("mapred.local.dir"));
-            String jarFile = localJobConf.getJar();
-            if (jarFile != null) {
-              fs.copyToLocalFile(new Path(jarFile), localJarFile);
-              localJobConf.setJar(localJarFile.toString());
-            }
-            task.localizeConfiguration(localJobConf);
-
-            FileSystem localFs = FileSystem.getNamed("local", fConf);
-            OutputStream out = localFs.create(localJobFile);
-            try {
-              localJobConf.write(out);
-            } finally {
-              out.close();
-            }
-            // set the task's configuration to the local job conf
-            // rather than the default.
-            t.setConf(localJobConf);
-            keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+        
+        private void localizeTask(Task task) throws IOException{ // needs localJobConf to be set already (see setJobConf)
+            Path localTaskDir =
+              new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR
+                    + JOBCACHE + Path.SEPARATOR
+                    + task.getJobId()), task.getTaskId());
+           FileSystem localFs = FileSystem.getNamed("local", fConf);
+           localFs.mkdirs(localTaskDir);
+           Path localTaskFile = new Path(localTaskDir, "job.xml"); // per-task copy of the job conf
+           task.setJobFile(localTaskFile.toString());
+           localJobConf.set("mapred.local.dir",
+                    fConf.get("mapred.local.dir"));
+            // record the task id in the conf before it is written out below
+           localJobConf.set("mapred.task.id", task.getTaskId());
+           keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+           task.localizeConfiguration(localJobConf);
+           OutputStream out = localFs.create(localTaskFile);
+           try {
+             localJobConf.write(out);
+           } finally {
+             out.close();
+           }
+            task.setConf(localJobConf);
             String keepPattern = localJobConf.getKeepTaskFilesPattern();
             if (keepPattern != null) {
-              alwaysKeepTaskFiles = 
+                keepJobFiles = true; // any keep pattern makes jobHasFinished() leave the job directory in place
+                alwaysKeepTaskFiles = 
                 Pattern.matches(keepPattern, task.getTaskId());
             } else {
               alwaysKeepTaskFiles = false;
             }
         }
-
+        
         /**
          */
         public Task getTask() {
             return task;
         }
 
+        public void setJobConf(JobConf lconf){ // install the job conf and refresh keepFailedTaskFiles from it
+            this.localJobConf = lconf;
+            keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+        }
+        
         /**
          */
         public synchronized TaskStatus createStatus() {
@@ -885,11 +964,19 @@
          * finished.  If the task is still running, kill it (and clean up
          */
         public synchronized void jobHasFinished() throws IOException {
+            // stop or clean up this task first, then release the job-level resources
             if (getRunState() == TaskStatus.RUNNING) {
                 killAndCleanup(false);
             } else {
                 cleanup();
             }
+            DistributedCache.jobFinished(task.getJobId());
+            if (keepJobFiles)
+            	return;
+            // delete the job directory for this task 
+            // since the job is done/failed
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                    JOBCACHE + Path.SEPARATOR +  task.getJobId());
         }
 
         /**
@@ -943,10 +1030,13 @@
                  }
                }
             }
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId);
-        }
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                    JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
+                    taskId);
+            }
     }
 
+    
     // ///////////////////////////////////////////////////////////////
     // TaskUmbilicalProtocol
     /////////////////////////////////////////////////////////////////
@@ -1044,6 +1134,16 @@
           LOG.warn("Unknown child with bad map output: "+taskid+". Ignored.");
         }
     }
+    
+    /**
+     *  Per-job localization record; instances are stored in runningJobs keyed by job id
+     */
+    static class RunningJob{
+    	Path jobFile; // local path of the job's job.xml
+    	// tasks of this job handed to localizeJob; kept for later use
+    	ArrayList tasks;
+    	boolean localized; // set to true once localizeJob has copied the job files
+    }
 
     /** 
      * The main() for child processes. 
Index: src/java/org/apache/hadoop/mapred/JobConf.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobConf.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/JobConf.java	(working copy)
@@ -170,6 +170,7 @@
     String dirs = get("mapred.input.dir");
     set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir);
   }
+
   public Path[] getInputPaths() {
     String dirs = get("mapred.input.dir", "");
     ArrayList list = Collections.list(new StringTokenizer(dirs, ","));
@@ -197,7 +198,84 @@
   }
 
   /**
-   * Set whether the framework shoul keep the intermediate files for 
+   * Set the job archives
+   * @param archive the comma-separated archive directories
+   *  to set the archives to
+   */
+  public void setCacheArchives(String archive) {
+	  set("mapred.cache.archives", archive);
+  }
+  
+  /**
+   * Set cache files (stored under "mapred.cache.files")
+   * @param files the cache files, comma-separated
+   */
+  public void setCacheFiles(String files){
+	  set("mapred.cache.files", files);
+  }
+  
+  /** 
+   * Get cache archives (the "mapred.cache.archives" property)
+   * @return The mapred cache archives requested by the job; may be null (callers null-check it)
+   */
+  public String[] getCacheArchives() throws IOException {
+	  return getStrings("mapred.cache.archives");
+  }
+
+  /**
+   * Get cache files (the "mapred.cache.files" property)
+   * @return The mapred cache files requested by the job; may be null (callers null-check it)
+   */
+  public String[] getCacheFiles() throws IOException{
+	  return getStrings("mapred.cache.files");
+  }
+  
+  
+  /**
+   * Get the local paths for the cached archives (the "mapred.cache.localArchives" property)
+   * @return string array of local cached archives; presumably null when unset -- TODO confirm
+   */
+  public String[] getLocalCacheArchives() throws IOException{
+	  return getStrings("mapred.cache.localArchives");
+  }
+  
+  /**
+   * Get the local paths for the cached files (the "mapred.cache.localFiles" property)
+   * @return string array of local cached files; presumably null when unset -- TODO confirm
+   */
+  public String[] getLocalCacheFiles() throws IOException{
+	  return getStrings("mapred.cache.localFiles");
+  }
+  
+  /**
+   * Get the local path for a named cache
+   * @param name The name of the cache you specified in the setcachearchives/files
+   * @return The localized path of the named cache file, or null when no cache
+   *  entry matches the name or no localized path is recorded for it
+   * @throws IOException
+   */
+  public String getNamedCache(String name) throws IOException{
+	  // the getters may return null when a property is unset (other callers null-check them)
+	  String[] localCacheFiles = getLocalCacheFiles();
+	  String[] cacheFiles = getCacheFiles();
+	  String[] localCacheArchives = getLocalCacheArchives();
+	  String[] cacheArchives = getCacheArchives();
+	  for (int i = 0; cacheFiles != null && i < cacheFiles.length; i++){
+		  if (cacheFiles[i].split("=")[0].equals(name)){
+			  return (localCacheFiles != null && i < localCacheFiles.length) ? localCacheFiles[i] : null;
+		  }
+	  }
+	  for (int i = 0; cacheArchives != null && i < cacheArchives.length; i++){
+		  if (cacheArchives[i].split("=")[0].equals(name)){
+			  return (localCacheArchives != null && i < localCacheArchives.length) ? localCacheArchives[i] : null;
+		  }
+	  }
+	  // no matches for this cache name
+	  return null;
+  }
+  
+  /**
+   * Set whether the framework should keep the intermediate files for 
    * failed tasks.
    */
   public void setKeepFailedTaskFiles(boolean keep) {
Index: src/java/org/apache/hadoop/mapred/JobClient.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobClient.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/JobClient.java	(working copy)
@@ -227,7 +227,8 @@
         JobConf job = new JobConf(jobFile);
         return submitJob(job);
     }
-
+    
+   
     /**
      * Submit a job to the MR system
      */
@@ -244,11 +245,36 @@
         Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
         Path submitJobFile = new Path(submitJobDir, "job.xml");
         Path submitJarFile = new Path(submitJobDir, "job.jar");
-
+        FileSystem fs = getFs();
+        //try getting the md5 of the archives
+        String[] tarchives = job.getCacheArchives();
+        String[] tfiles = job.getCacheFiles();
+        if ((tarchives != null) || (tfiles != null)){
+        	//prepare these archives for md5 checksums
+        	if (tarchives != null){
+        		 String[] archives = DistributedCache.justPaths(tarchives);
+        		 String md5Archives =  StringUtils.byteToHexString(
+    					DistributedCache.createMD5(archives[0], fs));
+        		for(int i = 1; i < archives.length;i++){
+        			md5Archives = md5Archives + "," +  StringUtils.byteToHexString(
+        					DistributedCache.createMD5(archives[i], fs));
+        		}
+        		job.set("mapred.cache.archivemd5", md5Archives);
+        	}
+        	if(tfiles != null) {
+        	    String[] files = DistributedCache.justPaths(tfiles);
+         		String md5Files = StringUtils.byteToHexString(
+        				DistributedCache.createMD5(files[0],fs));
+        		for(int i = 1; i < files.length; i++){
+        			md5Files = md5Files + "," + StringUtils.byteToHexString(
+        					DistributedCache.createMD5(files[i], fs));
+        		}
+        		job.set("mapred.cache.filemd5", md5Files);
+        	}
+        	
+        }
+       
         String originalJarPath = job.getJar();
-
-        FileSystem fs = getFs();
-
         short replication = (short)job.getInt("mapred.submit.replication", 10);
 
         if (originalJarPath != null) {           // copy jar to JobTracker's fs
Index: src/java/org/apache/hadoop/mapred/TaskRunner.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskRunner.java	(revision 439052)
+++ src/java/org/apache/hadoop/mapred/TaskRunner.java	(working copy)
@@ -61,25 +61,65 @@
   */
   public void close() throws IOException {}
 
+  /** Join the paths with commas; null stays null, an empty array gives "". */
+  private String stringifyPathArray(Path[] p){
+	  if (p == null){
+		  return null;
+	  }
+	 StringBuffer str = new StringBuffer(); // no p[0] access, so an empty array is safe
+	 for (int i = 0; i < p.length; i++){
+		 str.append(i == 0 ? "" : ",").append(p[i].toString());
+	 }
+	 return str.toString();
+  }
   public final void run() {
     try {
-
+      
+      //before preparing the job localize 
+      //all the archives
+      
+      String[] archives = conf.getCacheArchives();
+      String[] files = conf.getCacheFiles();
+      if ((archives != null) || (files != null)){
+    	  if (archives != null){
+    		  String[] md5 = conf.getStrings("mapred.cache.archivemd5");
+    		  Path[] p = DistributedCache.getLocalCaches(t, archives, 
+    				  conf, tracker.getFileSystem(), true, md5);
+    		  conf.set("mapred.cache.localArchives", stringifyPathArray(p));
+    	  }
+    	  if ((files != null)){
+    		  String[] md5 = conf.getStrings("mapred.cache.filemd5");
+    		  Path[] p = DistributedCache.getLocalCaches(t, files,
+    				  conf, tracker.getFileSystem(), false, md5);
+    		  conf.set("mapred.cache.localFiles", stringifyPathArray(p));
+    	  }
+    	  //sets the paths to local archives and paths
+    	  Path localTaskFile = new Path(t.getJobFile());
+    	  FileSystem localFs = FileSystem.getNamed("local", conf);
+    	  localFs.delete(localTaskFile);
+    	  OutputStream out = localFs.create(localTaskFile);
+          try {
+            conf.write(out);
+          } finally {
+            out.close();
+          }
+      }
+      
       if (! prepare()) {
         return;
       }
 
       String sep = System.getProperty("path.separator");
-      File workDir = new File(new File(t.getJobFile()).getParent(), "work");
-      workDir.mkdirs();
-               
       StringBuffer classPath = new StringBuffer();
       // start with same classpath as parent process
       classPath.append(System.getProperty("java.class.path"));
       classPath.append(sep);
-
+      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
+	  workDir.mkdirs();
+	  
       String jar = conf.getJar();
-      if (jar != null) {                      // if jar exists, it into workDir
-        RunJar.unJar(new File(jar), workDir);
+      if (jar != null) {       
+    	  // the jar is already unpacked into workDir during job localization; look for its lib/ entries
         File[] libs = new File(workDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
@@ -164,6 +204,7 @@
     }
   }
 
+  
   /**
    * Handle deprecated mapred.child.heap.size.
    * If present, interpolate into mapred.child.java.opts value with
Index: src/java/org/apache/hadoop/util/StringUtils.java
===================================================================
--- src/java/org/apache/hadoop/util/StringUtils.java	(revision 439052)
+++ src/java/org/apache/hadoop/util/StringUtils.java	(working copy)
@@ -79,4 +79,38 @@
     }
     return numFormat.format(result) + suffix;
   }
-}
+  
+  /**
+   * Given an array of bytes it will convert the bytes
+   * to a hex string representation of the bytes (two hex digits per byte)
+   * @param bytes the bytes to convert
+   * @return hex string representation of the byte array
+   */
+  public static String byteToHexString(byte bytes[])
+  {
+	  StringBuffer retString = new StringBuffer();
+	  for (int i = 0; i < bytes.length; ++i)
+	  {
+		  retString.append( // 0x0100 + value always prints as 3 hex digits; substring(1) keeps the low 2
+				  Integer.toHexString(0x0100 
+						  + (bytes[i] & 0x00FF)).substring(1));
+	  }
+	  return retString.toString();
+  }
+  /**
+   * Given a hexstring this will return the byte array
+   * corresponding to the string
+   * @param hex the hex String, two hex digits per byte
+   * @return the byte array decoded from the given hex string.
+   * The size of the byte array is therefore hex.length()/2;
+   * a trailing unpaired character is ignored
+   */
+  public static byte[] hexStringToByte(String hex){  
+	  byte[] bts = new byte[hex.length() / 2];
+	  for (int i = 0; i < bts.length; i++) {
+		  bts[i] = (byte) Integer.parseInt( // parse each 2-character slice as base 16
+				  hex.substring(2*i, 2*i+2), 16);
+	  }
+	  return bts;
+  }
+}
\ No newline at end of file
Index: build.xml
===================================================================
--- build.xml	(revision 439052)
+++ build.xml	(working copy)
@@ -35,6 +35,7 @@
   <property name="test.src.dir" value="${basedir}/src/test"/>
   <property name="test.build.dir" value="${build.dir}/test"/>
   <property name="test.build.data" value="${test.build.dir}/data"/>
+  <property name="test.cache.data" value="${test.build.dir}/cache"/>
   <property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
@@ -273,6 +274,11 @@
                       value="org/apache/hadoop/test/AllTestDriver"/>
          </manifest>
     </jar>
+    <delete dir="${test.cache.data}"/>
+    <mkdir dir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.txt" todir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.jar" todir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>    
   </target>
 
   <!-- ================================================================== -->
@@ -284,7 +290,6 @@
     <mkdir dir="${test.build.data}"/>
     <delete dir="${hadoop.log.dir}"/>
     <mkdir dir="${hadoop.log.dir}"/>
-
     <junit printsummary="yes" haltonfailure="no" fork="yes" dir="${basedir}"
       errorProperty="tests.failed" failureProperty="tests.failed">
       <sysproperty key="test.build.data" value="${test.build.data}"/>
