  1. Spark
  2. SPARK-38288

Aggregate push down doesn't work using Spark SQL JDBC data source with PostgreSQL

Details

    • Type: Question
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

      I am connecting to PostgreSQL using the Spark SQL JDBC data source. I started the Spark shell with the Postgres driver included, and I can connect and execute queries without problems. I am using this statement:

      val df = (spark.read.format("jdbc")
        .option("url", "jdbc:postgresql://host:port/")
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", "test")
        .option("user", "postgres")
        .option("password", "*******")
        .option("pushDownAggregate", true)
        .load())
      

      I am adding the pushDownAggregate option because I would like the aggregations to be delegated to the source. But for some reason this is not happening.

      Judging from this pull request, the feature should already be merged into 3.2: https://github.com/apache/spark/pull/29695

      I am writing the aggregations with the limitations mentioned in that pull request in mind. An example where I don't see the pushdown happening is this one:

      df.groupBy("name").max("age").show()
      

      The results of the queryExecution are shown below:

      scala> df.groupBy("name").max("age").queryExecution.executedPlan
      res19: org.apache.spark.sql.execution.SparkPlan =
      AdaptiveSparkPlan isFinalPlan=false
      +- HashAggregate(keys=[name#274], functions=[max(age#246)], output=[name#274, max(age)#544])
         +- Exchange hashpartitioning(name#274, 200), ENSURE_REQUIREMENTS, [id=#205]
            +- HashAggregate(keys=[name#274], functions=[partial_max(age#246)], output=[name#274, max#548])
               +- Scan JDBCRelation(test) [numPartitions=1] [age#246,name#274] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<age:int,name:string>
      
      scala> dfp.groupBy("name").max("age").queryExecution.toString
      res20: String =
      "== Parsed Logical Plan ==
      Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
      +- Relation [age#246] JDBCRelation(test) [numPartitions=1]
      
      == Analyzed Logical Plan ==
      name: string, max(age): int
      Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
      +- Relation [age#24...
      

      What could be the problem? Should pushDownAggregate work in this case?
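
One thing that may be worth checking: the Spark documentation describes pushDownAggregate as an option of the V2 JDBC data source, while the plan above shows a `Scan JDBCRelation(test)` node, which looks like the V1 path. The following is only a sketch of registering the same database as a DataSource V2 catalog from the Spark shell. The catalog name `pg` and the `public` schema are assumptions for illustration and are not taken from the setup above; host, port, and password are the same placeholders as in the original statement.

```scala
// Sketch for spark-shell, where `spark` is already defined.
// Assumptions: "pg" is a hypothetical catalog name and the table is assumed
// to live in the "public" schema; host, port, and password are placeholders.
spark.conf.set("spark.sql.catalog.pg",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.pg.url", "jdbc:postgresql://host:port/")
spark.conf.set("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
spark.conf.set("spark.sql.catalog.pg.user", "postgres")
spark.conf.set("spark.sql.catalog.pg.password", "*******")
spark.conf.set("spark.sql.catalog.pg.pushDownAggregate", "true")

// Same aggregation as in the question, resolved through the V2 catalog.
val dfV2 = spark.sql("SELECT name, MAX(age) FROM pg.public.test GROUP BY name")

// Print the physical plan to inspect the pushed-aggregate fields of the scan node.
dfV2.explain()
```

If the pushdown takes effect on this path, the scan node would be expected to list the aggregate and the grouping column instead of the empty `PushedAggregates: []` and `PushedGroupby: []` shown above. Whether it does on 3.2.1 with PostgreSQL is exactly what this check is meant to find out.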

          People

            Assignee: Unassigned
            Reporter: Luis Lozano Coira (llozano)
            Votes: 2
            Watchers: 5