Hadoop Common / HADOOP-2560

Processing multiple input splits per mapper task

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Currently, an input split contains a consecutive chunk of an input file which, by default, corresponds to a DFS block.
      This may lead to a large number of mapper tasks if the input data is large, which causes the following problems:

      1. Shuffling cost: since the framework has to move M * R map output segments to the nodes running reducers,
      a larger M means a larger shuffling cost.

      2. High JVM initialization overhead

      3. Disk fragmentation: a larger number of map output files means lower read throughput when accessing them.

      Ideally, you want to keep the number of mappers to no more than 16 times the number of nodes in the cluster.
      To achieve that, we can increase the input split size. However, if a split spans more than one DFS block,
      you lose the data-locality scheduling benefits.

      One way to address this problem is to combine multiple input blocks on the same rack into one split.
      If on average we combine B blocks into one split, then we reduce the number of mappers by a factor of B.
      Since all the blocks for one mapper share a rack, we can still benefit from rack-aware scheduling.
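
      (For scale, a hypothetical example: a 10 TB input at a 128 MB block size yields 10 TB / 128 MB = 81,920 map tasks; combining B = 8 rack-local blocks per split would cut that to 10,240.)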

      Thoughts?


          Activity

          Doug Cutting added a comment -

          > combine multiple input blocks with the same rack into one split [ ... ]

          That makes good sense to me. The new Split class could look a lot like MultiFileSplit, but would additionally support a 'getStart(int)' method. So perhaps MultiFileSplit could be extended for this purpose. FileInputFormat could be modified to emit these when the number of splits would otherwise exceed some threshold. But then all subclasses of FileInputFormat would need to be modified to be able to accept these. That wouldn't be too hard. FileInputFormat could implement getRecordReader(InputSplit) to break out the sub-splits, then call a new method, getRecordReader(FileSplit). All existing subclasses could then just change the signature of their getRecordReader implementations in order to support the new feature.
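
          A minimal sketch of the split class Doug describes, written as a standalone Java class rather than an actual MultiFileSplit subclass; all names here are illustrative, not the API that was eventually committed:

          import java.util.Arrays;

          /**
           * Sketch: like MultiFileSplit, but with a per-chunk start offset so one
           * split can cover several block-sized regions of one or more files.
           */
          public class MultiChunkSplit {
              private final String[] paths;    // one entry per chunk
              private final long[] starts;     // byte offset of each chunk within its file
              private final long[] lengths;    // byte length of each chunk

              public MultiChunkSplit(String[] paths, long[] starts, long[] lengths) {
                  if (paths.length != starts.length || paths.length != lengths.length) {
                      throw new IllegalArgumentException("parallel arrays must have equal length");
                  }
                  this.paths = paths;
                  this.starts = starts;
                  this.lengths = lengths;
              }

              public int getNumChunks()    { return paths.length; }
              public String getPath(int i) { return paths[i]; }
              public long getLength(int i) { return lengths[i]; }

              // The per-chunk start offset that plain MultiFileSplit lacks.
              public long getStart(int i)  { return starts[i]; }

              // Total bytes covered by this split.
              public long getLength()      { return Arrays.stream(lengths).sum(); }
          }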

          eric baldeschwieler added a comment -

          A) this is important because it could lead to big throughput gains and seek reductions.

          B) It is not going to work to combine splits statically because block replicas are not co-resident. This would lead to a huge performance hit due to loss of locality. I think we will need to invest in more complexity to get the desired performance improvement here.

          My gut is that we should do this dynamically in the task trackers. This would let us do it when we are seeing good io throughput. The map driver could always just request a new split after each input finishes. The TT would keep a small number of candidate splits locally and decide after each map completes a split if it is going to hand it another one. None of the public interfaces would need to change.

          We would need to change the JT quite a bit to manage maps publishing split collections, but it seems fairly straightforward. We could realize a huge performance gain on simple scanning jobs that process input quickly. We could also see good shuffle improvements.

          This would interact with speculative execution in undesirable ways... Something to watch out for. There are a whole class of collation optimizations here. The fact that we are sorting early may make a lot of them harder... ugh.

          A related idea Runping and I discussed is that if you have multiple spills in a map (combined or unchanged), there is no point collating the spills if the reduce partitions are relatively large (say 1MB). We could just make each spill an output to the reduces. Even if they are small, it would be more efficient to collate in larger units than within a single map, but that starts really broadening the design space...

          Could static split combinations work at all? Yes, I think they might if we produced only a small number and executed them early, but this would reduce the possible gain.

          Doug Cutting added a comment -

          > It is not going to work to combine splits statically because block replicas are not co-resident.

          If the number of blocks in the job input is hugely greater than the number of nodes, then it should be easy to find nodes that have a large number of blocks locally, and group the blocks thusly into tasks. If a task fails, then the re-execution might not be local, but most tasks don't fail, and we can arrange things so that the first node a task is assigned to has all its blocks. Or am I missing something?

          Consider the following algorithm:

          • build <node, block*> and <block, node*> maps for the job input files
          • N is the desired blocks/task
          • for (node : nodes) pop N blocks off each node's list and add them to the list of tasks
          • as each block is popped, also remove it from all other nodes' lists, using the other map to accelerate this
          • repeat until nodes have fewer than N blocks, then emit tasks with fewer than N blocks as the tail of the job

          Wouldn't that work?
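
          A hedged sketch of this algorithm in plain Java, assuming the two maps above have already been built from the job's input files; types are simplified to strings, and all names are illustrative:

          import java.util.ArrayList;
          import java.util.Deque;
          import java.util.HashSet;
          import java.util.List;
          import java.util.Map;
          import java.util.Set;

          public class BlockGrouper {
              /**
               * blocksByNode: the <node, block*> map (node -> blocks it stores).
               * nodesByBlock: the <block, node*> map (block -> replica holders).
               * n: the desired blocks/task. Both maps are consumed by this call.
               */
              public static List<List<String>> group(Map<String, Deque<String>> blocksByNode,
                                                     Map<String, Set<String>> nodesByBlock,
                                                     int n) {
                  List<List<String>> tasks = new ArrayList<>();
                  boolean progress = true;
                  while (progress) {
                      progress = false;
                      for (Deque<String> blocks : blocksByNode.values()) {
                          if (blocks.size() < n) continue;  // leave short lists for the tail
                          List<String> task = new ArrayList<>(n);
                          for (int i = 0; i < n; i++) {
                              String block = blocks.pop();
                              task.add(block);
                              // As each block is popped, remove it from all other
                              // nodes' lists, using the reverse map to accelerate this.
                              for (String other : nodesByBlock.getOrDefault(block, Set.of())) {
                                  Deque<String> list = blocksByNode.get(other);
                                  if (list != null && list != blocks) {
                                      list.remove(block);
                                  }
                              }
                          }
                          tasks.add(task);
                          progress = true;
                      }
                  }
                  // Tail of the job: emit leftover blocks as smaller tasks, skipping
                  // blocks already emitted from another node's leftover list.
                  Set<String> emitted = new HashSet<>();
                  for (Deque<String> blocks : blocksByNode.values()) {
                      List<String> tail = new ArrayList<>();
                      for (String block : blocks) {
                          if (emitted.add(block)) tail.add(block);
                      }
                      if (!tail.isEmpty()) tasks.add(tail);
                  }
                  return tasks;
              }
          }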

          Devaraj Das added a comment -

          Good one!
          We avoid a lot of disk I/O and seeks in the final merge of spills on the map side. This will be beneficial for cases where the partitions can fit in the ramfs on the reducers. We then get the merge almost for free.

          eric baldeschwieler added a comment -

          Nodes operate at different rates. Failures happen. In the face of several jobs running, some nodes may not even become available in a timely manner. I think a static approach will not allow both the performance gains desired and preservation of reasonable throughput.

          The current system takes full advantage of mapping jobs to nodes dynamically. A static combination of splits will break all of this. One could perhaps do something like what you suggest dynamically in the JT when a TT requests a new job. This might be a good compromise implementation. This would also let you observe some global statistics on speed of maps & size of outputs which would let you optimize cluster sizes. Of course doing this all dynamically on the TTs might use fewer JT resources.

          Doug Cutting added a comment -

          > The current system takes full advantage of mapping jobs to nodes dynamically.

          Currently we compute and cache the mapping once per job, and then base all subsequent decisions on that cache. We get ~99% job locality with that 'static' information. Things should be about the same if we group things, unless I'm missing something.

          > One could perhaps do something like what you suggest dynamically in the JT when a TT requests a new job.

          That's a possible enhancement. I'm not sure it's required for good localization, and it would add significant load to the namenode.

          eric baldeschwieler added a comment -

          We queue each map for each candidate node (and now presumably rack) and pull them from consideration once they are scheduled on any node.

          This gets much more complicated with map sets, since you will need to tag which maps in one set have been executed somewhere else and then replace them... Much simpler to make the late binding decision to bundle them.

          I get a feeling this issue will be revisited more than once...

          Doug Cutting added a comment -

          > Much simpler to make the late binding decision to bundle them.

          The algorithm I outlined above could be done incrementally, rather than all up-front:

          • N is the desired splits/task
          • build <node, split*> map for the job inputs
          • when a node asks for a task, pop up to N splits off its list to form a task
          • if a node has no more splits, pop splits from other nodes
          • as each split is popped, remove it from other map entries

          This is essentially the existing algorithm, except that we allocate more than one split per task. In fact, the existing algorithm handles lots of other subtle cases like speculative execution, task failure, etc. So the best way to implement this is probably to use the existing algorithm multiple times per task, etc.

          Earlier I'd spoken of implementing this up front, when constructing splits. But if it's done this way, then we needn't actually change public APIs or InputFormats. Tasks could simply be changed internally to execute a list of splits rather than a single split.
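
          A sketch of this incremental variant under the same simplifications as the earlier sketch: the pool hands out up to N splits when a node asks for work, and steals from other nodes' lists once the requester has no local splits left. All names are illustrative:

          import java.util.ArrayDeque;
          import java.util.ArrayList;
          import java.util.Deque;
          import java.util.List;
          import java.util.Map;
          import java.util.Set;

          public class IncrementalSplitPool {
              private final Map<String, Deque<String>> splitsByNode;  // <node, split*>
              private final Map<String, Set<String>> nodesBySplit;    // reverse map
              private final int n;                                    // desired splits/task

              public IncrementalSplitPool(Map<String, Deque<String>> splitsByNode,
                                          Map<String, Set<String>> nodesBySplit, int n) {
                  this.splitsByNode = splitsByNode;
                  this.nodesBySplit = nodesBySplit;
                  this.n = n;
              }

              // When a node asks for a task, pop up to N splits off its list.
              public synchronized List<String> nextTask(String node) {
                  List<String> task = new ArrayList<>(n);
                  Deque<String> local = splitsByNode.getOrDefault(node, new ArrayDeque<>());
                  while (task.size() < n && !local.isEmpty()) {
                      take(local.pop(), task);
                  }
                  // If the node has no more splits, pop splits from other nodes.
                  for (Deque<String> other : splitsByNode.values()) {
                      if (task.size() >= n) break;
                      while (task.size() < n && !other.isEmpty()) {
                          take(other.pop(), task);
                      }
                  }
                  return task;  // an empty list means the job has no splits left
              }

              // As each split is popped, remove it from other map entries.
              private void take(String split, List<String> task) {
                  task.add(split);
                  for (String holder : nodesBySplit.getOrDefault(split, Set.of())) {
                      Deque<String> list = splitsByNode.get(holder);
                      if (list != null) {
                          list.remove(split);  // no-op on the list it was popped from
                      }
                  }
              }
          }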

          Owen O'Malley added a comment -

          I like that approach, Doug. We should also have an entry for splits that are local to no nodes in the map/reduce cluster, and prefer to steal from that entry rather than from other nodes. This would solve HADOOP-2014...

          Runping Qi added a comment -

          I like the algorithm Doug outlined above.
          A few thoughts for refinement:

          Processing N input splits per mapper task may result in map output spills, depending on the value of N, the split sizes, the value of io.sort.mb, and the nature of the map function.
          Thus, N should be configurable by the user on a per-job basis, with a default of 1.
          N should be chosen so that mapper tasks processing N splits will not spill most of the time.

          The actual number of splits a particular mapper task takes should vary with the number of splits left to be processed.
          When few splits remain, the number given to the next mapper task should be reduced, so that other tasks may process the remaining splits in parallel.

          All the splits processed by the same mapper task should share the same rack id so that rack locality is maintained.
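
          If N did become a per-job setting, job submission might look like the sketch below; the property name is made up for illustration and is not an actual Hadoop configuration key:

          import org.apache.hadoop.mapred.JobConf;

          public class SubmitWithGrouping {
              public static void main(String[] args) {
                  JobConf conf = new JobConf(SubmitWithGrouping.class);
                  // Hypothetical key; a default of 1 keeps today's
                  // one-split-per-map behavior.
                  conf.setInt("mapred.map.splits.per.task", 4);
              }
          }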

          Runping Qi added a comment -

          If we implement this Jira, I think the issue of reusing JVMs outlined in HADOOP-249 becomes less urgent.

          Owen O'Malley added a comment -

          I'd propose:

          • when a mapper finishes, if the job still has >1 wave of maps left, the JT assigns a new map to join on the
            previous one. You don't want to make the tail longer.
          • Limit grouping to 5 or so. You want to bound the amount of grouping so that the costs of failure aren't too high.
          • any task that has previously been killed or failed should not be grouped
          • no task that is currently being grouped should be speculated on
           • we can consider having the map task ask for a follow-on map if it still has buffer capacity left. This is probably not aggressive enough, because even with a merge step multiple mappers are faster once you include the shuffle time.
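
           A sketch that encodes these rules as a single predicate, with illustrative fields standing in for the JT's bookkeeping (none of these names are actual Hadoop structures):

           public class GroupingPolicy {
               public static final int MAX_GROUP = 5;  // "limit grouping to 5 or so"

               /** Illustrative per-attempt state; not actual JT data structures. */
               public static class MapAttempt {
                   int groupSize = 1;            // maps already joined into this attempt
                   boolean everKilledOrFailed;
                   boolean beingSpeculated;
               }

               /** May the JT join one more map onto this attempt when a mapper finishes? */
               public static boolean mayGroup(MapAttempt m, int wavesOfMapsLeft) {
                   return wavesOfMapsLeft > 1        // don't make the tail longer
                       && m.groupSize < MAX_GROUP    // bound the cost of failure
                       && !m.everKilledOrFailed      // killed/failed tasks are never grouped
                       && !m.beingSpeculated;        // grouped tasks aren't speculated on
               }

               /** The converse rule: a task that is being grouped is not speculated. */
               public static boolean maySpeculate(MapAttempt m) {
                   return m.groupSize == 1;
               }
           }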
          eric baldeschwieler added a comment -

          A few observations:

          1) You really, really don't want the user configuring the number of tasks to combine. The system is hard enough for users to understand without asking them to do this! Also, if the default is 1, this will effectively not be used.

          2) If you don't allow speculation on combined tasks, then you can really wedge the job; I don't think this restriction is a good idea.

          3) You don't want the node to stall going to the JT for a new task when a task is complete. This will defeat a lot of the value of this optimization. Perhaps the TT can request one more task than it has slots, in the hope that it can be assigned when the next task finishes?

          4) This is all about collation after map tasks. I think we need to start thinking of that distinctly from maps themselves. How can we make this work if we spill on average once per 2.5 maps? Or 4 times for every 3 maps? Right now the 4:3 case requires 8 spills, 4 collations and 4 fetches per reducer. What if we could turn it into 3 spills, one collation and 1 fetch per reducer? That would be a big saving.

          dhruba borthakur added a comment -

          The resources needed in the map phase typically depend on the amount of data processed. Instead of specifying a grouping value of N, can we say that grouping will occur based on the size of the input data? The JT can then group as many mappers as needed to make the total input data close to the specified value.

          eric baldeschwieler added a comment -

          It would be great if we could just do that lazily...

          Runping Qi added a comment -

          Some more thoughts on this issue.
          I think it may be more effective, and even simpler, to think about combining splits based on their rack locality.
          In this sense, this jira is related to H-3293.

          dhruba borthakur added a comment -

          Here is a sample patch that combines N splits to be executed by the same mapper instance. The logic is as follows: the normal logic is used to find the first split to assign to a mapper; then, if there exist other splits that are rack-local to the TT, N-1 of those are selected and assigned to the same mapper. N is configurable per job.

          This code still needs more testing; I am posting it early just as a reference.
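
          The selection logic described above, sketched as a standalone method (the actual patch is attached to the issue; this illustration uses made-up names):

          import java.util.ArrayList;
          import java.util.Iterator;
          import java.util.List;
          import java.util.Map;
          import java.util.Set;

          public class RackLocalTopUp {
              /**
               * firstSplit: the split chosen by the normal assignment logic.
               * splitRacks: split id -> rack id of the split's data.
               * pending: the not-yet-assigned splits (mutated by this call).
               * n: the configured splits per mapper.
               */
              public static List<String> assign(String firstSplit,
                                                Map<String, String> splitRacks,
                                                Set<String> pending, int n) {
                  List<String> assigned = new ArrayList<>();
                  assigned.add(firstSplit);
                  pending.remove(firstSplit);
                  String rack = splitRacks.get(firstSplit);
                  // Top up with N-1 pending splits that share the first split's rack.
                  Iterator<String> it = pending.iterator();
                  while (it.hasNext() && assigned.size() < n) {
                      String candidate = it.next();
                      if (rack != null && rack.equals(splitRacks.get(candidate))) {
                          assigned.add(candidate);
                          it.remove();
                      }
                  }
                  return assigned;
              }
          }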

          dhruba borthakur added a comment -

          The problem I am trying to solve is a job that processes a large number of input files, each of which is small. In the existing implementation, each mapper processes one split, and each split is very small (limited by the size of the input file). I will investigate the implementation of MultipleInput.java to see if I can get mappers to process larger splits.

          Matei Zaharia added a comment -

          Instead of using a limit N on the number of splits, I'd have a limit on the number of MB of input data, call it L. Then a default value can be set for the cluster as a whole and it will do something reasonable in each job. If some jobs have really lightweight mappers they can also choose to increase it.

          I would first look to see whether there are enough data-local input chunks to make L MB of total data. If not, take all of the data-local ones and then go on to rack-local ones. This is strictly better than picking just random rack-local splits.
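
          A sketch of this size-budget selection with a made-up Split holder: fill a task up to L bytes from data-local splits first, then fall back to rack-local ones:

          import java.util.ArrayList;
          import java.util.List;

          public class SizeBudgetGrouper {
              /** A split id paired with its size in bytes; illustrative only. */
              public static class Split {
                  final String path;
                  final long bytes;
                  public Split(String path, long bytes) { this.path = path; this.bytes = bytes; }
              }

              /** Pick splits for one task, up to budgetBytes (the limit L). */
              public static List<Split> pickForNode(List<Split> dataLocal,
                                                    List<Split> rackLocal,
                                                    long budgetBytes) {
                  List<Split> picked = new ArrayList<>();
                  long used = 0;
                  for (List<Split> tier : List.of(dataLocal, rackLocal)) {
                      for (Split s : tier) {
                          if (used >= budgetBytes) {
                              return picked;
                          }
                          picked.add(s);
                          used += s.bytes;
                      }
                  }
                  return picked;
              }
          }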

          eric baldeschwieler added a comment -

          Be careful of the starvation issues we've seen before! You may dish out too much data to your first N nodes and distribute your input asymmetrically. What about just keeping the JVM up and passing in more data as each "task" completes?

          dhruba borthakur added a comment -

          Hi Eric, You are right, this could have unwelcome performance characteristics in certain cases. We will have to measure this first.
          I am actually more inclined to solve my problem using HADOOP-4565.


            People

            • Assignee:
              dhruba borthakur
            • Reporter:
              Runping Qi
            • Votes:
              0
            • Watchers:
              26
