Hadoop Map/Reduce
MAPREDUCE-1901

Jobs should not submit the same jar files over and over again

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Currently each Hadoop job uploads the required resources (jars/files/archives) to a new location in HDFS. Map-reduce nodes involved in executing this job would then download these resources into local disk.

      In an environment where most of the users are using a standard set of jars and files (because they are using a framework like Hive/Pig) - the same jars keep getting uploaded and downloaded repeatedly. The overhead of this protocol (primarily in terms of end-user latency) is significant when:

      • the jobs are small (and, conversely, large in number)
      • the Namenode is under load (meaning HDFS latencies are high and are made worse, in part, by this protocol)

      Hadoop should provide a way for jobs in a cooperative environment to avoid submitting the same files over and over again. Identifying and caching execution resources by a content signature (md5/sha) would be a good alternative to have available.
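      As a concrete illustration of the content-signature idea (this sketch is not from the attached patch; it is a minimal, pure-JDK example of how a client could derive an MD5-based name for a jar so that byte-identical copies map to the same cached resource):

        import java.io.FileInputStream;
        import java.io.IOException;
        import java.io.InputStream;
        import java.security.MessageDigest;
        import java.security.NoSuchAlgorithmException;

        public class ContentSignature {
          /** Returns the lowercase hex MD5 of the file's contents. */
          public static String md5Hex(String path) throws IOException, NoSuchAlgorithmException {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] buf = new byte[64 * 1024];
            InputStream in = new FileInputStream(path);
            try {
              int n;
              while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
              }
            } finally {
              in.close();
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
              hex.append(String.format("%02x", b));
            }
            return hex.toString();
          }

          public static void main(String[] args) throws Exception {
            // Two clients holding byte-identical copies of hive.jar compute the same
            // signature, so a single uploaded copy in HDFS can serve both of them.
            System.out.println(md5Hex(args[0]));
          }
        }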

      1. 1901.PATCH
        55 kB
        Junjie Liang
      2. 1901.PATCH
        65 kB
        Junjie Liang

        Issue Links

          Activity

          Junjie Liang added a comment -

          This patch depends on HADOOP-7022 for a small tweak to the MD5Hash function, where we keep track of the file size when we calculate the hash of a file. It is also a (combined) fix for MAPREDUCE-1902.

          M. C. Srivas added a comment -

          > [ from dhruba ]
          > It means that a central authority stores the mapping of active callbacks and their associated clients (and files). if a client dies prematurely, the central authority should have the option to recover that callback and hand it over to a newly requesting client. are you proposing that the NN and/or JT be this central authority?

          Well, the mtime is a poor-man's version number that is getting checked on every access to see if the file at the server is newer. Adding a callback should reduce this load significantly.

          To the point of the question, yes, the NN should be able to revoke the callback whenever it feels like, at which point the client should get it back before reusing items in its cache. The client, on reboot (of itself or of the NN), must re-establish the callbacks it cares about. Note that the callback is not a lock, but a notification mechanism – many clients can hold callbacks on the same file – so it is not necessary for the NN to revoke a callback from one client in order to hand out a callback for the same file to another client. When a file changes, all outstanding callbacks for it are revoked so clients can discard/refresh their caches.

          But the above is moot. Why does a "bulk-mtime" not work, especially given the manner in which the "bulk-get-md5-signatures" is supposed to work in Joydeep's proposal? They seem to be equally onerous (or not).

          Koji Noguchi added a comment -

          The TaskTracker, on being requested to run a task requiring CAR resource md5_F checks whether md5_F is localized.

          • If md5_F is already localized - then nothing more needs to be done. the localized version is used by the Task
          • If md5_F is not localized - then it's fetched from the CAR repository

          What are we gaining by using md5_F on the TaskTracker side?
          Can we use the existing 'cacheStatus.mtime == confFileStamp' check and change the order of the check so that no unnecessary getFileStatus call is made (MAPREDUCE-2011)?
          Otherwise, this can only be used for dist files loaded by this framework and would require two separate code paths on the TaskTracker side.

          dhruba borthakur added a comment -

          > use a model like AFS with callbacks to implement an on-disk cache

          It means that a central authority stores the mapping of active callbacks and their associated clients (and files). if a client dies prematurely, the central authority should have the option to recover that callback and hand it over to a newly requesting client. are you proposing that the NN and/or JT be this central authority?

          M. C. Srivas added a comment -

          Content-addressable is one way to solve this problem, and it seems like an extremely heavy-weight approach:
          1. more processing to do whenever a file is added to the file-system
          2. reliability issues getting the signature to match the contents across failures/re-replication/etc
          3. a repository of signatures in HDFS is yet another single-point of failure, and yet another database that needs to be maintained (recovery code to handle "no-data-corruption" on a reboot, scaling it as more files added, backup/restore, HA, etc)

          Looks like there are a variety of simpler approaches possible, a few of which come to mind immediately and are listed below in increasing order of complexity.

          1. use distcp or something similar to copy the files onto local disk whenever there is a new version of Hive released, and set pathnames to that. That is, different versions of a set of files are kept in different directories, and pathnames are used to distinguish them. For example, we do not do an md5 check of "/bin/ls" every time we need to run it. We set our pathname appropriately. If there is a different version of "ls" we prefer to use, say, in "/my/local/bin", then we get that by setting /my/local/bin ahead of other paths in our pathname.

          2. instead of implementing a bulk "getSignatures" call to replace several "get_mtime" calls, why not implement a bulk get_mtime instead? (a sketch of such a bulk lookup appears after this list)

          3. use a model like AFS with callbacks to implement an on-disk cache that survives reboots (Dhruba knows AFS very well). In other words, the client acquires a callback from the name-node for each file it has cached, and HDFS guarantees it will notify the client when the file is deleted or changed (at which point, the callback is revoked and the client must re-fetch the file). The callback lasts for, say, 1 week, and can be persisted on disk. On a name-node reboot, the client is responsible for re-establishing the callbacks it already has (akin to a block-report). The client can also choose to return callbacks, in order to keep the memory requirements on the name-node to a minimum. No repository of signatures is needed.
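          A minimal sketch of the bulk-mtime idea from point 2, assuming the shared jars all live under a single HDFS directory (the directory name below is hypothetical): one listStatus() call returns the modification times for every file, replacing one getFileStatus() round trip per jar.

            import java.io.IOException;
            import java.util.HashMap;
            import java.util.Map;
            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileStatus;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;

            public class BulkMtime {
              /** Fetches the mtime of every file in one directory with a single NameNode call. */
              public static Map<Path, Long> mtimes(FileSystem fs, Path dir) throws IOException {
                Map<Path, Long> result = new HashMap<Path, Long>();
                for (FileStatus st : fs.listStatus(dir)) {
                  result.put(st.getPath(), st.getModificationTime());
                }
                return result;
              }

              public static void main(String[] args) throws Exception {
                FileSystem fs = FileSystem.get(new Configuration());
                // "/share/hive/v1" stands in for wherever the shared jars are kept.
                System.out.println(mtimes(fs, new Path("/share/hive/v1")));
              }
            }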

          Joydeep Sen Sarma added a comment -

          the proposal is below - rephrases some of the discussions above, addresses some of the comments around race conditions and points out limitations. Junjie will post a patch tomorrow (which probably needs some more work).

          Background

          Hadoop map-reduce jobs commonly require jars, executables, archives and other resources for task execution on hadoop cluster nodes. A common deployment pattern for Hadoop applications is that the required resources are deployed centrally by administrators (either on a shared file system or deployed on standard local file system paths by package management tools). Users launch Hadoop jobs from these installation points. Applications use apis (-libjars/files/archives) provided by Hadoop to upload resources (from the installation point) so that they are made available for task execution. This behavior makes deployment of Hadoop applications very easy (just use standard package management tools).

          As an example, Facebook has a few different Hive installations (of different versions) deployed on an NFS filer. Each has a multitude of jar files - with only some differing across different Hive versions. Users also maintain a repository of map-reduce scripts and jar files containing Hive extensions (user-defined functions) on an NFS filer. Any installation of Hive can be used to execute jobs against any of multiple map-reduce clusters. Most of the jar files are also required locally (by the Hive client) - either for query compilation or for local execution (either in hadoop local mode or for some special types of queries).

          Problems

          With the above arrangement - each (non local-mode) Hadoop job will upload all the required jar files into HDFS. TaskTrackers will download these jars from HDFS (at most once per job) and check modification times of downloaded files (second task onwards). The following overheads are observed:

          • Job submission latency is impacted because of the need to serially upload multiple jar files into HDFS. At Facebook - we typically see 5-6 seconds of pause in this stage (depends on how responsive DFS is on a given day)
          • There is some latency in setting up the first task as resources must be downloaded from HDFS. We have typically observed this to be around 2-3 seconds at Facebook.
          • For subsequent tasks - the latency impact is not as high - but the mtime check adds to general Namenode pressure.

          Observations

          • jars and other resources are shared across different jobs and users. there are, in fact, hardly any resources that are not shared.
          • these resources are meant to be immutable

          We would like to use these properties to solve some of the overheads in the current protocol while retaining the simplicity of the deployment model that exists today.

          General Approach

          We would like to introduce (for lack of a better term) the notion of Content Addressable Resources (CAR) that are stored in a central repository in HDFS:

          1. CAR jars/files/archives are identified by their content (for example - named using their md5 checksum).

            This allows different jobs to share resources. Each Job can find out whether the resources required by it are already available in HDFS (by comparing the md5 signatures of their resources against the contents in the CAR repository).

          2. Content Addressable resources (once uploaded) are immutable. They can only be garbage collected (by server side daemons).
            This allows TaskTrackers to skip mtime checks on such resources.

          The CAR functionality is exposed to clients in two ways:

          • a boolean configuration option (defaulting to false) to indicate that resources added via -libjars/files/archives options are content addressable
          • enhancing the Distributed Cache api to mark specific files/archives as CAR (similar to how specific files can be marked public)

          Protocol

          Assume a jobclient has a CAR file F on local disk to be uploaded for task execution. Here's approximately a trace of what happens from the beginning of the job to its end:

          1. Client computes the md5 signature of F (= md5_F)
            • One can additionally provide an option to skip this step - the md5 can be precomputed and stored in a file named F.md5 alongside F. The client can look for and use the contents of this file as the md5 sum.

          2. The client fetches (in a single filesystem call) the list of md5 signatures (and their 'atime' attribute among other things) of the CAR repository

          3. If the md5_F already exists in the CAR repository - then the client simply uses the URI of the existing copy as the resource to be downloaded on the TaskTrackers
            • If the atime of md5_F is older than 1 day, then the client updates the atime (See #6)

          4. If md5_F does not exist in the CAR repository then the client uploads it to the CAR repository using md5_F as the name (a code sketch of steps 1-4 follows this list)

          5. The TaskTracker, on being requested to run a task requiring CAR resource md5_F, checks whether md5_F is localized.
            • If md5_F is already localized - then nothing more needs to be done. the localized version is used by the Task
            • If md5_F is not localized - then it's fetched from the CAR repository

          6. A garbage collector (running on the server side - preferably the JT) scans the CAR repository periodically, looking for and deleting resources whose atime is older than N days (a sketch of this sweep appears after the summary paragraph below). This is similar to the TrashEmptier in the Namenode.

          7. The number N is configurable. The protocol guarantees that no job less than N-1 days in length will have its resources garbage collected before it finishes (because of the update-atime step in #3). In practice, the total size of the CAR repository is likely to be very small (relative to other contents in HDFS) and N can be set to a very high number.
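          A minimal client-side sketch of steps 1-4, assuming the CAR repository is a flat HDFS directory in which each resource is stored under its md5 hex string. The directory name, class name and the md5 argument (which could be produced by something like the ContentSignature example in the description) are illustrative and are not the names used by the attached patch.

            import java.io.IOException;
            import java.util.HashMap;
            import java.util.Map;
            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileStatus;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;

            public class CarClient {
              private static final Path CAR_DIR = new Path("/mapred/car");   // hypothetical location
              private static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

              /** Returns the HDFS path to use for a local resource, uploading it only when needed. */
              public static Path resolve(FileSystem fs, Path localFile, String md5) throws IOException {
                // Step 2: a single listStatus() call fetches every signature and its atime.
                Map<String, FileStatus> repo = new HashMap<String, FileStatus>();
                for (FileStatus st : fs.listStatus(CAR_DIR)) {
                  repo.put(st.getPath().getName(), st);
                }
                Path shared = new Path(CAR_DIR, md5);
                FileStatus existing = repo.get(md5);
                if (existing != null) {
                  // Step 3: reuse the existing copy; refresh atime if it is getting stale so the
                  // garbage collector (step 6) does not remove it while this job is still running.
                  if (System.currentTimeMillis() - existing.getAccessTime() > ONE_DAY_MS) {
                    fs.setTimes(shared, -1, System.currentTimeMillis());   // -1 leaves mtime untouched
                  }
                  return shared;
                }
                // Step 4: the first client to see this content uploads it under its signature.
                fs.copyFromLocalFile(localFile, shared);
                return shared;
              }

              public static void main(String[] args) throws Exception {
                FileSystem fs = FileSystem.get(new Configuration());
                // args: <local file> <precomputed md5 hex>
                System.out.println(resolve(fs, new Path(args[0]), args[1]));
              }
            }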

          In this protocol - assuming that most jobs are using the same resources - the vast majority of job submissions make only one file system call (to list the CAR repository on the job client). Most task executions do not require any calls to the file system (for purposes of localization). Note that uploads to the CAR repository will also be rare (in steady state).
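          And a matching sketch of the server-side garbage collector from steps 6-7, using the same hypothetical flat directory as the client sketch above; the retention period and scan trigger are illustrative (the real daemon would presumably be configured and would run inside the JT, similar to the TrashEmptier):

            import java.io.IOException;
            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileStatus;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;

            public class CarGarbageCollector {
              /** Deletes CAR entries whose atime is older than retentionDays. */
              public static void sweep(FileSystem fs, Path carDir, int retentionDays) throws IOException {
                long cutoff = System.currentTimeMillis() - retentionDays * 24L * 60 * 60 * 1000;
                for (FileStatus st : fs.listStatus(carDir)) {
                  if (st.getAccessTime() < cutoff) {
                    // Clients bump atime at most once a day (step 3), so anything older than the
                    // cutoff has not been claimed by a recently submitted job.
                    fs.delete(st.getPath(), true);
                  }
                }
              }

              public static void main(String[] args) throws Exception {
                FileSystem fs = FileSystem.get(new Configuration());
                sweep(fs, new Path("/mapred/car"), 7);   // hypothetical directory and N
              }
            }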

          Notes

          1. The garbage collection of localized resources on TaskTrackers happens the same as today (for resources downloaded via the distributed cache). In particular, no synchronization is required between garbage collection of localized resources and those of the backing URIs in hdfs.

          2. In step #4 - in the v1 implementation, the client is responsible for computing the md5. If the client is malicious - it can spoof the md5 (of important jars) and upload malicious code thereby affecting the execution of other clients.

          3. In the v1 implementation - the CAR repository is implemented as a fixed directory in HDFS. The clients must have write permission to the CAR directory (to upload new resources into it). A malicious client can then delete or modify resources before they are eligible for garbage collection - potentially affecting running jobs.

          The latter two issues can be solved by having a server side agent control the addition and deletion of resources to the CAR repository. However this has not been implemented in v1. The initial implementation only suffices for environments that can make the assumption of non-malicious clients - but can be extended to cover more security conscious use cases in the future (with the attendant burden of more server side apis).

          Koji Noguchi added a comment -

          For me, that's not of a worry. It may delay individual job submissions, but the overall load to the hdfs isn't much.

          (at least compared to later phase of hundreds and thousands of tasktrackers looking up mtime of 'all those jars'.)

          Since my problem is just about lookup of mtime, created a new jira MAPREDUCE-2011.

          Joydeep Sen Sarma added a comment -

          > if the jar changed resulting in different tasks of the same job getting different data rendering debugging impossible!

          prior to security related functionality (and even with it if the user is capricious enough) - i think this was very much possible. the only contract that the TT's seem to follow today is that the launched task sees the latest possible version of a required resource (jar/file etc.). there's no contract that says that each task sees identical version of each resource. the mtime check only seems to reinforce this notion - that resources could be changing underneath as the job is running.

          that said - there will likely be security holes in our current scheme. we will post a spec and trunk patch by sometime later today (or weekend) hopefully.

          Arun C Murthy added a comment -

          > sorry for the endless confusion - i will try to write up a detailed doc tomorrow covering use cases and design/gaps etc.

          Can you please attach one?

          > the changes to distributed cache (of which there are little - i think most changes are in jobclient and taskrunner) are concerned with making the assumption that the shared objects are immutable (in which case mtime checks can be bypassed).

          This seems just wrong - assuming immutability is just not useful - the cost of the mtime is trivial compared to doing the I/O which we are anyway saving. It will essentially introduce randomness if the jar changed resulting in different tasks of the same job getting different data rendering debugging impossible!

          -1

          Koji Noguchi added a comment -

          > u would get a trace of all those jars being uploaded to hdfs. it's ridiculous.

          For me, that's not of a worry. It may delay individual job submissions, but the overall load to the hdfs isn't much.
          (at least compared to later phase of hundreds and thousands of tasktrackers looking up mtime of 'all those jars'.)

          Joydeep Sen Sarma added a comment -

          it's almost in production - the patch posted here had some bugs. we have final patch for 20 available - but have been trying to get the one for trunk into shape. should post soon.

          try running hadoop in debug log4j mode. u would get a trace of all those jars being uploaded to hdfs. it's ridiculous.

          Koji Noguchi added a comment -

          > we have started testing this patch internally and this would become production in a couple of weeks.

          Joydeep, is this being tested on your production cluster? How does the load look?
          I don't know the details, but I like the "part of the goal here is to not have to look up mtimes again and again. " part.

          We certainly have applications with many small tasks having multiple libjar/distributed-caches resulting with too many getfileinfo calls to the namenode.

          Joydeep Sen Sarma added a comment -

          sorry for the endless confusion - i will try to write up a detailed doc tomorrow covering use cases and design/gaps etc.

          the use case involves libjars being added from local file systems (since that's where software packages are deployed). it's really not possible to deploy software packages on hdfs (in certain cases - we wish to execute the software locally without interacting with hdfs entirely (see for example HIVE-1408)).

          the changes to distributed cache (of which there are little - i think most changes are in jobclient and taskrunner) are concerned with making the assumption that the shared objects are immutable (in which case mtime checks can be bypassed).

          Amareshwari Sriramadasu added a comment -

          > Currently, files loaded through hadoop libjars/files/archives mechanism are copied onto HDFS and removed on every job.

          JobClient copies the files/archives passed through -files, -libjars and -archives options if they are in the local file system. If they are already in HDFS, no copying is done. Then, the hdfs paths are added to distributedCache. Thus, as Arun pointed out in an earlier comment, DistributedCache does not copy any files to dfs.

          I think this issue would involve picking up job jar from a dfs path directly instead of local fs.

          Junjie Liang added a comment -

          To supplement Joydeep's comment:

          We are trying to save the number of calls to the NameNode, through the following optimizations:

          1) Currently, files loaded through hadoop libjars/files/archives mechanism are copied onto HDFS and removed on every job. This is inefficient if most jobs are submitted from only 3-4 versions of hive, because rightfully the files should persist in HDFS to be reused. Hence the idea of decoupling files from their jobId to make them sharable across jobs.

          2) If files are identified with their md5 checksums, we no longer need to verify file modification time in the TT. This saves another call to the NameNode to get the FileStatus object.

          The reduction in the number of calls to the NameNode is small, but over a large number of jobs we believe it will be a noticeable difference.

          Joydeep Sen Sarma added a comment -

          > The DistributedCache already tracks mtimes for files

          ummmm - that's what i am saying. if u consider objects as immutable - then u don't have to track and look up mtimes. part of the goal here is to not have to look up mtimes again and again. if u have an object with matching md5 localized - you are done. (but we can't use the names alone for that. names can collide. md5 cannot (or nearly so). so we name objects based on their content signature (md5) - which is what a content addressible store/cache does).

          > Admin installs pig/hive on hdfs:
          > /share/hive/v1/hive.jar
          > /share/hive/v2/hive.jar

          that's not how hive works (or how hadoop streaming works). people deploy hive on NFS filers or local disks. users run hive jobs from these installation points. there's no hdfs involvement anywhere. people add jars to hive or hadoop streaming from their personal or shared folders. when people run hive jobs - they are not writing java. there's no .setRemoteJar() code they are writing.

          hive loads the required jars (from the install directory) to hadoop via hadoop libjars/files/archives functionality. different hive clients are not aware of each other (ditto for hadoop streaming). most of the hive clients are running from common install points - but people may be running from personal install points with altered builds.

          with what we have done in this patch - all these uncoordinated clients automatically share jars with each other. because the name for the shared object now is derived from the content of the object. we are still leveraging distributed cache - but we are naming objects based on their contents. Junjie tells me we can leverage the 'shared' objects namespace from trunk (in 20 we added our own shared namespace).

          because the names are based on strong content signature - we can make the assumption of immutability. as i have tried to point out many times - when objects are immutable - one can make optimizations and skip timestamp based validation. the latter requires hdfs lookups and creates load and latency.

          note that we need zero application changes for this sharing and zero admin overhead. so all sorts of hadoop users will automatically start getting the benefit of shared jars without writing any code and without any special admin recipe.

          isn't that good?

          Arun C Murthy added a comment -

          Joydeep - Maybe we are talking past each other, yet ...

          The DistributedCache already tracks mtimes for files. Each TT, via the DistributedCache, localizes the file based on <absolute-path-on-hdfs, mtime>.

          This seems sufficient for the use case as I understand it.

          Here is the flow:

          Admin installs pig/hive on hdfs:
          /share/hive/v1/hive.jar
          /share/hive/v2/hive.jar

          The pig/hive framework, in fact, any MR job then does:

          JobConf job = new JobConf();
          job.setRemoteJar(new Path("/share/hive/v1/hive.jar"));  // setRemoteJar is the API proposed here
          JobClient.runJob(job);

          That's it. The JobClient has the smarts to use DistributedCache.addArchiveToClassPath as the implementation of JobConf.setRemoteJar.

          If you want a new version of hive.jar, you change hive to use /share/hive/v2/hive.jar.

          What am I missing here?

          Joydeep Sen Sarma added a comment -

          @Arun - you are right - this is a layer above distributed cache for the most part. Take a look at our use case (bottom of my previous comments). Essentially we are extending the Distributed Cache a bit to be a content addressible cache. I do not think our use case is directly supported by Hadoop for this purpose - and we are hoping to make the change in the framework (instead of Hive) because there's nothing Hive specific here and whatever we are doing will be directly leveraged by other apps.

          Sharing != Content addressible. A NFS filer can be globally shared - but it's not content addressible. An EMC Centera (amongst others) is. Sorry - terrible examples - trying to come up with something quickly.

          Will address Vinod's comments later - we have taken race considerations into account.

          Arun C Murthy added a comment -

          To reiterate:
          Pre-security - Artifacts in DistributedCache are already shared across jobs, no changes needed.
          Post-security - MAPREDUCE-744 allows for a shared distributed cache across jobs too.

          Arun C Murthy added a comment -

          > I'm proposing a change in the way files are stored in HDFS. Instead of storing files in /jobid/files or /jobid/archives, we store them directly in {mapred.system.dir}/files and {mapred.system.dir}/archives. This removes the association between a file and the job ID, so that files can be persistent across jobs.

          I'm confused here. The distributed-cache does not write any files to HDFS, it merely is configured with a set of files to be copied from HDFS to the compute node. Why are we making these changes?

          Vinod Kumar Vavilapalli added a comment -

          > Currently, auxiliary files added through DistributedCache.addCacheFiles and DistributedCache.addCacheArchive end up in {mapred.system.dir}/job_id/files or {mapred.system.dir}/job_id/archives. The /job_id directory is then removed after every job, which is why files cannot be reused across jobs.

          That is only true for private distributed cache files. Artifacts which are already public on the DFS don't go to mapredsystem directly at all and are reusable across users/jobs.

          > 2. it treats shared objects as immutable. meaning that we never look up the timestamp of the backing object in hdfs during task localization/validation. this saves time during task setup.

          > 3. reasonable effort has been put to bypass as many hdfs calls as possible in step 1. the client gets a listing of all shared objects and their md5 signatures in one shot. because of the immutability assumption - individual file stamps are never required and save hdfs calls.

          I think this is orthogonal. If md5 checksums are preferred over timestamp based checks for the sake of lessening DFS accesses, that can be done separately within the current design, no? Distributed cache files originally did rely on the md5 checksum of the files/jars that HDFS itself used to have. However that changed via HADOOP-1084 when checksums paved the way for block-level crcs.

          > 4. finally - there is inbuilt code to do garbage collection of the shared namespace (in hdfs) by deleting old shared objects that have not been recently accessed.

          This is where I think it gets tricky. First, garbage collection of the dfs namespace should be accompanied by the same on individual TTs - more complexity.

          There are race conditions too. It's not clear how the JobTracker is prevented from expiring shared cache files/jars when some JobClient has already marked or is in the process of marking those artifacts for usage by the job. Warranting such synchronization across JobTracker and JobClients is difficult and, at best, brittle. Leaving the synchronization issues unsolved would only mean leaving the tasks/job to fail later which is not desirable.

          > the difference here is that all applications (like Hive) using libjars etc. options provided in hadoop automatically share jars with each other (when they set this option). the applications don't have to do anything special (like figuring out the right global identifier in hdfs for their jars).

          That seems like a valid use-case. But as I mentioned above, because of complexity and race conditions it seems like a wrong place to develop it.

          I think the core problem is trying to perform a service (sharing of files) that strictly belongs to the layer above mapreduce - maintaining the share list doesn't seem like a JT's responsibility. The current way of leaving it to the users to decide which are public files (and hence shareable) and which are not, and how and when they are purged, keeps things saner from the mapreduce framework point of view. What do you think?

          > if u can look at the patch a bit - that might help understand the differences as well

          I looked at the patch. And I am still not convinced. Yet, that is.

          Joydeep Sen Sarma added a comment -

          thanks for taking a look. i think there are some differences (and potentially some overlap as well) with what we are trying to do here:

          1. the jobclient in this approach computes md5 of jars/files/archives (when a special option is enabled) and then automatically submits these jars as shared objects by putting them in a global namespace - where the (md5, file-name) identifies the shared object (instead of the (jobid, file-name, file-timestamp)).

          2. it treats shared objects as immutable. meaning that we never look up the timestamp of the backing object in hdfs during task localization/validation. this saves time during task setup.

          3. reasonable effort has been put to bypass as many hdfs calls as possible in step 1. the client gets a listing of all shared objects and their md5 signatures in one shot. because of the immutability assumption - individual file stamps are never required and save hdfs calls.

          4. finally - there is inbuilt code to do garbage collection of the shared namespace (in hdfs) by deleting old shared objects that have not been recently accessed.

          so i believe the scope of this effort is somewhat different (based on looking at the last patch for 744).

          the difference here is that all applications (like Hive) using libjars etc. options provided in hadoop automatically share jars with each other (when they set this option). the applications don't have to do anything special (like figuring out the right global identifier in hdfs for their jars).

          Our primary use case is for Hive. Hive submits multiple jars for each Hadoop job. Users can add more. At any given time - we have at least 4-5 official versions of Hive being used to submit jobs. in addition - hive developers are developing custom builds and submitting jobs using them. total jobs submitted per day is tens of thousands.

          with this patch - we automatically get sharing of jars and zero administration overhead of managing a global namespace amongst many versions of our software libraries. I believe there's nothing Hive specific here. We use hadoop jar/file resources just like hadoop-streaming and other map-reduce jobs.

          before embarking on this venture - we looked at the hadoop code and tried to find out whether a similar facility existed. we noticed a md5 class - but no uses for it. if there is existing functionality to the above effect - we would love to pick it up (less work for us). otherwise - i think this is very useful functionality that would be good to have in Hadoop framework.

          If you can look at the patch a bit, that might help clarify the differences as well.

          Vinod Kumar Vavilapalli added a comment -

          Apologies for not looking at this issue before.

          The distributed cache already has support for sharing files/archives via MAPREDUCE-744. It went into 0.21; maybe all you need is a back-port.

          The requirements for this issue can be met simply by making the job jar files on DFS public and adding them to the distributed cache as files/archives to be put on the task's classpath. I don't see anything else needed.
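
          For reference, this existing-API route would look roughly like the following at job-setup time; the DFS paths are hypothetical, and the jars must already sit in a world-readable location for the MAPREDUCE-744 public sharing to apply.

          import java.net.URI;

          import org.apache.hadoop.filecache.DistributedCache;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.mapred.JobConf;

          public class PublicCacheSetup {
            public static void addSharedJars(JobConf conf) throws Exception {
              // Jars uploaded once to a world-readable DFS location (paths are hypothetical).
              DistributedCache.addFileToClassPath(new Path("/shared/lib/hive-exec.jar"), conf);
              DistributedCache.addArchiveToClassPath(new Path("/shared/lib/aux-udfs.zip"), conf);

              // A plain auxiliary file, localized on each node but not put on the classpath.
              DistributedCache.addCacheFile(new URI("hdfs:///shared/conf/lookup.txt"), conf);
            }
          }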

          Joydeep Sen Sarma added a comment -

          Arun and other Hadoop'ers: it might take JJ some time to get the patch for trunk ready. If you have some cycles, it would be good to vet the general approach by looking at the patch for 0.20. From a quick glance, I think the trunk code differs primarily in security-related aspects.

          We have started testing this patch internally, and it should be in production in a couple of weeks.

          Junjie Liang added a comment -

          Patch for version 0.20.2
          =================

          Set "mapred.cache.shared.enabled" to "true" to enable cache files to be shared across jobs.

          Junjie Liang added a comment -

          Here are some more details on how I intend to make the changes. Please comment and give suggestions as you see fit.

          Currently, auxiliary files added through DistributedCache.addCacheFiles and DistributedCache.addCacheArchive end up in {mapred.system.dir}/job_id/files or {mapred.system.dir}/job_id/archives. The /job_id directory is then removed after every job, which is why files cannot be reused across jobs.

          I'm proposing a change in the way files are stored in HDFS. Instead of storing files in /job_id/files or /job_id/archives, we store them directly in {mapred.system.dir}/files and {mapred.system.dir}/archives. This removes the association between a file and the job ID, so that files can be persistent across jobs.

          Two new function calls, DistributedCache.addSharedCacheFiles() and DistributedCache.addSharedCacheArchives(), are added for users to add files that can be shared across jobs. Files that are added through the original functions addCacheFiles() and addCacheArchives() are not affected; they go through the same code path as before.
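
          Assuming the new calls mirror the shape of the existing array-based setCacheFiles()/setCacheArchives() helpers (that is an assumption on my part; the patch defines the actual argument types), client usage might look like:

          import java.net.URI;

          import org.apache.hadoop.filecache.DistributedCache;
          import org.apache.hadoop.mapred.JobConf;

          public class SharedCacheApiSketch {
            public static void configure(JobConf conf) throws Exception {
              // Hypothetical signatures: shared artifacts, keyed by (md5, filename), reused across jobs.
              DistributedCache.addSharedCacheFiles(
                  new URI[] { new URI("file:///opt/hive/lib/hive-exec.jar") }, conf);
              DistributedCache.addSharedCacheArchives(
                  new URI[] { new URI("file:///opt/hive/aux/udfs.zip") }, conf);

              // Unchanged, per-job code path.
              DistributedCache.addCacheFile(new URI("hdfs:///user/joe/lookup.txt"), conf);
            }
          }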

          The "shared" files are stored in

          {mapred.system.dir}/files and {mapred.system.dir}

          /archives (note the job_id is removed from the path). To prevent files with the same filename from colliding, a prefix which is the md5 of the file is added to the filename of each file, so for example, test.txt becomes ab876d86389d76c9e692fffd51bb2acde_test.txt. We use both the md5 checksum and filename to identify a file so there is no confusion between files with the same filename but have different contents, and files with the same contents but with different filenames.

          The TaskRunner no longer needs to use timestamps to decide whether a file is up to date, since the file will have a different md5 checksum if it is modified.
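
          Under that scheme, the TaskTracker-side freshness check could reduce to a simple name lookup. A minimal sketch, assuming a flat local cache directory and hypothetical helper names (not the patch's actual code):

          import java.io.File;

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          public class SharedLocalizationSketch {
            // Localize e.g. "ab876..._test.txt"; the md5 in the name makes timestamp checks unnecessary.
            public static File localize(Configuration conf, Path sharedFile, File localCacheDir)
                throws Exception {
              File local = new File(localCacheDir, sharedFile.getName());
              if (local.exists()) {
                return local;  // the name encodes the content hash, so an existing copy is current
              }
              FileSystem dfs = sharedFile.getFileSystem(conf);
              dfs.copyToLocalFile(sharedFile, new Path(local.getAbsolutePath()));
              return local;
            }
          }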

          Files that need to be changed: JobClient.java, DistributedCache.java, and TaskRunner.java have the most changes, since files move from the client to HDFS to the TaskTracker nodes through this code.

          Thanks!

          Joydeep Sen Sarma added a comment -

          It is certainly true that we could do this at the Hive layer. Two issues:

          • not generic (meaning it wouldn't work for streaming, for example)
          • we would need to repeat some of the classpath management that the JobClient/TT already take care of.

          Currently Hive leverages Hadoop-provided facilities for distributing jars and files, and we will try to extend this functionality.

          dhruba borthakur added a comment -

          +1

          From what I have learnt, the files in the distributed cache are persisted even across map-reduce jobs. So the Hive client can upload the relevant jars to some location in HDFS and then point the distributed cache to those HDFS URIs. If we do that, the TT will download those URIs to local disk only once, and all tasks (across multiple jobs) on that TaskTracker will continue to use these jars.
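
          That flow, using today's API and a hypothetical well-known HDFS location managed by the Hive client, would be roughly:

          import java.net.URI;

          import org.apache.hadoop.filecache.DistributedCache;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.mapred.JobConf;

          public class HiveJarUploadSketch {
            public static void attachJar(JobConf conf, Path localJar) throws Exception {
              // One-time upload to a fixed, hypothetical HDFS location.
              Path target = new Path("/user/hive/lib/" + localJar.getName());
              FileSystem dfs = target.getFileSystem(conf);
              if (!dfs.exists(target)) {
                dfs.copyFromLocalFile(localJar, target);
              }
              // Every subsequent job just references the HDFS URI; each TT downloads it only once.
              DistributedCache.addCacheArchive(new URI(target.toString()), conf);
            }
          }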

          Joydeep Sen Sarma added a comment -

          Yeah, we have an intern (Junjie Liang) working on this, and he is reusing the DistributedCache; he should be posting some code/design soon.

          Arun C Murthy added a comment -

          +1

          A straightforward way is to use the DistributedCache directly; an easy change is to have the JobSubmissionProtocol use either a custom jar (as today) or just refer to the jars in the DistributedCache.


            People

            • Assignee:
              Unassigned
            • Reporter:
              Joydeep Sen Sarma
            • Votes:
              0
            • Watchers:
              29
