Details

    • Type: Epic
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Spark Core, SQL
    • Labels: None

      Description

      Query planning is one of the main factors in high performance, but the current Spark engine requires the execution DAG for a job to be set in advance. Even with cost-based optimization, it is hard to know the behavior of data and user-defined functions well enough to always get great execution plans. This JIRA proposes to add adaptive query execution, so that the engine can change the plan for each query as it sees what data earlier stages produced.

      We propose adding this to Spark SQL / DataFrames first, using a new API in the Spark engine that lets libraries run DAGs adaptively. In future JIRAs, the functionality could be extended to other libraries or the RDD API, but that is more difficult than adding it in SQL.

      I've attached a design doc by Yin Huai and myself explaining how it would work in more detail.
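
      As an editorial illustration of the kind of decision involved (not the proposed API itself; the threshold, data, and local-mode setup below are made up), this is roughly what an application can do by hand today, and what the engine would do automatically between stages once it has seen the map output:

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.SQLContext
      import org.apache.spark.sql.functions.broadcast

      val sc = new SparkContext(new SparkConf().setAppName("adaptive-sketch").setMaster("local[*]"))
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val dimension = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "v")
      val fact = sc.parallelize(0 until 1000000).map(i => (i % 3, i)).toDF("id", "n")

      // Look at the actual size of one side, then pick the join strategy, instead of
      // fixing the plan before any data has been seen.
      val joined =
        if (dimension.count() < 10000L) fact.join(broadcast(dimension), "id") // small side: broadcast join
        else fact.join(dimension, "id")                                       // otherwise: shuffle join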

        Issue Links

          Issues in Epic

          There are no issues in this epic.

            Activity

            irashid Imran Rashid added a comment -

            I know the 1000 partitions used in the design doc is just an example, but I just wanted to point out that the number probably needs to be much larger. With a 2 GB limit per partition, that is already 2 TB max. My rule of thumb has been to keep partitions around 100 MB, which is roughly in line with the 64 MB you mention in the doc, which brings you down to 100 GB. And given that you want to deal w/ skewed data etc., you probably actually want to leave quite a bit of room, which limits you to relatively small datasets.

            The key point is that after you go over 2000 partitions, you are going into HighlyCompressedMapStatus territory, which will be relatively useless for this. Perhaps each shuffle map task could always send an uncompressed map status back to the driver? Maybe you could only use the HighlyCompressedMapStatus on the shuffle-read side? I'm not sure about the performance implications, just throwing out an idea.
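
            For reference, the arithmetic above in a few lines of Scala (2 GB is the per-block hard limit and 100 MB the rule-of-thumb partition size mentioned in the comment):

            val numPartitions = 1000L
            val blockLimit    = 2L * 1024 * 1024 * 1024      // 2 GB hard limit per shuffle block
            val targetBlock   = 100L * 1024 * 1024           // ~100 MB rule-of-thumb partition size
            val absoluteMax   = numPartitions * blockLimit   // ~2 TB
            val practicalMax  = numPartitions * targetBlock  // ~100 GB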

            irashid Imran Rashid added a comment -

            Just to continue brainstorming on what to do with large data: I realize that my earlier suggestion about sending uncompressed (or at least not HighlyCompressed) map status back to the driver may not work, since part of the point is to avoid OOM on the driver, not just to reduce communication from the driver back to the executors.

            But here are two other ideas:
            1. Create a variant of HighlyCompressedMapStatus which stores exact sizes for all blocks that are more than some factor above the median, let's say 5x (see the sketch after this list). This would let you deal w/ really extreme skew without increasing the size too much.
            2. Since you only need the summed size of the map output per reduce partition, you could first perform a tree-reduce of those sizes on the executors before sending them back to the driver. This avoids guessing an arbitrary cutoff factor, but is also much more complicated.
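
            A minimal, standalone sketch of idea 1 (editorial illustration only, not Spark's actual MapStatus classes; the function name and the 5x factor are made up):

            // Keep exact sizes only for blocks well above the median, plus an average for
            // everything else, so extreme skew stays visible without storing every size.
            def summarizeBlockSizes(sizes: Array[Long], factor: Double = 5.0): (Long, Map[Int, Long]) = {
              val sorted = sizes.sorted
              val median = sorted(sorted.length / 2)
              // Outliers are kept exactly, as (block index -> size).
              val outliers = sizes.zipWithIndex.collect {
                case (size, idx) if size > median * factor => idx -> size
              }.toMap
              val remaining = sizes.zipWithIndex.collect {
                case (size, idx) if !outliers.contains(idx) => size
              }
              val avgOfRest = if (remaining.isEmpty) 0L else remaining.sum / remaining.length
              (avgOfRest, outliers)
            }

            // Example: ten ~10 MB blocks plus one 1 GB block; only the 1 GB block is stored exactly.
            val sizes = Array.fill(10)(10L * 1024 * 1024) :+ (1024L * 1024 * 1024)
            val (avgSize, skewedBlocks) = summarizeBlockSizes(sizes)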

            matei Matei Zaharia added a comment -

            Hey Imran, this could make sense, but note that the problem will only happen if you have 2000 map output partitions, which would've been 2000 reduce tasks normally. Otherwise, you can have as many map tasks as needed with fewer partitions. In most jobs, I'd expect data to get significantly smaller after the maps, so we'd catch that. In particular, for choosing between broadcast and shuffle joins this should be fine. We can do something different if we suspect that there is going to be tons of map output and we think there's nontrivial planning to be done once we see it.

            maver1ck Maciej Bryński added a comment - edited

            Matei Zaharia
            Hi,
            I'm not sure if my issue is related to this Jira.

            In 1.6.0, when using SQL LIMIT, Spark does the following (sketched below):

            • executes the limit on every partition
            • then takes the result

            Is it possible to stop scanning partitions once enough rows have been collected for the limit?
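
            A rough RDD-level sketch of the two steps described above (editorial illustration only, not Spark SQL's actual physical plan for LIMIT; assumes an existing SparkContext named sc):

            val n = 10
            val rdd = sc.parallelize(1 to 1000000, 100)
            val perPartitionLimit = rdd.mapPartitions(_.take(n)) // step 1: apply the limit inside each partition
            val result = perPartitionLimit.take(n)               // step 2: take the final n rows on the driver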

            justin.uang Justin Uang added a comment -

            I like this idea a lot. One thing we encounter in our use cases is that we end up accidentally joining on a field that is 50% nulls, or on a string that represents null, like "N/A". It then becomes quite cumbersome to constantly have a Spark expert dig in and find why there is one task that will never finish. Would it be possible to add a threshold such that if a join key ever gets too big, it will just fail the job with an error message?
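
            An application-level sketch of that kind of guard (editorial illustration; the helper name, threshold, and column names are made up, and this runs as a separate pre-join pass rather than inside the engine):

            import org.apache.spark.sql.DataFrame
            import org.apache.spark.sql.functions.desc

            // Fail fast if any single join key holds more rows than we are willing to send to one task.
            def checkJoinKeySkew(df: DataFrame, keyCol: String, maxRowsPerKey: Long): Unit = {
              val worst = df.groupBy(keyCol).count().orderBy(desc("count")).first()
              require(worst.getLong(1) <= maxRowsPerKey,
                s"Join key '${worst.get(0)}' in column '$keyCol' has ${worst.getLong(1)} rows (limit $maxRowsPerKey)")
            }

            // e.g. checkJoinKeySkew(ordersDf, "customer_id", maxRowsPerKey = 1000000L)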

            assaf.mendelson Assaf Mendelson added a comment -

            I like the overall idea.
            What I am trying to figure out is the part where first the map stage is performed and then the reduce stage.
            If DAGScheduler.submitMapStage() waits for all the map processing to finish and only then starts the reducers, this can really slow things down, as the reduce stage will begin only when the last map finishes.

            Wouldn't it be better to start the reducers once the first few mappers have finished (or at least when there are idle executors)? Assuming the first few mappers are representative of the entire map stage, this shouldn't affect the assessment of statistics too much.

            irashid Imran Rashid added a comment -

            Assaf Mendelson reducers already have to wait for the last mapper to finish; Spark has always behaved this way (you might find discussions referring to this as the "stage barrier"). I don't see that changing anytime soon: while it's not ideal, doing away with it would add a lot of complexity.


              People

              • Assignee: yhuai Yin Huai
              • Reporter: matei Matei Zaharia
              • Votes: 21
              • Watchers: 98

                Dates

                  • Created:
                  • Updated:

                  Development