Spark / SPARK-2926

Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

    Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: Shuffle
    • Labels:
      None

      Description

      Currently Spark has already integrated sort-based shuffle write, which greatly improves IO performance and reduces memory consumption when the number of reducers is very large. But the reducer side still adopts the hash-based shuffle reader implementation, which neglects the ordering of map output data in some situations.

      Here we propose an MR-style sort-merge shuffle reader for sort-based shuffle, to further improve its performance.
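      For illustration, the proposed reduce side amounts to a k-way merge of per-map output streams that are already sorted by key. A minimal sketch (hypothetical Python, not the actual Spark implementation; `sort_merge_read` and the sample data are invented for illustration):

```python
import heapq

def sort_merge_read(sorted_map_outputs):
    """Merge (key, value) streams that each map task already sorted by key.

    Because every input stream is sorted, the reducer can stream a k-way
    merge instead of re-sorting or building a hash map over all records.
    """
    return heapq.merge(*sorted_map_outputs, key=lambda kv: kv[0])

# Three map outputs for one reduce partition, each sorted by key.
outputs = [
    [(1, "a"), (4, "d")],
    [(2, "b"), (3, "c")],
    [(5, "e")],
]
merged = list(sort_merge_read(outputs))
# merged -> [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
```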

      Work-in-progress code and a performance test report will be posted once some unit-test bugs are fixed.

      Any comments would be greatly appreciated.
      Thanks a lot.

      Attachments

      1. Spark Shuffle Test Report on Spark2.x.pdf
        186 kB
        Li Yuanjian
      2. SortBasedShuffleReader on Spark 2.x.pdf
        3.52 MB
        Li Yuanjian
      3. Spark Shuffle Test Report(contd).pdf
        326 kB
        Saisai Shao
      4. Spark Shuffle Test Report.pdf
        340 kB
        Saisai Shao
      5. SortBasedShuffleRead.pdf
        56 kB
        Saisai Shao

        Issue Links

          Activity

          jerryshao Saisai Shao added a comment -

          A rough design doc is uploaded. Any comments would be greatly appreciated.

          sandyr Sandy Ryza added a comment -

          Hi Saisai,

          This seems like a very useful addition. If my understanding is correct, the reduce-side merge procedure seems like a fairly straightforward port of the approach used in MapReduce.

          A couple considerations:

          • Is there code we can share with the map-side merge?
          • It may make sense to use by-key sorting in some situations even when the operation doesn't require a key ordering. For example, if we're doing a groupByKey and are going to end up spilling most of the data on the reduce side anyway, holding objects in Aggregator's in-memory hashmap is pure overhead.
          jerryshao Saisai Shao added a comment -

          Hi Sandy,

          Thanks a lot for your comments. The basic idea is the same as MapReduce, but the implementation differs slightly to stay compatible with Spark's code path.

          The code may share some common parts with the map-side merge. I think we should first verify the pros and cons of this method, then refactor to make it better.

          For operations like groupByKey, I'm not sure whether by-key sorting performs better than the Aggregator approach; currently I still use the Aggregator on the reduce side. We need performance tests to verify whether by-key sorting is necessary.

          matei Matei Zaharia added a comment -

          Hey Saisai, a couple of questions about this:

          • Doesn't the ExternalAppendOnlyMap used on the reduce side already do a merge-sort? You won't do much better than that unless you assume that an Ordering is given for the key, which isn't actually something we receive at the ShuffleWriter from our APIs so far (though we have the opportunity to pass it through in Scala).
          • What kind of data did you test key sorting on the map side with? The cost of comparisons will depend a lot on the data type, but will be much higher for things like strings or tuples.
          matei Matei Zaharia added a comment -

          Basically because of these things, it would be great to see results from a prototype to decide whether we want a different code path here. One place where it might help a lot is sortByKey, but even there, the sorting algorithm we use (TimSort) actually does take advantage of partially sorted runs. And we have the problem of not passing an ordering on the map side yet. To me it's really non-obvious whether sorting on the map side will make the overall job faster – it might just move CPU cost around from the reduce task to the map task.
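          Matei's point about TimSort exploiting partially sorted runs is easy to demonstrate outside Spark. The sketch below (plain Python, whose sorted() is also Timsort; the `Counted` wrapper is invented for illustration) counts comparisons and shows that input made of a few pre-sorted runs needs far fewer comparisons than random input:

```python
import random

class Counted:
    """Wrapper whose comparisons are counted, to observe sort cost."""
    comparisons = 0

    def __init__(self, v):
        self.v = v

    def __lt__(self, other):  # sorted() only needs __lt__
        Counted.comparisons += 1
        return self.v < other.v

def count_comparisons(data):
    Counted.comparisons = 0
    sorted(Counted(x) for x in data)
    return Counted.comparisons

n = 10_000
two_runs = list(range(0, n, 2)) + list(range(1, n, 2))  # two sorted runs
shuffled = random.sample(range(n), n)

# Timsort detects the two pre-sorted runs and merges them cheaply,
# so it needs far fewer comparisons than on randomly ordered data.
assert count_comparisons(two_runs) < count_comparisons(shuffled)
```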

          jerryshao Saisai Shao added a comment - edited

          Hi Matei, thanks a lot for your comments.

          The original point of this proposal is to directly merge the data, without the re-sorting that happens when spilling through ExternalAppendOnlyMap (EAOM), whenever the map outputs are by-key sorted (with a key Ordering or a hash-code ordering). If no Ordering is available or needed, as with groupByKey, the current design still uses EAOM to do aggregation.

          I use SparkPerf sort-by-key workload to test the current shuffle implementations:

          1. sort shuffle write with hash shuffle read (current sort-based shuffle implementation).
          2. sort shuffle write and sort merge shuffle read (my prototype).

          Test data type is String, key and value length is 10, record number is 2G, and the data is stored in HDFS. My rough test shows that my prototype may be slower in shuffle write (1.18x slower) because of the extra key comparison, but 2.6x faster than HashShuffleReader on the reduce side.

          I have to admit that sort-by-key alone cannot fully illustrate the necessity of this proposal, and the method favors the sortByKey scenario. I will continue with other workload tests to see whether this method is really necessary, and will post my test results later.

          At least I think this method can use memory more effectively and alleviate GC pressure, because it stores map output partitions in memory as raw ByteBuffers and need not maintain a large hash map to do aggregation.

          Thanks again for your comments.

          jerryshao Saisai Shao added a comment - edited

          Hi Matei,

          I just uploaded a Spark shuffle performance test report. In this report, I chose 3 different workloads (sort-by-key, aggregate-by-key and group-by-key) in SparkPerf to test the performance of the current 3 shuffle implementations: hash-based shuffle; sort-based shuffle with HashShuffleReader; and sort-based shuffle with the sort-merge shuffle reader (our prototype). Generally, for sort-by-key our prototype gains more than the other two implementations, while for the other two workloads the performance is almost the same.

          Would you mind taking a look at it? Any comments would be greatly appreciated. Thanks a lot.

          jerryshao Saisai Shao added a comment -

          I think this prototype can easily offer the functionality SPARK-2978 needs.

          matei Matei Zaharia added a comment -

          I see, thanks for posting the benchmarks. This does seem like it's worth investigating further. Can you also run some tests with other aggregation factors? Also, a few notes on the configuration:

          • There's a big change in behavior when you go above 200 reduce tasks, because ExternalSorter does the same thing as hash-based shuffle if the number of reduce tasks is below 200.
          • When doing these kinds of tests, set spark.kryo.referenceTracking = false; otherwise serialization will be a major CPU cost.
          • We need to try other types of keys as well, e.g. integers or longer strings. As I said, the cost to compare elements will depend on their data type.

          Finally, it would be great if this proposal reused parts of ExternalSorter or otherwise shared code for it. It looks like it's not a clear win in all cases, but maybe for example we can use it in sortByKey, and later in groupBy / join when we update those to deal with it.
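          As a concrete sketch of the configuration note above (the application class and jar names are placeholders, not from this issue):

```shell
# Benchmark run with Kryo reference tracking disabled, per Matei's note.
# com.example.SortByKeyBenchmark and benchmark.jar are placeholders.
spark-submit \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.referenceTracking=false \
  --class com.example.SortByKeyBenchmark \
  benchmark.jar
```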

          jerryshao Saisai Shao added a comment -

          Hi Matei, sorry for the late response. I will test more scenarios per your notes, and also factor out the code to see whether some of it can be shared with ExternalSorter. Thanks a lot.

          jerryshao Saisai Shao added a comment -

          Hi Matei Zaharia, per your comments, I just did another round of performance tests comparing the current implementation of sort-based shuffle and my proposal.

          For the sort-by-key case, I tested with keys of different string lengths to verify the key-comparison overhead. Though, as you said, the comparison time increases, the total performance is still better than the current implementation.

          For the aggregate-by-key case, with different aggregation factors the performance of the two implementations is closer.

          I think we can use it in sortByKey() first, as you said; besides, some code like mergeSort() and mergeWithAggregation() can be shared with this proposal. Would you mind taking a look at the new test report and giving me some comments?

          Thanks a lot, and I appreciate your time.

          rxin Reynold Xin added a comment -

          Do you have a branch that I can test with? I'm running some sorting tests and can test this out also on some dataset.

          jerryshao Saisai Shao added a comment -

          Hi Reynold, thanks a lot for watching this. Here is the branch (https://github.com/jerryshao/apache-spark/tree/sort-based-shuffle-read), though the code is not rebased to the latest master branch.

          rxin Reynold Xin added a comment -

          Do you mind creating a separate branch that's based on https://github.com/rxin/spark/tree/netty-blockTransferService ?

          jerryshao Saisai Shao added a comment - edited

          OK, I will give it a try and let you know when it is ready. Thanks a lot.

          jerryshao Saisai Shao added a comment -

          Hey Reynold Xin, here is the branch rebased on your code (https://github.com/jerryshao/apache-spark/tree/sort-shuffle-read-new-netty). Mind taking a look at it? Thanks a lot.

          rxin Reynold Xin added a comment -

          Thanks - will do some testing this week for my dataset.

          jerryshao Saisai Shao added a comment -

          Looking forward to your feedback.

          sandyr Sandy Ryza added a comment -

          Reynold Xin did you ever get a chance to try this out?

          apachespark Apache Spark added a comment -

          User 'jerryshao' has created a pull request for this issue:
          https://github.com/apache/spark/pull/3438

          manojsamel Manoj Samel added a comment -

          Which release will have this change available?

          jerryshao Saisai Shao added a comment -

          Hi Manoj Samel, we're still waiting for a maintainer who could review this. Thanks for your attention.

          DoingDone9 Zhongshuai Pei added a comment - edited

          Hi, I tested sortByKey with spark-perf (https://github.com/databricks/spark-perf), but I got results like this:

          Spark 1.3:

          {"time":452.453}, {"time":457.929}, {"time":452.295}

          With your PR:

          {"time":471.215}, {"time":460.59}, {"time":463.795}

          Could you tell me what I did incorrectly? Thank you.

          jerryshao Saisai Shao added a comment -

          Hi Zhongshuai Pei, would you please give some detailed information about your test environment: cluster size, hardware configuration, and Spark configuration? Would you also provide each stage's running time compared to the total running time? Thanks a lot.

          In my local environment (a small 1 master + 4 slaves setup), I tested my patch rebased to the latest master; compared to the master branch, sortByKey with my patch is still about 15% to 20% faster.

          There's a possibility that different hardware configurations shift the bottleneck and produce different results. I will investigate more; it would be very helpful if you could offer more detailed information.

          XuanYuan Li Yuanjian added a comment - edited

          During our work migrating some old Hadoop jobs to Spark, I noticed this JIRA and the code based on Spark 1.x.

          I re-implemented the old PR based on Spark 2.1 and the current master branch. After reproducing some scenarios and running benchmark tests, I found that this shuffle mode brings a 12x~30x improvement in task duration and reduces peak execution memory to 1/12 ~ 1/50 of the current master version (see detailed screenshots and test data in the attached PDF). The memory reduction is especially notable: in this shuffle mode Spark can handle more data in less memory. The detailed doc attached to this JIRA is named "SortShuffleReader on Spark 2.x.pdf".

          I know the Dataset API has better optimization and performance, but the RDD API is still useful for flexible control and old Spark/Hadoop jobs. For better performance in ordering cases and more cost-effective memory usage, this PR may still be worth merging into master.

          I'll sort out the current code base and open a PR soon. Any comments and trying it out would be greatly appreciated.

          apachespark Apache Spark added a comment -

          User 'xuanyuanking' has created a pull request for this issue:
          https://github.com/apache/spark/pull/19745

          XuanYuan Li Yuanjian added a comment -

          I just opened a preview PR above. I'll collect more suggestions about this and maybe raise an SPIP vote later.

          jerryshao Saisai Shao added a comment -

          Li Yuanjian, would you please use spark-perf's micro benchmark (https://github.com/databricks/spark-perf) to verify again with the same workload mentioned in the original test report? That would be more comparable. Theoretically this solution cannot get a 12x-30x boost according to my tests, because it doesn't actually reduce the computation; it just moves part of the comparison work from reduce to map, which potentially saves some CPU cycles and improves cache hits.

          Can you please explain the key difference and the reason for such a boost? Thanks!

          XuanYuan Li Yuanjian added a comment -

          Saisai Shao, thanks a lot for your advice and reply.

          would you please use spark-perf's micro benchmark (https://github.com/databricks/spark-perf) to verify again with same workload as mentioned in original test report?

          Sure, I'll verify this again ASAP.

          Theoretically this solution cannot get 12x-30x boosting according to my test

          At first I also questioned this; I attached all the screenshots in the PDF. The 12x boost happened in both scenarios, with 1 and with 100 reducer tasks. The stage duration dropped from 2min to 9s (13x) with 1 reducer task, and from 20min to 1.4min with 100. The 30x boost appeared after I added more data pressure on the reducer tasks.

          Can you please explain the key difference and the reason of such boosting?

          I think the key difference mainly comes from these 2 points:
          1. As Saisai said, BlockStoreShuffleReader uses `ExternalSorter` for the reduce work, so every record must be compared, while SortShuffleReader is more CPU friendly: it collects all shuffle map results (both data in memory and data spilled to disk) and combines them with a merge sort, since each partition has already been sorted on the map side.
          2. The obvious cut in peak memory used by the reduce task, which saves GC time during sorting.

          jerryshao Saisai Shao added a comment -

          So your 12x-30x boost refers only to the reduce stage? Of course this solution can boost the reduce stage, but it will also increase the time of the map stage, so we'd better use total job time to evaluate.

          XuanYuan Li Yuanjian added a comment -

          Yes, only the reduce stage. You're right, I shouldn't pay attention only to the final stage. I've rearranged all the screenshot data below:

          test name | map stage shuffle write | reduce stage shuffle read | map stage duration | reduce stage duration | total job time | code base
          Test Round 1 | 654.4 MB | 802.6 MB | 3.6 min | 2.0 min | 5.5 min | master
          Test Round 1 | 654.4 MB | 714.0 MB | 3.4 min | 9 s | 3.5 min | SPARK-2926
          Test Round 2: add more pressure for SortShuffleReader by coalesce | 654.4 MB | 654.4 MB | 2.6 min | 20 min | 22 min | master
          Test Round 2: add more pressure for SortShuffleReader by coalesce | 654.4 MB | 654.4 MB | 3.7 min | 1.4 min | 5.1 min | SPARK-2926
          Test Round 3: file spill scenario of sort shuffle reader | 142.6 MB | 142.6 MB | 26 s | 16 min | 16 min | master
          Test Round 3: file spill scenario of sort shuffle reader | 142.6 MB | 142.6 MB | 21 s | 13 s | 34 s | SPARK-2926
          Test Round 3: file spill scenario of sort shuffle reader | 142.6 MB | 142.6 MB | 22 s | 25 s | 47 s | SPARK-2926 (force spill to disk)
          Test Round 3: file spill scenario of sort shuffle reader | 142.6 MB | 142.6 MB | 22 s | 29 s | 51 s | SPARK-2926 (force spill to disk)
          XuanYuan Li Yuanjian added a comment -

          Saisai Shao, hi Saisai, thanks for your advice. I added a test report per your suggestion. As described in the report, I only compare the two shuffle modes on the 'sort-by-key' workload, because the other test workloads share the same code paths in the POC implementation (SortShuffleWriter with BlockStoreShuffleReader).
          I also added a config (code link) just to force disabling SerializedShuffle in the 'sort-by-key' workload; otherwise both master and the POC use SerializedShuffle.
          For sort-by-key after disabling SerializedShuffle, the POC version is 1.44x faster than current master: the map-side stage is 1.16x slower, but the reducer stage gets a 9.4x boost.


            People

            • Assignee: jerryshao Saisai Shao
            • Reporter: jerryshao Saisai Shao
            • Votes: 4
            • Watchers: 66