Hadoop Map/Reduce
MAPREDUCE-240

Improve the shuffle phase by using "Connection: keep-alive" and doing batch transfers of files

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      We should do transfers of map outputs at the granularity of total bytes transferred, rather than the current approach of transferring a single file and then closing the connection to the server. A single TaskTracker might have several map output files for a given reduce, and we should transfer multiple of them (up to a certain total size) in a single connection to the TaskTracker. Using HTTP/1.1's keep-alive connections would help, since a connection would stay open for more than one file transfer. We should limit the transfers to a certain size so that we don't hold up a Jetty thread indefinitely (and cause timeouts for other clients).
      Overall, this should give us improved performance.
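
      As a rough illustration of the batching idea (not the actual Hadoop code), the sketch below groups one host's pending map outputs into batches capped by a total byte budget; the MapOutput type, the planBatches helper, and the 64 MB cap are all assumptions made for the example.

      // Hypothetical sketch of the proposed batching policy. MapOutput,
      // planBatches and MAX_BYTES_PER_CONNECTION are illustrative names, not Hadoop APIs.
      import java.util.ArrayList;
      import java.util.List;

      class BatchPlanner {
        static final long MAX_BYTES_PER_CONNECTION = 64L * 1024 * 1024; // assumed cap

        record MapOutput(String mapId, long sizeBytes) {}

        /** Split one host's pending outputs into batches no larger than the cap. */
        static List<List<MapOutput>> planBatches(List<MapOutput> pending) {
          List<List<MapOutput>> batches = new ArrayList<>();
          List<MapOutput> current = new ArrayList<>();
          long currentBytes = 0;
          for (MapOutput out : pending) {
            if (!current.isEmpty() && currentBytes + out.sizeBytes() > MAX_BYTES_PER_CONNECTION) {
              batches.add(current);           // close this batch so the server thread is released
              current = new ArrayList<>();
              currentBytes = 0;
            }
            current.add(out);
            currentBytes += out.sizeBytes();
          }
          if (!current.isEmpty()) batches.add(current);
          return batches;
        }
      }

      Each batch would then be fetched over a single keep-alive connection before the connection is released.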

      Attachments

      1. hadoop-1338-v2.patch (65 kB) - Jothi Padmanabhan
      2. hadoop-1338-v1.patch (58 kB) - Jothi Padmanabhan

          Activity

          Doug Cutting added a comment -

          As a performance enhancement, any patch for this must demonstrate a significant performance improvement before it can be committed. I suspect that simply caching a fixed number of connections will not provide a significant performance enhancement. Rather, we might attempt, when contacting a node, to transfer all available output that resides on that node. So, instead of randomly shuffling the map output locations, a reduce task might first group locations by host, then randomly shuffle the hosts, fetching batches from each host. However, if the shuffle is already keeping up with the maps, even this may not improve things much, since each map node may tend to have only a single new output at a time.

          Devaraj Das added a comment -

          Agree with you on this, Doug. This issue is investigative in nature. This might make more sense for cases where we have two or more waves of reduces...

          Devaraj Das added a comment -

          "This might make more sense for cases where we have two or more waves of reduces" - should clarify this by saying that, although the first wave of shuffles is gated by the time the Map phase takes to complete, we should check whether the subsequent waves of shuffles would gain by this optimization. I think the first phase of shuffle might also gain to some extent.

          Doug Cutting added a comment -

          What's the motivation for more than one wave of reduces? Typically applications are better served by fewer output files, so that can't be it. Is it to decrease the granularity of each reduce, so that, in the case of reduce failures, job latency is reduced?

          Devaraj Das added a comment -

          Yes, reducing job latency in the presence of reduce failures is the reason for having multiple waves.

          Devaraj Das added a comment -

          Interesting points here: http://www.w3.org/Protocols/HTTP/Performance/

          Runping Qi added a comment -

          Batch transfer may not have been a big deal before 0.18, because there were a lot of inefficient places in the shuffling code anyway.
          However, it will be a significant improvement for 0.18.
          Based on my observations from running gridmix with 0.18, the fetching part becomes a bottleneck of the shuffle phase in 0.18.
          I think it is definitely worth revisiting this issue.
          Based on my understanding of the shuffling code, it is not hard to implement keep-alive connections in the fetcher.

          Runping Qi added a comment -

          Had an offline chat with Devaraj about this issue.
          Here is a proposal based on that chat.

          When a reducer needs to fetch map outputs from a TT, it sends the TT a request with a list of the segments it needs, not just one.
          Upon receiving the request, the TT builds a response consisting of a subset of the requested map output segments, up to a certain upper limit on the total size. The TT can do so with a single pass of sequential reads over the map output file, instead of multiple random reads.
          This will significantly lower the number of round trips between the reducer and the TT, and the number of random reads the TT needs to do, for jobs with a large number of mappers and a lot of small map output segments.
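
          A hedged sketch of the TaskTracker-side selection this proposal implies: given the list of segments a reducer asked for, serve a prefix of them whose total size stays under a cap, so the response can be built with one sequential pass. The class, method and record names are illustrative, not the real servlet code.

          import java.util.ArrayList;
          import java.util.List;

          class BatchedMapOutputSelector {
            record Segment(String mapId, long lengthBytes) {}

            /** Pick requested segments, in order, until the size budget is exhausted. */
            static List<Segment> selectForResponse(List<Segment> requested, long maxResponseBytes) {
              List<Segment> selected = new ArrayList<>();
              long total = 0;
              for (Segment s : requested) {
                if (!selected.isEmpty() && total + s.lengthBytes() > maxResponseBytes) {
                  break;  // remaining segments can be requested again on a later connection
                }
                selected.add(s);
                total += s.lengthBytes();
              }
              return selected;
            }
          }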

          Owen O'Malley added a comment -

          What is the advantage of sending multiple map ids at once rather than using keep-alive? It is much better to just use the standard HTTP keep-alive paradigm. This should be put on hold until we have upgraded to Jetty 6. I suspect that part of the problem may be the use of Java's URL getter. We should compare the performance of Java URL versus Apache HttpClient.
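
          For reference, a minimal sketch of what relying on the JDK's built-in HTTP keep-alive with java.net.URL could look like (the URL list, buffer size and helper class are assumptions; only the http.keepAlive and http.maxConnections system properties are standard JDK knobs):

          import java.io.InputStream;
          import java.net.HttpURLConnection;
          import java.net.URL;

          class KeepAliveFetch {
            static void fetchAll(java.util.List<URL> mapOutputUrls) throws Exception {
              System.setProperty("http.keepAlive", "true");      // JDK default, shown for clarity
              System.setProperty("http.maxConnections", "5");    // idle connections kept per destination
              byte[] buf = new byte[64 * 1024];
              for (URL url : mapOutputUrls) {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                try (InputStream in = conn.getInputStream()) {
                  while (in.read(buf) != -1) {
                    // consume the map output; fully draining and closing the stream lets the
                    // JDK return the underlying socket to its keep-alive cache
                  }
                }
                // do not call conn.disconnect(): that would close the cached socket
              }
            }
          }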

          Runping Qi added a comment -

          With multiple mapper ids, the Jetty server on the TT can fulfill a multiple-segment request by reading the map output file sequentially.
          If you just use keep-alive, you cannot perform that optimization.

          Runping Qi added a comment -

          Never mind about my comments above.
          Jetty has to open different map output files for different mappers.

          Matei Zaharia added a comment -

          I think pipelining requests should make sense either way because it avoids the connection latency and TCP ramp-up time for each one. Organizing the map output files so as to minimize scans is a different issue - potentially it will be done by some node-wide combiner, but we can solve it then.

          Devaraj Das added a comment -

          Owen, I am not too sure that HTTP's keep-alive will always be useful in our scenario. The reason is that we fetch from a random host each time. HTTP's keep-alive would keep the connection alive for only a certain time I believe. Also, even if the keep-alive timeout were configurable and we could set a higher timeout, for scalability reasons (imagine big clusters containing 1000s of nodes) we cannot have a server keep too many client connections alive at the same time...
          My gut is that we should see some benefits from pulling multiple map outputs per HTTP request. If we could use pipelining, we would have to be careful not to pull too many outputs in one go, since that might starve other clients. But the question is whether Java or Apache HttpClient supports pipelining. If not, we would need to build the protocol on top of the regular HTTP request protocol (something along the lines Runping suggested). But it seemed to me that that interferes with the in-memory shuffle in subtle ways, and that part needs to be thought about.
          But yes, overall this requires a good amount of benchmarking...

          Owen O'Malley added a comment -

          Devaraj, I agree that with the current random scheduling it wouldn't be useful. If, on the other hand, the shuffle had the events in a structure like:

          Map<hostname, List<Events>>

          it would work well. It would pick a random host and pull all of the outputs from that host.
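
          A small sketch of that structure, assuming a stand-in TaskCompletionEvent record rather than the real Hadoop class:

          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;
          import java.util.Random;

          class HostKeyedEvents {
            record TaskCompletionEvent(String host, String mapAttemptId) {}

            private final Map<String, List<TaskCompletionEvent>> byHost = new HashMap<>();
            private final Random rand = new Random();

            void add(TaskCompletionEvent e) {
              byHost.computeIfAbsent(e.host(), h -> new ArrayList<>()).add(e);
            }

            /** Pick a random host and remove all of its pending outputs for fetching. */
            List<TaskCompletionEvent> takeAllFromRandomHost() {
              if (byHost.isEmpty()) return List.of();
              List<String> hosts = new ArrayList<>(byHost.keySet());
              String host = hosts.get(rand.nextInt(hosts.size()));
              return byHost.remove(host);
            }
          }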

          Devaraj Das added a comment -

          Owen, that's how the structure is. However, I am slightly concerned about pulling all outputs in one go. Would it lead to lots of timeouts for other clients trying to pull from the same host, and hence to long tails due to the backoff strategy? But yeah, certainly worth giving it a shot.

          Matei Zaharia added a comment -

          I think you don't have to pull all of them. Just pull enough to fill up 100 MB or something, and then cut the connection and service the next client. It will still be far more efficient than everybody pulling off 1 MB at a time. It's like the map input chunk grouping (HADOOP-2560) - amortize the fixed cost over a few transfers without making it so many that your task takes a very long time.

          Raghu Angadi added a comment -

          I think that on a LAN the overhead of establishing a TCP connection usually gets overrated. Slow start is a real problem. Just like how this could be disabled in Hadoop, Jetty might have an option.

          I would think just disabling slow start would give more benefit than keep-alive without disabling slow start.

          In any case, already existing benchmarks might be able to show whatever benefit this gives. Given the possible side effects of keep-alive that Devaraj mentioned, the improvement might need to be reasonably noticeable.

          Devaraj Das added a comment -

          Matei, the problem with pulling a large number of segments (amounting to a large total size) is that it would interfere with the in-memory shuffle. Note that we want to use the memory buffer for the shuffle as much as possible to avoid disk IO. We probably need to base the maximum size we pull (in the case where we are trying to pull multiple segments) on the buffer available for the shuffle...

          Raghu, that's an interesting suggestion. Worth trying out.

          Matei Zaharia added a comment -

          Ah, that makes sense. Perhaps the max data fetched at a time should be smaller (1-10 MB), but I think there are important jobs with less than 1 KB of output per map, such as counting jobs or filtering jobs. Definitely for jobs with a lot of data, it doesn't make sense to degrade the performance of the shuffle.

          Jothi Padmanabhan added a comment -

          "HTTP's keep-alive would keep the connection alive for only a certain time I believe"

          I do not think there is a timeout for the duration for which the connection is kept alive; keep-alive packets are sent out after a 'keepalive' time (normally 2 hours) of idleness, and if the other side responds, the connection is kept alive.
          However, as pointed out, it might not be a good idea to just hold on to connections, for scalability reasons.

          Jothi Padmanabhan added a comment -

          "Just like how this could be disabled in Hadoop"

          Can we disable the slow-start algorithm programmatically? I thought it needs to be done at the kernel level. How do we disable it in Hadoop?

          Also, the 2 hours in my previous comment referred to TCP keepalive and not HTTP keep-alive. HTTP keep-alive timeouts are considerably smaller.

          Jothi Padmanabhan added a comment -

          To see whether pulling more data in one shot is actually beneficial, we tried the following experiment. We ran the loadgen example with 50K maps, 2 MB per map, with the number of reduces set to 1, 2 and 4.

          The idea here is that with an increasing number of reducers, each reducer would have a smaller amount of intermediate data to fetch from a given map.

          We found the shuffle times to be:
          Num Reducers = 1: 5800s
          Num Reducers = 2: 3120s
          Num Reducers = 4: 1660s

          In essence, when the amount fetched per reducer is halved, the shuffle time did not fall by half but only to slightly more than half. Do these results actually imply that batching of maps could be beneficial? Thoughts?

          Raghu Angadi added a comment -

          > Can we disable the slow-start algorithm programmatically?

          It can be set per socket. Socket.setTcpNoDelay().

          Jothi Padmanabhan added a comment -

          That disables Nagle's algorithm. Though slow start and Nagle's algorithm are related, they are not the same.
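
          To make the distinction concrete, a minimal sketch (the class name is made up): setTcpNoDelay only disables Nagle's algorithm for that socket, while slow start is part of the kernel's congestion control and is not exposed through java.net.Socket.

          import java.net.Socket;

          class NagleExample {
            static Socket connect(String host, int port) throws Exception {
              Socket s = new Socket(host, port);
              s.setTcpNoDelay(true);   // disables Nagle's algorithm for this socket only
              // there is no Socket API to disable slow start; that would need kernel/OS tuning
              return s;
            }
          }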

          Jothi Padmanabhan added a comment -

          I did another test to see if pulling in more data per connection is beneficial. I just ran a simple client/server application on sockets. In the following table, for each transfer, a connection was created and destroyed.

          Here are the results

          A. When nodes are in the same rack

          Transfer Size (MB)   Num Transfers   Total Time (seconds)
          50                   200             123
          100                  100             109
          200                  50              101
          400                  25              97

          B. When the nodes were on a different rack

          Transfer Size (MB)   Num Transfers   Total Time (seconds)
          50                   200             158
          100                  100             142
          200                  50              134
          400                  25              130

          From these results it does appear that there could be a small but definite advantage in bunching the outputs, especially when each output is small.

          Thoughts?
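
          For context, a hedged reconstruction of the kind of micro-benchmark client described above: for each transfer a fresh connection is opened, a fixed number of bytes is streamed, and the socket is closed. The host, port, transfer sizes and the assumed sink server are illustrative; this is not the code that produced the numbers.

          import java.io.OutputStream;
          import java.net.Socket;

          class TransferBenchmarkClient {
            static void run(String host, int port, int numTransfers, long bytesPerTransfer)
                throws Exception {
              byte[] chunk = new byte[64 * 1024];
              long start = System.currentTimeMillis();
              for (int i = 0; i < numTransfers; i++) {
                try (Socket s = new Socket(host, port);         // new connection per transfer
                     OutputStream out = s.getOutputStream()) {
                  long sent = 0;
                  while (sent < bytesPerTransfer) {
                    int n = (int) Math.min(chunk.length, bytesPerTransfer - sent);
                    out.write(chunk, 0, n);
                    sent += n;
                  }
                }
              }
              System.out.println("Total time: " + (System.currentTimeMillis() - start) / 1000 + " s");
            }

            public static void main(String[] args) throws Exception {
              // e.g. 100 transfers of 100 MB each to a sink server listening on port 5000
              run("remote-host", 5000, 100, 100L * 1024 * 1024);
            }
          }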

          Jothi Padmanabhan added a comment -

          I also tried doing the above experiment with Nagle's algorithm disabled. Got pretty similar times.

          Owen O'Malley added a comment -

          We should copy all of the map outputs from a single host until we have nothing more to get from them. There is no advantage to dropping the connection. It just costs time. We need to shuffle all of the map outputs, so there is no need to be "fair". We just want to maximize the throughput. The best way to do that is to pull everything off of a node as fast as we can, keeping the connection alive while we do it.

          Runping Qi added a comment -

          Agree with Owen.

          Devaraj Das added a comment -

          I would like to add one point to this (which is already what we do today) - i.e., stall the copier (drop the connection) when we find that a map output fits the bill for an in-memory shuffle but we don't presently have enough space in the in-memory shuffle area. In our earlier benchmarks we found that we gain a lot by stalling the copier when such a situation happens (mainly due to the cost of the disk I/O).
          Also, in such a case, it might be good for the tasktracker to send the available outputs in ascending order of their sizes. That way, the reducer gets a better chance of fitting more outputs in memory before dropping the connection, if at all.

          Owen, are you suggesting that we don't do the copier stalling at all?

          Thoughts?

          Jothi Padmanabhan added a comment -

          Had an offline discussion with Owen, and some of the points that came out are:

          • Pull several maps from a single host before moving on to the next. When we reach a stage where we need to stall, we stall (similar to the existing logic).
          • If the map output is so big that we would shuffle it to disk anyway, we continue without stalling.
          • We need to come up with the correct logic for deciding timeouts, as it is possible that other reducers might time out connecting to a TT that is serving a particular reducer with huge map outputs (since we are pulling several in one shot now). We will have to ensure that we do not decide erroneously that maps from a busy TT are faulty.
          • It is not possible to have the map completion events carry map output length information, as that would mean having one long per reducer per map. An alternative idea could be to encode the size using, say, 2 bits per reducer. Each value could indicate a range - the ranges could be, for example, 0, 1-999, 1000-9999, >10000. We need to evaluate whether this encoding would be useful and, if yes, what the correct ranges should be (a rough sketch of such an encoding follows below).
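
          A rough sketch of what such a 2-bit encoding could look like, using the example ranges above; the class and method names are made up for illustration.

          class SizeRangeCodec {
            /** Encode a size in bytes into a 2-bit bucket: 0, 1-999, 1000-9999, >=10000. */
            static int encode(long sizeBytes) {
              if (sizeBytes == 0) return 0;
              if (sizeBytes < 1000) return 1;
              if (sizeBytes < 10000) return 2;
              return 3;
            }

            /** Pack one 2-bit bucket per reducer into a byte array (4 reducers per byte). */
            static byte[] pack(long[] sizesPerReducer) {
              byte[] packed = new byte[(sizesPerReducer.length + 3) / 4];
              for (int r = 0; r < sizesPerReducer.length; r++) {
                int bucket = encode(sizesPerReducer[r]);
                packed[r / 4] |= (byte) (bucket << ((r % 4) * 2));
              }
              return packed;
            }
          }
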
          Jothi Padmanabhan added a comment -

          Just for information, we confirmed that Nagle's algorithm is turned off by default in the Jetty server.

          Jothi Padmanabhan added a comment -

          I tested a patch where

          • Each copier thread is assigned one host
          • Each copier thread would pull 'n' map outputs from a given host (until a specific size threshold has been pulled), before moving on to the next thread
          • Each fetch would be one map request/response (as it exists in the trunk)

          With the above patch, I did not observe any improvement at all (for a variety of map sizes with the loadgen program). The underlying presumption with this patch was that, since each thread is holding on to the host, keep-alive would kick in (via the JVM?) and make a few of the connections no-ops, as these are connections made to the same host/port. However, it looks like keep-alive is not kicking in, and I see similar shuffle times with and without this patch.

          We did another test where the code was hacked so that the copier fetches a configurable number of maps at a time and the TT replies to this request by clubbing the map outputs together. The received map outputs were just discarded at the reducer (neither written to disk nor to memory), so that we measured only the network performance. The following are the results:

          Number of Maps Per Fetch   Average Shuffle Time   Worst Case Shuffle Time
          1                          1:27                   4:20
          2                          1:11                   2:11
          4                          47s                    1:11
          8                          29s                    41s

          From this it does appear that we would benefit from modifying the fetch protocol to fetch several maps at one shot, using the same connection. Thoughts?

          Matei Zaharia added a comment -

          I think explicitly fetching multiple maps per connection is the best way to go. We can control exactly what happens instead of relying on connection keep-alive to work in the HTTP library and running into problems if this changes. In terms of how many outputs to fetch, I believe we should take the same approach as Dhruba's multiple-splits-per-map patch - don't say a fixed number of outputs, but rather a fixed number of bytes (say up to 256 MB). If a job has small map outputs, you should fetch more, and in the extreme, for something like webdatascan, you might as well fetch all the outputs on a host in one go.

          Jothi Padmanabhan added a comment -

          Yes, ideally the number of maps fetched should be implicitly determined by the total size of the map outputs fetched, and should not be a fixed number. However, on the reducer side we do not know the size of the map outputs beforehand, and the reducer needs to request specific map ids; it cannot just specify a size, as the TT will not know which maps a given reducer has already fetched and which it has not. So we might need a compromise: the reducer requests, say, 10 maps by id and also specifies the total size that it is willing to accept, and the TT then sends as many of those map outputs as fit within that size. We can of course tune this approach later. Thoughts?

          Runping Qi added a comment -

          +1. That is basically the same as what I proposed on Nov. 18.

          Sameer Paranjpye added a comment -

          A reducer could also fetch the first 1% or so of the map outputs individually and the rest in batches of fixed size. It could use the average size of the outputs fetched so far to estimate how many maps to request in each batch.

          Does fetching 8 at a time have any impact on gridmix runs?

          Jothi Padmanabhan added a comment -

          No, there is no impact from batching 10 outputs at a time on gridmix runs. An appreciable difference in shuffle time was observed with the loadgen run (100 bytes, 100000 maps) on a 100-node cluster with 1 reducer (trunk: 1m24s, patch: 52s). However, for these kinds of applications the shuffle time is fairly negligible compared to the map run time (50 mins or so).

          Jothi Padmanabhan added a comment -

          Attached a patch that batches fetching of map outputs.
          The maximum size to be pulled from a tasktracker per connection is configurable.
          The number of maps requested is adaptive. For the first 1%, one map is requested per connection. Subsequently, once the average map output size is known, the number of maps to request is derived from the configured maximum size. The number of maps is still bounded by the limit on the size of the HTTP request that the servlet can handle.
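
          A hedged sketch of the adaptive sizing described above; the class, field and method names, and the sampling-threshold arithmetic, are illustrative rather than the actual patch code.

          class AdaptiveBatchSizer {
            private final int totalMaps;
            private final long maxBytesPerConnection;   // assumed configurable cap
            private final int maxMapsPerHttpRequest;    // bound from the servlet's request-size limit
            private long bytesFetched = 0;
            private int mapsFetched = 0;

            AdaptiveBatchSizer(int totalMaps, long maxBytesPerConnection, int maxMapsPerHttpRequest) {
              this.totalMaps = totalMaps;
              this.maxBytesPerConnection = maxBytesPerConnection;
              this.maxMapsPerHttpRequest = maxMapsPerHttpRequest;
            }

            void recordFetch(long mapOutputBytes) {
              bytesFetched += mapOutputBytes;
              mapsFetched++;
            }

            /** How many map outputs to ask for on the next connection. */
            int nextBatchSize() {
              if (mapsFetched == 0 || mapsFetched < totalMaps / 100) {
                return 1;                               // sampling phase: one map per connection
              }
              long avgSize = Math.max(1, bytesFetched / mapsFetched);
              long byCap = maxBytesPerConnection / avgSize;
              return (int) Math.max(1, Math.min(byCap, maxMapsPerHttpRequest));
            }
          }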

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12399349/hadoop-1338-v1.patch
          against trunk revision 740237.

          +1 @author. The patch does not contain any @author tags.

          -1 tests included. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          -1 core tests. The patch failed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3791/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3791/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3791/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3791/console

          This message is automatically generated.

          Matei Zaharia added a comment -

          Rather than fetching 1% of maps, can we fetch a fixed number (e.g. 10)? My concern is that if you have 10,000 maps or something, then fetching 1% will take a while.

          Another option to consider is having the JobTracker compute the average map output size and getting it to the reducers through some other mechanism (e.g. an RPC like getMapOutputLocations). The JT already has this info. This would let each reducer work without having to sample and might be simpler. The size could also be included with each map output location, in which case the system would work even if maps have wildly different output sizes (not sure how often this happens).

          Devaraj Das added a comment -

          1) In TaskTracker.MapOutputServlet:

          1. The change where the call to getConf() is moved up can be removed.
          2. The "continue" statement in the for loop is redundant. Instead, put a comment.
          3. The code to close the mapOutputFile in the finally block in the current codebase needs to remain.

          2) In ReduceTask:

          4. The URL created for requesting map outputs can be compressed to avoid repetitive occurrences of the "attempt_<jobid>_m_" strings. Instead, only the real attempt ID should be sent, and the tasktracker should recreate the full attempt ID string. That will ensure we can fetch more without hitting the HTTP_ENTITY_TOO_LARGE error (a rough sketch of this follows below).
          5. No need to pass the maxFetchSizePerHost as a request parameter.
          6. Does it make sense to remove numInFlight and instead base the checks only on uniqueHosts.size()? I believe even in the current code the updates to numInFlight and uniqueHosts go hand in hand.

          I am still going through ReduceTask.java, so I might have more comments.
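
          As a rough illustration of item 4 (the parameter format, helper names and ID layout are made up for the example, not the patch's actual scheme):

          import java.util.ArrayList;
          import java.util.List;

          class MapIdCompression {
            record TaskAttempt(int taskId, int attemptId) {}

            /** Reducer side: e.g. "5_0,17_1,42_0" instead of a list of full attempt ID strings. */
            static String compress(List<TaskAttempt> requested) {
              StringBuilder sb = new StringBuilder();
              for (TaskAttempt ta : requested) {
                if (sb.length() > 0) sb.append(',');
                sb.append(ta.taskId()).append('_').append(ta.attemptId());
              }
              return sb.toString();
            }

            /** TaskTracker side: rebuild the full attempt ID strings from the compact form. */
            static List<String> expand(String jobId, String compact) {
              List<String> ids = new ArrayList<>();
              for (String part : compact.split(",")) {
                String[] p = part.split("_");
                ids.add(String.format("attempt_%s_m_%06d_%s", jobId, Integer.parseInt(p[0]), p[1]));
              }
              return ids;
            }
          }
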
          Jothi Padmanabhan added a comment -

          Cancelling patch to incorporate review comments

          Devaraj Das added a comment -

          Continuing on ReduceTask.java:
          1) Change the notifyAll to notify (as it was earlier).
          2) I think the retryFetches can be removed and, on an error, the knownOutputs left unchanged.
          3) I think knownOutputs can be used for all the purposes for which new maps are currently defined (just that knownOutputs should not be updated on an error).
          4) The CopyResult object need not take a MapOutputLocation. Instead it can just take a {mapID, host, size} combination. That will simplify the code to do with (3) above.

          Jothi Padmanabhan added a comment -

          Patch incorporating review comments

          Jothi Padmanabhan added a comment -

          MAPREDUCE-318 incorporated this


            People

            • Assignee: Jothi Padmanabhan
            • Reporter: Devaraj Das
            • Votes: 0
            • Watchers: 13
