Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: applicationmaster, mrv2
    • Labels: None

      Description

       Instead of computing the input splits as part of job submission, Hadoop could have a separate "job task type" that computes the input splits, thereby allowing that computation to happen on the cluster.

        Issue Links

          Activity

          Philip Zeyliger added a comment -

          The motivation behind computing the input splits on the cluster is at least two-fold:

          • It would be great to be able to submit jobs to a cluster using a simple (REST?) API, from many languages. (Similar to HADOOP-5633.) The fact that job submission does a bunch of mapreduce-internal work makes such submission very tricky. We're already seeing how workflow systems (here I'm thinking of Oozie and Pig) run MR jobs simply to launch more MR jobs, while inheriting the scheduling and isolation work that the JobTracker already does.
          • Sometimes computing the input splits is, in and of itself, an operation that would do well to be run in parallel across several machines. For example, splitting inputs may require going through many files on the DFS. Moving input split calculations onto the cluster would pave the way for this to be possible.

          Implementation-wise, we already have JOB_SETUP and JOB_CLEANUP tasks, so adding a JOB_SPLIT_CALCULATION, which could be colocated with JOB_SETUP, makes some sense.
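
          As a sketch only (not a patch; the existing values below just mirror the TaskType enum described later in this thread), the new type could sit next to the existing setup/cleanup types roughly like this:

             // Illustrative sketch: existing values mirror the TaskType enum listed
             // later in this thread; JOB_SPLIT_CALCULATION is the hypothetical
             // addition proposed above.
             public enum TaskType {
               MAP,
               REDUCE,
               JOB_SETUP,
               JOB_CLEANUP,
               TASK_CLEANUP,
               JOB_SPLIT_CALCULATION  // computes input splits on the cluster; could be colocated with JOB_SETUP
             }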

          Hemanth Yamijala added a comment -

          Before we do this, I think we should resolve HADOOP-4421, at least to the extent of agreeing on a design. Adding one more task while we are trying to fix problems with the existing ones might make things a tad more difficult to manage.

          Devaraj Das added a comment -

          Isn't it possible to do this as part of the JOB_SETUP task itself?

          Amareshwari Sriramadasu added a comment -

          Isn't it possible to do this as part of the JOB_SETUP task itself?

          This can be done. We should move out the creation of setup/cleanup tasks from JobInProgress.initTasks().

          Amareshwari Sriramadasu added a comment -

          This can be done. We should move out the creation of setup/cleanup tasks from JobInProgress.initTasks().

          Related jira HADOOP-4472.

          Owen O'Malley added a comment -

          This patch should reintroduce checkInputSplits into org.apache.hadoop.mapreduce.InputFormat. This method should be documented as optional. It will only be invoked when Java code is doing the submission, to detect errors in the user's job configuration, such as a missing or read-protected input directory, before the job is submitted to the cluster.
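
          For illustration, a minimal sketch of what such an optional hook might look like; the exact signature is not given in this jira, so the JobContext parameter and the checked exception are assumptions:

             import java.io.IOException;
             import org.apache.hadoop.mapreduce.InputFormat;
             import org.apache.hadoop.mapreduce.JobContext;

             // Hypothetical sketch: the real checkInputSplits signature is not
             // specified here, so the parameter and exception types are assumptions
             // made for illustration only.
             public abstract class CheckingInputFormat<K, V> extends InputFormat<K, V> {

               /**
                * Optional client-side check, e.g. for a missing or read-protected
                * input directory, invoked only when a Java client submits the job.
                */
               public void checkInputSplits(JobContext context) throws IOException {
                 // no-op by default; subclasses may validate their configuration here
               }
             }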

          Philip Zeyliger added a comment -

          I've been poking around here and am running into a fair amount of friction with how different task types are managed.

          As far as I can tell, there are several ways that different task types are distinguished:

          • There's a TaskType enum, which contains MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, and TASK_CLEANUP. This is used quite a bit.
          • TaskInProgress has isMapTask(), isJobCleanupTask(), isJobSetupTask(). I believe that TIP can report both isMapTask() and isJobCleanupTask() on the same object and that reduces are implied by !isMapTask().
          • Task uses a hybrid approach. There's MapTask and ReduceTask (a class hierarchy), but there's also isMapTask(), isJobSetupTask(), isTaskCleanupTask(), and isJobCleanupTask().
          • Schedulers and TaskTrackers for the most part only deal with MAP and REDUCE tasks. Really, these are "slot types", since other types of tasks can be run in them. Schedulers are not aware of the "special tasks"---the JobTracker schedules them "manually" on its own.

          Does this sound about right?

          – Philip

          Matei Zaharia added a comment -

          I think that's almost right, Philip. It looks to me like TASK_CLEANUP tasks can be both maps and reduces. The JobTracker will launch them in a reduce slot if they are cleaning up after a reducer. Therefore, isMapTask() might return false when the task is a cleanup task. To check whether a given Task is a plain old map task or plain old reduce task, you can use Task.isMapOrReduce().

          This part of the code definitely leaves something to be desired. I believe Arun mentioned he'd look at it as part of JobTracker refactoring in the future.
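
          Pulling the distinctions from this exchange together, a hedged sketch (classify is a hypothetical helper; only the Task accessors it calls are taken from the comments above):

             import org.apache.hadoop.mapred.Task;

             // Hypothetical helper illustrating the task-kind checks discussed above.
             public final class TaskKinds {
               static String classify(Task t) {
                 // Setup/cleanup kinds are checked first, since a cleanup task may
                 // occupy either a map or a reduce slot.
                 if (t.isJobSetupTask())    return "job setup";
                 if (t.isJobCleanupTask())  return "job cleanup";
                 if (t.isTaskCleanupTask()) return "task cleanup (map or reduce slot)";
                 // Only plain old map/reduce tasks reach this point.
                 if (t.isMapOrReduce())     return t.isMapTask() ? "plain map" : "plain reduce";
                 return "unknown";
               }
             }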

          Arun C Murthy added a comment -

          This is fairly trivial in MRv2, I'll take a crack at this.

          Arun C Murthy added a comment -

          As foretold, here is a trivial, preliminary patch to move computation of input-splits inside the cluster - something we've craved for a very long time, as evinced by the interest in this jira and the number of times it comes up on user lists.

          This is huge, because it's a significant step towards various improvements such as HTTP-based job submission etc.

          Shameless plug for MRv2 - it took me 15 mins on a Sunday night to get this done... glory to MRv2!


          It needs a tad more work to get delegation tokens on the client side, but it's nearly there.

          Johannes Zillmann added a comment -

          Currently in our Hadoop applications we calculate the splits before we submit the job to the client (the client then simply looks up the existing splits). We do that mainly to influence the reducer count based on the number of splits/map-tasks.
          In case Hadoop does the splitting on the cluster (which makes sense), it would be nice to have a hook to influence the configuration!
          Sometimes it also makes sense for us to decide on the map-reduce assembly only after we know the splits (different join strategies for different data constellations).
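
          A hedged sketch of that client-side pattern, assuming the new-API InputFormat.getSplits() and a made-up one-reducer-per-four-splits ratio purely for illustration:

             import java.util.List;
             import org.apache.hadoop.mapreduce.InputFormat;
             import org.apache.hadoop.mapreduce.InputSplit;
             import org.apache.hadoop.mapreduce.Job;
             import org.apache.hadoop.util.ReflectionUtils;

             // Sketch: compute the splits on the client, then derive the reducer
             // count from them; the splits-to-reducers ratio is an arbitrary example.
             public class SplitDrivenReducers {
               public static void configure(Job job) throws Exception {
                 InputFormat<?, ?> format = ReflectionUtils.newInstance(
                     job.getInputFormatClass(), job.getConfiguration());
                 List<InputSplit> splits = format.getSplits(job);        // client-side split calculation
                 job.setNumReduceTasks(Math.max(1, splits.size() / 4));  // hypothetical ratio
               }
             }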

          Just dumping some ideas here...

          Sandy Ryza added a comment -

          Arun, are you still planning on working on this? If not, do you mind if I pick it up?


            People

             • Assignee: Arun C Murthy
             • Reporter: Philip Zeyliger
             • Votes: 1
             • Watchers: 33

              Dates

               • Created:
               • Updated:

                Development