Hadoop Map/Reduce
MAPREDUCE-989

Allow segregation of DistributedCache for maps and reduces

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: distributed-cache
    • Labels:
      None

      Description

       Applications might have differing needs for files in the DistributedCache with respect to maps and reduces. We should allow applications to specify these separately.

        Issue Links

          Activity

          eric baldeschwieler added a comment -

          Seems like that could be addressed with a careful design. Say an object is returned immediately while the download happens in the background. I'm not insisting that we do that here.

          I'd just like to see a trend of making map-reduce config simpler rather than harder.

          Milind Bhandarkar added a comment -

          If, as Eric suggests, the tasks themselves request the cached files they need (presumably in the configure method of the user-supplied mapper/reducer), then we lose the opportunity to overlap populating the cache for reducers with fetching map outputs.

          My request for different cache configuration variables for map and reduce tasks is consistent with the basic observation that map and reduce runtime requirements differ. That observation has already driven several additions to the configuration, such as specifying different child.java.opts, different ulimits, different task runners, etc. for the two task types. So it is imperative that users be able to provide different cache files and archives for the two task types as well.

          This cannot live in user-provided code, because otherwise Hadoop Streaming, Pipes, and Pig would each have to be modified to implement that functionality in the wrappers they provide. Having a single implementation provided by the framework seems to me the best way to go.
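
          A minimal sketch (not an existing Hadoop feature) of what such per-task-type cache configuration might look like from the job-submission side; the ".map" and ".reduce" suffixed keys below are hypothetical, are not recognized by Hadoop, and only illustrate the shape of the request:

             import org.apache.hadoop.mapred.JobConf;

             public class SplitCacheConfigSketch {
               public static void main(String[] args) {
                 JobConf conf = new JobConf();
                 // Hypothetical key: files only map tasks should localize.
                 conf.set("mapred.cache.files.map", "hdfs:///dict/stopwords.txt");
                 // Hypothetical key: archives only reduce tasks should localize.
                 conf.set("mapred.cache.archives.reduce", "hdfs:///models/ranker.zip");
                 // The existing mapred.cache.files / mapred.cache.archives keys would
                 // remain the "both maps and reduces" default.
               }
             }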

          eric baldeschwieler added a comment -

          Philip's idea seems interesting.

          In terms of specifying map vs reduce, would it just be possible to lazily load an item when it is needed? Asking users to configure more stuff seems awkward.

          In a previous system, we had a call to get the path to a cached object. If the object was not in the cache, it was downloaded upon the first request. This would allow objects to be used whenever needed without configuration.

          Note: This would even allow one to shard cached objects into sets, if for example different reducers need different data, which is often the case.

          Downsides:

          • Perhaps an API change?
          • Less info that the JT has for later optimizations
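
          A minimal sketch of the lazy, download-on-first-request idea described above; LazyTaskCache is hypothetical (no such class exists in Hadoop), and the locking and reference counting the real TaskTracker-side cache performs are omitted for brevity:

             import java.io.IOException;
             import java.net.URI;
             import java.util.Map;
             import java.util.concurrent.ConcurrentHashMap;

             import org.apache.hadoop.conf.Configuration;
             import org.apache.hadoop.fs.FileSystem;
             import org.apache.hadoop.fs.Path;

             public class LazyTaskCache {
               private final Configuration conf;
               private final Path localCacheDir;
               private final Map<URI, Path> localized = new ConcurrentHashMap<URI, Path>();

               public LazyTaskCache(Configuration conf, Path localCacheDir) {
                 this.conf = conf;
                 this.localCacheDir = localCacheDir;
               }

               /** Returns the local path of a cached object, downloading it on first use. */
               public Path getLocalPath(URI remote) throws IOException {
                 Path local = localized.get(remote);
                 if (local == null) {
                   Path src = new Path(remote);
                   local = new Path(localCacheDir, src.getName());
                   // Fetch only when a task actually asks for the object.
                   FileSystem.get(remote, conf).copyToLocalFile(src, local);
                   localized.put(remote, local);
                 }
                 return local;
               }
             }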
          Philip Zeyliger added a comment -

          The use cases definitely make sense. Unpacking archives on setup tasks is often
          going to be pointless.

          I've been thinking about what a reasonable API for this would be (especially
          after working on MAPREDUCE-476), from the job submitter's perspective. One thought is:

          addCacheFile(URI path, Set<TaskType> tasks, Set<DistributedCacheOptions> options);

          Where the default for tasks is an ImmutableSet(EnumSet<TaskType>) containing
          MAP and REDUCE. DistributedCacheOptions include

             ADD_TO_CLASSPATH
             UNARCHIVE
             CREATE_SYMLINK
          

          The defaults are to not add to classpath, not unarchive, and not create the symlink.
          (Note that we'd be creating symlinks per-file, instead of globally, which is the only
          place to set the option currently.)

          What I like about this is that it replaces 5 methods (addCacheFile,
          addCacheArchive, addFileToClassPath, addArchiveToClassPath, createSymlink),
          with one method, and doesn't lose much in the way of readability.

          You could also use booleans or enums (boolean add_to_classpath, boolean
          unarchive, boolean create_symlink), but that is often difficult to read.

          On the back-end, you'd need to revisit how the files to be cached are stored.
          The current scheme of using

          mapred.cache.archives.timestamps
          mapred.cache.localFiles
          mapred.job.classpath.files
          mapred.job.classpath.archives
          mapred.cache.archives
          mapred.cache.files
          mapred.create.symlink
          

          probably needs to remain for backwards compatibility, but it would
          be great to just stick that into one configuration property:

          mapred.filecache = [ { "path": ..., "tasks": [ MAP, REDUCE ], ... }, ... ]

          or, if it's legal

            mapred.filecache.0 = { "path": ..., ... }
            mapred.filecache.1 = ...
            ...
          

          Thoughts?
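
          To make the defaults above concrete, here is a small self-contained sketch; TaskType is the existing org.apache.hadoop.mapreduce.TaskType enum, while DistributedCacheOptions and the addCacheFile stand-in are hypothetical and only mirror the signature proposed in this comment:

             import java.net.URI;
             import java.net.URISyntaxException;
             import java.util.Collections;
             import java.util.EnumSet;
             import java.util.Set;

             import org.apache.hadoop.mapreduce.TaskType;

             public class CacheSpecSketch {
               /** Hypothetical per-file options; not part of Hadoop. */
               enum DistributedCacheOptions { ADD_TO_CLASSPATH, UNARCHIVE, CREATE_SYMLINK }

               /** Default: localize for both maps and reduces. */
               static final Set<TaskType> DEFAULT_TASKS =
                   Collections.unmodifiableSet(EnumSet.of(TaskType.MAP, TaskType.REDUCE));

               /** Stand-in for the proposed Job.addCacheFile(path, tasks, options). */
               static void addCacheFile(URI path, Set<TaskType> tasks,
                                        Set<DistributedCacheOptions> options) {
                 // A real implementation would record this spec in the job configuration.
                 System.out.println(path + " -> tasks=" + tasks + ", options=" + options);
               }

               public static void main(String[] args) throws URISyntaxException {
                 // Example: an archive only reducers need, unpacked and symlinked.
                 addCacheFile(new URI("hdfs:///apps/lookup/geo.zip"),
                              EnumSet.of(TaskType.REDUCE),
                              EnumSet.of(DistributedCacheOptions.UNARCHIVE,
                                         DistributedCacheOptions.CREATE_SYMLINK));
               }
             }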

          Vinod Kumar Vavilapalli added a comment -

          Talked with Milind offline, and he explained the use case to me. The segregation is actually intended to optimize task run-times by not downloading cache files that a given task does not need.

          For example, setup tasks don't need dist-cache files at all, so they will run faster if they don't download files intended for maps/reduces. Similarly, for jobs that need dist-cache files only for the reduces, the maps, which may be far more numerous than the reduces, will run faster, and the overall job execution time will improve.

          Vinod Kumar Vavilapalli added a comment -

          Any use-cases? As of now, maps and reduces can manage separate DistributedCache files/archives themselves. Is the suggestion only for a separate API at job submission? Or do we want to enforce the segregation on the TTs too, preventing maps' cache files from being accessible to reduces and vice versa?


            People

            • Assignee:
              Unassigned
            • Reporter:
              Arun C Murthy
            • Votes:
              0
            • Watchers:
              9

              Dates

              • Created:
                Updated:

                Development