SPARK-13219: Pushdown predicate propagation in SparkSQL with join

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.4.1, 1.5.2, 1.6.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels:
      None
    • Environment:

      Spark 1.4
      Datastax Spark connector 1.4
      Cassandra 2.1.12
      Centos 6.6

      Description

      When 2 or more tables are joined in SparkSQL and the query has an equality clause on the attributes used to perform the join, it is useful to apply that clause to the scans of both tables. If this is not done, one of the tables ends up being fully scanned, which can slow the query dramatically. Consider the following example with 2 tables being joined.

      CREATE TABLE assets (
          assetid int PRIMARY KEY,
          address text,
          propertyname text
      )
      CREATE TABLE tenants (
          assetid int PRIMARY KEY,
          name text
      )
      spark-sql> explain select t.name from tenants t, assets a where a.assetid = t.assetid and t.assetid='1201';
      WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      == Physical Plan ==
      Project [name#14]
       ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
        Exchange (HashPartitioning 200)
         Filter (CAST(assetid#13, DoubleType) = 1201.0)
          HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, Some(t)), None
        Exchange (HashPartitioning 200)
         HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), None
      Time taken: 1.354 seconds, Fetched 8 row(s)
      

      The simple workaround is to add the equality condition explicitly for each table (for example, also adding a.assetid = '1201' to the query above, as sketched below), but that quickly becomes cumbersome. It would be helpful if the query planner could propagate such filters automatically.
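
      For illustration, a minimal sketch of that manual workaround against the tables above (using the Spark 2.x SparkSession API purely for brevity; on 1.x the equivalent call would go through HiveContext.sql):

      // Minimal sketch of the manual workaround: repeat the equality predicate
      // for every table that takes part in the join.
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

      spark.sql(
        """SELECT t.name
          |FROM tenants t, assets a
          |WHERE a.assetid = t.assetid
          |  AND t.assetid = 1201
          |  AND a.assetid = 1201
          |""".stripMargin).explain()
      // The last predicate (a.assetid = 1201) is the hand-added duplicate that
      // pushes the filter onto the assets scan as well.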

        Issue Links

          Activity

          smilegator Xiao Li added a comment -

          See this PR: https://github.com/apache/spark/pull/10490.

          Let me know if you hit any bug. Thanks!

          doodlegum Abhinav Chawade added a comment -

          Thanks Xiao. I will pull in the request and see how it performs.

          smilegator Xiao Li added a comment -

          Welcome

          doodlegum Abhinav Chawade added a comment -

          I created a build of Spark 1.4.1 that incorporates your patch, but somehow the predicates are still not being propagated. The steps I followed:
          1) Build Spark 1.4.1 with the patch incorporated.
          2) Replace the spark-catalyst jar on all nodes.
          3) Run explain on the following command in spark-sql. Notice the query plan.

          spark-sql> explain select t.assetid from tenants t inner join assets on t.assetid = assets.assetid where t.assetid=1201;    
          == Physical Plan ==
          Project [assetid#18]
           ShuffledHashJoin [assetid#18], [assetid#20], BuildRight
            Exchange (HashPartitioning 200)
             Filter (assetid#18 = 1201)
              HiveTableScan [assetid#18], (MetastoreRelation element22082, tenants, Some(t)), None
            Exchange (HashPartitioning 200)
             HiveTableScan [assetid#20], (MetastoreRelation element22082, assets, None), None
          Time taken: 2.741 seconds, Fetched 8 row(s)
          
          smilegator Xiao Li added a comment -

          Let me try your SQL query in Spark 1.6.1.

          doodlegum Abhinav Chawade added a comment -

          Here is my branch on github if you'd like to take a look. https://github.com/drnushooz/spark/tree/v1.4.1-SPARK-13219

          velvia Evan Chan added a comment -

          Xiao Li, does your PR take care of the case where no JOIN clause is used?
          Does it also take care of multiple join conditions? (e.g., select from a a, b b, c c where a.col1 = b.col1 && b.col1 = c.col1 && ....)

          smilegator Xiao Li added a comment - - edited

          It should work without JOIN clauses, as long as these predicates are pushed into the join/filter conditions.

          However, the current PR is unable to infer conditions that require multiple hops. I am glad to work on that, if needed.

          Let me ping Reynold Xin and Michael Armbrust.

          smilegator Xiao Li added a comment -

          Sorry, there is a bug in the original PR. You just need to change the code based on the fix:

          https://github.com/gatorsmile/spark/commit/20d46c9bee2d99966406e6450b159ca404578aa6

          Let me know if it works now. Thanks!

          doodlegum Abhinav Chawade added a comment -

          Xiao Li I changed the patch based on https://github.com/gatorsmile/spark/commit/20d46c9bee2d99966406e6450b159ca404578aa6 and predicate pushdown now works for 2 or more tables. I created another table called parkings (assetid int primary key, parkingid int) and joined it with assets and tenants to get the result. Looking at the explain output, the filter gets pushed down to every scan, which can speed things up.

          spark-sql> explain select p.parkingid from parkings p,assets a,tenants t where t.assetid=a.assetid and a.assetid=p.assetid and t.assetid=1201;
          == Physical Plan ==
          Project [parkingid#22]
           ShuffledHashJoin [assetid#23], [assetid#26], BuildRight
            Exchange (HashPartitioning 200)
             Project [parkingid#22,assetid#23]
              ShuffledHashJoin [assetid#21], [assetid#23], BuildRight
               Exchange (HashPartitioning 200)
                Filter (assetid#21 = 1201)
                 HiveTableScan [assetid#21,parkingid#22], (MetastoreRelation element22082, parkings, Some(p)), None
               Exchange (HashPartitioning 200)
                Filter (assetid#23 = 1201)
                 HiveTableScan [assetid#23], (MetastoreRelation element22082, assets, Some(a)), None
            Exchange (HashPartitioning 200)
             Filter (assetid#26 = 1201)
              HiveTableScan [assetid#26], (MetastoreRelation element22082, tenants, Some(t)), None
          Time taken: 0.43 seconds, Fetched 15 row(s)
          

          I will do some more tests with inner, left outer and right outer joins, but with a simple select across 3 tables the result looks promising. I have updated the code in my branch based on your fix: https://github.com/drnushooz/spark/tree/v1.4.1-SPARK-13219

          smilegator Xiao Li added a comment -

          Great!

          venugopalrp@gmail.com Venu Palvai added a comment -

          We have written custom code to handle this through a custom thrift server implementation. We are able to handle multiple tables and multiple columns in the join. We would like to collaborate and share details of our code.

          velvia Evan Chan added a comment -

          Xiao Li Abhinav Chawade what is the URL to the latest patch? Would like to contribute our code to this.

          doodlegum Abhinav Chawade added a comment -

          Evan Chan You can find the patch at https://github.com/apache/spark/pull/10490/files plus https://github.com/gatorsmile/spark/commit/20d46c9bee2d99966406e6450b159ca404578aa6, which fixes a bug, and the 1.4 branch on my fork.
          Hide
          smilegator Xiao Li added a comment -

          Thank you! Could you hold off for now? I think we can do this in a better way after resolving https://issues.apache.org/jira/browse/SPARK-12957

          velvia Evan Chan added a comment -

          Sorry, could you explain how SPARK-12957 affects this one?

          velvia Evan Chan added a comment -

          Xiao Li Abhinav Chawade

          Guys, let me explain the strategy that we used to fix the join transitivity, which I believe is much more general and helps many more cases than the approach in PR 10490.

          • First, we find all of the join columns and, for each join column, all the tables that are joined on it.
          • Next, we discover all the predicates (currently equals and IN; it could be more) that filter on those join columns.
          • For each join column, we compute which of the joined tables are missing those predicates.
          • We replicate the filter expression (using AND) onto each missing table from the previous step.

          The result is that no matter the number of tables and join columns, the predicates are augmented so that = and IN on literals are pushed to all the joined tables; a simplified sketch of the idea follows below.

          The only catch is that the current code works on unanalyzed logical plans, so we need to port it to work on analyzed logical plans instead.
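
          For illustration only, here is a minimal, self-contained Scala sketch of that strategy. It does not touch Catalyst; the Predicate type and the join-column map are hypothetical stand-ins meant purely to show the replication step described above.

          // Hypothetical model of the predicate-replication strategy; not Catalyst code.
          object PredicateReplicationSketch {

            // A literal predicate such as `tenants.assetid = 1201` or `... IN (1, 2)`.
            case class Predicate(table: String, column: String, expr: String)

            def replicate(
                // join column -> all tables joined on that column
                joinColumns: Map[String, Set[String]],
                // literal predicates (= / IN) found in the WHERE clause
                predicates: Seq[Predicate]): Seq[Predicate] = {
              joinColumns.toSeq.flatMap { case (column, tables) =>
                // predicates that already filter on this join column
                val existing = predicates.filter(_.column == column)
                // joined tables that are missing those predicates
                val missing = tables -- existing.map(_.table).toSet
                // replicate each filter expression onto every missing table (ANDed in)
                for (p <- existing; t <- missing) yield p.copy(table = t)
              }
            }

            def main(args: Array[String]): Unit = {
              val joins = Map("assetid" -> Set("tenants", "assets", "parkings"))
              val preds = Seq(Predicate("tenants", "assetid", "= 1201"))
              // prints the predicates to add: assets.assetid = 1201 and parkings.assetid = 1201
              replicate(joins, preds).foreach(p => println(s"${p.table}.${p.column} ${p.expr}"))
            }
          }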

          venugopalrp@gmail.com Venu Palvai added a comment -

          Hi Evan,
          I'm assuming that if the solution is ported to analyzed logical plans, it will work for any queries generated via the spark shell / language-integrated queries.

          thanks,

          smilegator Xiao Li added a comment -

          Hi Evan Chan, after a discussion with Michael, he prefers enhancing the existing Constraints framework to resolve this issue. I will reimplement the whole thing based on the new framework. Thanks!

          ndimiduk Nick Dimiduk added a comment -

          Now that all the subtasks on SPARK-12957 are resolved, where does that leave this feature? I'm trying to determine if I get this very useful enhancement by upgrading to 2.x. Thanks a lot!

          smilegator Xiao Li added a comment -

          Actually, this PR has already been resolved. We can close it now. Thanks!

          ndimiduk Nick Dimiduk added a comment -

          The result is that no matter the number of tables and join columns, the predicates are augmented such that = and IN on literals are pushed to all the joined tables.

          Evan Chan, Xiao Li, is the above statement an accurate reflection of this patch? I've upgraded to 2.1.0 in order to get this optimization. What about the case of a broadcast join – will the values present in the smaller relation be pushed down to the scan of the larger one? I'm not seeing that behavior. Maybe this optimization is out of scope for this patch? I do see IsNotNull pushed down, applied to the columns involved in the join constraint, but I was expecting a bit more.

          Thanks again for the patch!

          smilegator Xiao Li added a comment -

          If the predicates can be inferred, we can definitely push them down. However, not all predicates can be pushed down; it depends on the join types. If you hit any scenario where you believe we can optimize further, please open a JIRA. Thanks!

          ndimiduk Nick Dimiduk added a comment -

          I would implement this manually by materializing the smaller relation in the driver and then transforming those values into a filter applied to the larger one (roughly as sketched below). Frankly, I expected this to be the meaning of a broadcast join. I'm wondering if I'm doing something to prevent the planner from performing this optimization, so maybe the mailing list is a more appropriate place to discuss?
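
          For illustration, a rough sketch of that manual approach (this is not what the planner does; the table and column names reuse the earlier example and are otherwise assumptions):

          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.functions.col

          val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

          val small = spark.table("tenants").select("assetid").distinct()
          val large = spark.table("assets")

          // Materialize the join keys of the small relation on the driver...
          val keys = small.collect().map(_.get(0))

          // ...then apply them as an IN filter on the large relation and join as usual.
          val pruned = large.filter(col("assetid").isin(keys: _*))
          pruned.join(small, "assetid").explain()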

          venugopalrp@gmail.com Venu Palvai added a comment -

          Nick,
          We addressed this issue by creating a custom hive context to modify the query plans generated by Spark SQL. We have implemented it in such a way that the values for = and IN operators get pushed against the joining tables. Looking forward, it should also work for pushing range scans down to the data source.

          In addition to the optimisations mentioned above, we have made several other improvements to augment query plans.

          We can share our code and thoughts if the Spark community is interested in learning about our approach.

          We have implemented a data warehouse/analytics platform using a dimensional data model approach on Spark+Cassandra by using our customizations.
          thanks,
          Venu Palvai


          tanejagagan gagan taneja added a comment -

          Venu,
          This is very interesting; I would like to look at the code for all the optimizations that are in place.
          Do you have plans to contribute it back to Spark?

          tanejagagan gagan taneja added a comment -

          This is what we are looking for. For example:
          Table Address is partitioned on postal_code.
          Table Location contains location_name and postal_code.
          Query: SELECT * FROM address JOIN location ON postal_code WHERE location_name = 'San Jose'
          If the query is rewritten with an IN clause, the optimizer will be able to prune the partitions, which would be significantly faster.

          velvia Evan Chan added a comment -

          Hi Gagan,

          That is an interesting optimization, but not the same one that Venu speaks of (I worked on those optimizations). Basically, those optimizations are for the case where the column named in the WHERE clause is present in both tables, and my impression is that this is what this fix is for as well.

          Your case would be very useful too. You can do it in two steps, though: first look up the postal codes from location, then translate your select from address into an IN condition (roughly as sketched below).

          Of course it’s better if Spark does this so that the results don’t have to be passed back through the driver.
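
          For illustration, a rough sketch of that two-step rewrite, using the table and column names from the previous comment (it assumes the postal-code lookup returns a non-empty result):

          import org.apache.spark.sql.SparkSession

          val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

          // Step 1: look up the postal codes for the location of interest.
          val codes = spark
            .sql("SELECT postal_code FROM location WHERE location_name = 'San Jose'")
            .collect()
            .map(_.get(0).toString)

          // Step 2: query the partitioned table with an IN list so the optimizer
          // can prune partitions on postal_code.
          val inList = codes.map(c => s"'$c'").mkString(", ")
          spark.sql(s"SELECT * FROM address WHERE postal_code IN ($inList)").explain()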

          azeroth2b Shawn Lavelle added a comment -

          Is https://issues.apache.org/jira/browse/SPARK-19609 working on this issue now?

            People

            • Assignee: Unassigned
            • Reporter: doodlegum Abhinav Chawade
            • Votes: 7
            • Watchers: 18
