Why not
Wouldn't that be simpler than having a whole new mechanism? Yes, staying within the context of DFS could be simpler.
Note however that we have these requirements:
1. Archive files are sometimes used by non-Java, non-Hadoop MapReduce programs (using http://wiki.apache.org/lucene-hadoop/HadoopStreaming).
2. Avoid repetitive expansion of the job jar and of other archives for each Task in the Job.
3. In case of many small files, avoid a per-file overhead for DFS and cache operations.
Because of 1., the files must really be native OS files, not DFS files. Today, unarchiving the job jar occurs in Hadoop, not in the MapRed application, but it is not cached. A problematic choice must be made to implement the following.
– Currently, without caching: in Hadoop the Task working directory contains the expanded contents of the job jar.
– Later, with caching: the Task working directory contents should be efficiently created from the filecache contents.
– So how to synchronize the task working directory job jar data with the file cache? Or how to work around the need to do this?
Some options follow; all have problems. Option #3 seems like the simplest to implement, maintain and explain.
Would this blow anything up in nutch? We could of course provide a flag that causes the backwards compatible copy for a release or two if really needed.
Mahadev konar made changes - 06/Jul/06 06:02 AM
Doug Cutting made changes - 03/Aug/06 05:46 PM
I have attached two other files to the patch which are small .jar and .zip files needed for the junit tests.
Caching and job.jars: there are two parts to the patch.
1) Unjarring of job.jar
tasktracker/jobcache/job_id/workdir – the dir where the job is unjarred once. The current working dir for each task is the workdir.
2) Archiving of files
i) Each job can ask for a set of archives/files to be localized. The api for that is
ii) These archives/files should be present in the specified DFS for localizing.
iii) Localization happens across jobs. So each cache archive/file has a key, and the key is the URL of the cache (in case of an absolute path, it is the absolute path).
iv) Whenever a job is started, the first tasks for that job will localize the archives. If no dfs://hostname:port is specified (e.g. setcachefiles(/user/mahadev/test.txt)), in that case it is stored in
v) The archives are localized only once, and are checked for each task to see whether they are fresh or need to be refreshed. Steps:
c) Two jobs can use the same archives in parallel, but if the second job updates the same archive and tries using the updated archive, then it will fail.
vi) How to get the localized cache paths
viii) Also, caching across a tasktracker going up and down is not supported. So a tasktracker would lose all caching information once it goes down. The caching information can be reconstructed when the tasktracker comes up, but the support is not available in this patch.
ix) When are the caches deleted?
Mahadev konar made changes - 23/Aug/06 08:51 AM
Mahadev konar made changes - 23/Aug/06 05:48 PM
The LocalDU class should rather be a method on FileUtils, as should the UnZip utility.
DistributedCache should not be public, nor should the methods you add to TaskTracker. The only public API for this new feature is the two new JobConf methods, right?

You are right, Doug. The only public APIs should be the JobConf methods. I will incorporate your changes and resubmit the patch. Thanks for your comments.
Incorporated doug's suggestions in this new patch.
Mahadev konar made changes - 24/Aug/06 09:06 PM
Jira ate my comments again. Since I didn't realize it for a day, I'll try to reconstruct them.
They were mostly nits: 1. You add 3 new mini-cluster bring up/tear down cycles in the junit tests. It would be faster to use the same cluster with multiple jobs.

Incorporated most of Owen's comments, except that of merging all my junit tests into other junit tests. I merged my local/minimr junit test into TestMiniMRLocalFS. I tried including my caching junit test with DFS and MiniMR in TestMiniMRWithDFS, but that made it longer than 3 minutes. It currently takes around 140 seconds. So I have a separate junit test for DFS and MiniMR.
Mahadev konar made changes - 01/Sep/06 03:33 AM
The indentation of this patch is non-standard. Please use 2 spaces per indent level, no tabs.
Should the JobConf setters be adders? For example, should setCacheFiles(String) instead be named addCacheFile(Path)? Also, should we use paths instead of strings?

Indented the patch to make it 2 spaces.
Mahadev konar made changes - 05/Sep/06 07:38 PM
Doug Cutting made changes - 05/Sep/06 08:41 PM
About setCacheFiles(): we could add an addCacheFiles(), though I have not done it in the patch. Also, we are using URIs (dfs://hostname:port/path), so I don't think we should be using paths instead of strings.
If we are using URIs then shouldn't the parameters be java.net.URI?
Sorry to have caused confusion. The files are specified as dfs://hostname:port/pathtofile. These are later converted to URIs by the framework to get the DFS and the absolute paths. This is all similar to distcp, where the input and output files can be specified as dfs://......
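As a rough illustration of the conversion described here, a string of the form dfs://hostname:port/pathtofile can be parsed with java.net.URI to separate the file system part from the absolute path. The class and method names are hypothetical and not taken from the patch, and the host name and port are placeholders.

```java
import java.net.URI;

// Hypothetical helper, not part of the patch: splits a cache file spec
// into its file system part and its absolute path.
class CacheUriSketch {
  // Returns {fileSystem, absolutePath}. fileSystem is null when the spec
  // has no scheme and authority (e.g. "/user/mahadev/test.txt").
  static String[] split(String spec) {
    URI uri = URI.create(spec); // throws IllegalArgumentException if malformed
    String fs = null;
    if (uri.getScheme() != null && uri.getAuthority() != null) {
      fs = uri.getScheme() + "://" + uri.getAuthority();
    }
    return new String[] { fs, uri.getPath() };
  }

  public static void main(String[] args) {
    // Placeholder host and port, for illustration only.
    String[] parts = split("dfs://namenode.example.com:9000/user/mahadev/test.txt");
    System.out.println(parts[0] + " " + parts[1]);
  }
}
```

A spec without dfs://hostname:port yields a null file system part, so the caller can fall back to whatever default file system it uses.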
The documentation of the primary user API should fully describe the format of the parameters, e.g., that these are strings representing URIs. This would be simpler if the API used a type other than String.
We need an addCacheFile() method before we need a setCacheFiles() method, not the other way around. I'd like to see this as independent of the mapred core as is possible, in order to support things like HADOOP-452. In particular, the DistributedCache class should probably move to a separate package (filecache)?, and as much of the functionality as is possible should be put in that package. The TaskTracker and JobTracker should become clients of this facility. Utilities to store a set of files to cache in a Configuration should arguably move to the new package as well. There's a temptation to overload JobConf that we need to resist. So I'd rather see these as static methods like DistributedCache.addFile(Configuration, URI).
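A minimal sketch of what such static helpers might look like, under stated assumptions: java.util.Properties stands in for Hadoop's Configuration so the example is self-contained, and the class name, the getFiles method and the property key are hypothetical.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch only: java.util.Properties stands in for Hadoop's Configuration,
// and the property key below is a hypothetical name.
class DistributedCacheSketch {
  static final String CACHE_FILES_KEY = "sketch.cache.files";

  // Appends one URI to the comma-separated list stored in the configuration.
  static void addFile(Properties conf, URI uri) {
    String existing = conf.getProperty(CACHE_FILES_KEY);
    String value = (existing == null || existing.isEmpty())
        ? uri.toString()
        : existing + "," + uri.toString();
    conf.setProperty(CACHE_FILES_KEY, value);
  }

  // Reads the list back as URIs, in the order they were added.
  static List<URI> getFiles(Properties conf) {
    List<URI> result = new ArrayList<>();
    String value = conf.getProperty(CACHE_FILES_KEY);
    if (value == null || value.isEmpty()) {
      return result;
    }
    for (String s : value.split(",")) {
      result.add(URI.create(s));
    }
    return result;
  }
}
```

Keeping the state in the configuration object and the logic in static methods means JobConf itself needs no cache-specific methods, which is the separation argued for above.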
Doug Cutting made changes - 07/Sep/06 07:30 PM
OK, this is what I plan to do. Doug, please comment if you are OK with this:
1) The DistributedCache class will be in a separate package, org.apache.hadoop.filecache.
2) To make it independent of mapreduce – The difference between these two methods being that localizeCacheArchives automatically unarchives a zip/jar file while localizeCacheFiles just copies the file locally.
3) DistributedCache maintains a list of localized files/caches so as to copy only once.
4) The TaskRunner is a client of DistributedCache, asking it to cache files locally.
5) JobConf does not have any APIs related to setCache/addCache. These will be separate static methods in DistributedCache to set caches/files in the JobConf. The getCaches can also be implemented as a static method in DistributedCache.
Comments?

Made the changes suggested by Doug. I moved all the public methods in JobConf for caching into DistributedCache. MapReduce is now a client of the DistributedCache class. Also, the public API now uses URIs to add cache files/archives. DistributedCache is in a separate package.
So the api now looks like and to get the localized Paths
Mahadev konar made changes - 13/Sep/06 09:18 PM
I just committed this. Thanks, Mahadev!
Doug Cutting made changes - 14/Sep/06 10:11 PM
Doug Cutting made changes - 06/Oct/06 09:48 PM
updated description with more details.
----------
Efficient file caching
(on Hadoop Task nodes, for benefit of MapReduce Tasks)
Overview
========
This document describes a mechanism for caching files or archives on the local file system of TaskTracker nodes.
Exporting and extracting large archives from HDFS to the local filesystem is expensive,
and is often required by applications.
Currently this would happen at the beginning of every Task of a MapReduce Job.
The purpose of the Hadoop file cache is to minimize this overhead by
preserving and reusing the extracted data.
During a MapRed job there are two kinds of data being uploaded to a Hadoop cluster:
Java code and out-of-band data.
Java code may include libraries, so this can easily get large (megabytes).
Out-of-band data is any data used by the job in addition to the map input or the reduce input,
for example a large dictionary of words. This can also get large (gigabytes).
There are two main kinds of cacheable files:
1. The MapReduce job jar.
This contains Java code and possibly out-of-band data.
2. Additional archives.
These contain out-of-band data.
The proposed solution is as follows.
Cacheable files are stored in HDFS and specified in the JobConf of a MapReduce job.
A special case is the job jar file, which will get cached by default.
Supported formats for cacheable files are jar, tar and gzip.
Additional formats could be added at a future time.
Regular files are also supported.
Workflow:
========
For out-of-band data, the user must first explicitly upload archives to HDFS.
This can be done using any HDFS client.
It is typical for out-of-band data to be reused across Jobs and users.
The user specifies the out-of-band data using:
JobConf.addCachedArchive() or JobConf.addCachedFile()
The user specifies the job jar as today:
JobConf.setJar("f.jar") which implicitly has the effect of:
JobConf.addCachedArchive("f.jar").
When a Job starts, the JobTracker does the following for each cached archive:
  Compute a strong hash of the archive and store the hash in HDFS.
  To avoid reading and scanning the archive, the strong hash is based
  on the existing HDFS block-CRC codes rather than on the actual content.
When a Task starts, the TaskTracker does the following for each cached archive:
  Retrieve the strong hash from HDFS and compare it with the hash of the local copy.
  If the local hash does not exist or is different, then
    retrieve the archive, unarchive it, and update the local hash.
  If the archive is the job jar, then
    copy or hard-link the archive contents to the Task working directory.
  Then start the TaskRunner as usual.
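The hash computation and the freshness test could be sketched roughly as follows. This is an illustration under assumptions, not the proposed implementation: the class and method names are hypothetical, SHA-256 is chosen arbitrarily as the strong hash, and how the per-block CRC values are obtained from HDFS is left out.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical sketch: the block CRC values are assumed to be supplied
// by the caller; reading them from HDFS is not shown.
class ArchiveHashSketch {
  // Digest over the per-block CRC values instead of the archive bytes,
  // so the archive itself never has to be read.
  static byte[] hashOfBlockCrcs(long[] blockCrcs) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
      for (long crc : blockCrcs) {
        buf.clear();
        buf.putLong(crc);
        md.update(buf.array());
      }
      return md.digest();
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is required on every Java platform, so this is unexpected.
      throw new IllegalStateException(e);
    }
  }

  // True when the TaskTracker must retrieve and unarchive the archive again:
  // either there is no local hash yet, or it differs from the one in HDFS.
  static boolean needsRefresh(byte[] localHash, byte[] hdfsHash) {
    return localHash == null || !Arrays.equals(localHash, hdfsHash);
  }
}
```

With this shape, the JobTracker side calls hashOfBlockCrcs once per archive at job start, and each TaskTracker only compares two short byte arrays per Task.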
Once the Task is running, the user code may access the cached archive contents.
This usually happens at initialization time.
If the JobConf added the cached archive: /hdfsdir/path/f.jar
Then the task can expect to access the archive content at:
$HADOOP_CACHE/hdfsdir/path/fdir/ffile
or maybe:
$HADOOP_CACHE/hdfsdir/path/f_jar/fdir/ffile
The second option guarantees that multiple archives
in the same directory will not clobber each other.
The translation of f.jar to f_jar is a convention to ease the distinction of file names and directory names.
Note that in the above, the HDFS paths are mirrored on the local filesystem.
The intent is to provide namespace protection.
[i.e. the contents of hdfs1/archive.jar and hdfs2/archive.jar should not collide in the cache]
The intent is not to make cache paths interchangeable with HDFS paths.
The variable HADOOP_CACHE is made available to the task as
a JobConf property that is dynamically set by the TaskRunner code.
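The second naming option above could be computed along these lines. This is a sketch only: the class and method names are hypothetical, the cache root is passed in as a plain string standing in for the value of HADOOP_CACHE, and the HDFS path is assumed to be absolute.

```java
// Hypothetical sketch of the HDFS-path-to-cache-path mapping.
class CachePathSketch {
  // Maps an absolute HDFS archive path to its directory under the local
  // cache root, replacing the last '.' of the archive name with '_'
  // (f.jar becomes f_jar) so the directory cannot be mistaken for the file.
  static String localDir(String cacheRoot, String hdfsPath) {
    int slash = hdfsPath.lastIndexOf('/');
    String parent = hdfsPath.substring(0, slash + 1); // keeps trailing '/'
    String name = hdfsPath.substring(slash + 1);
    int dot = name.lastIndexOf('.');
    String dirName = dot < 0
        ? name
        : name.substring(0, dot) + "_" + name.substring(dot + 1);
    return cacheRoot + parent + dirName;
  }

  public static void main(String[] args) {
    // "/tmp/hadoop-cache" is a placeholder for the HADOOP_CACHE value.
    System.out.println(localDir("/tmp/hadoop-cache", "/hdfsdir/path/f.jar"));
  }
}
```

Because the HDFS directory is mirrored under the cache root, archives with the same name in different HDFS directories map to different local directories.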
Cache size control:
------------------
We cannot let the cache grow unbounded.
The cache is always up-to-date at the start of a job.
So the configurable parameter should not be the age of the cached data
but the total size of the cache.
The cache size is a static TaskTracker configuration parameter.
LRU (least recently used) policy:
On each Task tracker, the cache manager will measure the total size of the cache
and expire the oldest cached items.
When a cached item is requested again in a different job, it goes back to the top.
The cached archive contents are required for the MapReduce task to function.
So when the promised cache contents cannot be provided,
the cache manager will force a job failure.
Before new files are added to the cache, we do this size test.
If staying within the cache size limit would require expiring files...
1. .. for completed jobs, then everything is fine: delete them.
2. .. for jobs that are already running, then the NEW job fails.
3. .. for the new job itself, then the new job fails.
Note that a file (archive) may belong to multiple jobs.
In normal use the cache size is expected to be significantly larger
than the files requested by a single job.
So the failure modes due to cache overflow should rarely occur.
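The size test and the LRU policy described in this section can be sketched as below. This is one possible reading, not the proposed implementation: the class and method names are hypothetical, sizes are abstract units, and a per-item count of running jobs is assumed as the way to tell completed jobs from running ones.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a size-bounded LRU cache manager.
class CacheManagerSketch {
  private static final class Item {
    final long size;
    int activeJobs; // number of running jobs that use this item
    Item(long size) { this.size = size; }
  }

  private final long limit; // static TaskTracker configuration parameter
  private long used;
  // Insertion-ordered map: iteration goes from oldest to newest item.
  private final LinkedHashMap<String, Item> items = new LinkedHashMap<>();

  CacheManagerSketch(long limit) { this.limit = limit; }

  // Called when a job requests an item. Returns false when the item cannot
  // be cached without expiring files of running jobs: the new job fails.
  boolean acquire(String key, long size) {
    Item hit = items.remove(key);
    if (hit != null) {
      hit.activeJobs++;
      items.put(key, hit); // re-insert so the item goes back to the top
      return true;
    }
    // Check feasibility first, so that a failed request expires nothing.
    long reclaimable = 0;
    for (Item it : items.values()) {
      if (it.activeJobs == 0) reclaimable += it.size;
    }
    if (used - reclaimable + size > limit) return false;
    // Expire the oldest items that belong only to completed jobs.
    Iterator<Map.Entry<String, Item>> iter = items.entrySet().iterator();
    while (used + size > limit && iter.hasNext()) {
      Item old = iter.next().getValue();
      if (old.activeJobs == 0) {
        used -= old.size;
        iter.remove();
      }
    }
    Item fresh = new Item(size);
    fresh.activeJobs = 1;
    items.put(key, fresh);
    used += size;
    return true;
  }

  // Called when a job that used the item completes.
  void release(String key) {
    Item it = items.get(key);
    if (it != null && it.activeJobs > 0) it.activeJobs--;
  }

  boolean contains(String key) { return items.containsKey(key); }
}
```

For example, with a limit of 100 units, a second job requesting a 60-unit archive fails while a first job still holds another 60-unit archive, and succeeds once the first job has completed and its archive can be expired.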
THE END.