  1. Spark
  2. SPARK-24859

Predicates pushdown on outer joins


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Spark Core, SQL
    • Labels: None
    • Environment: Cloudera CDH 5.13.1

    Description

      I have two Avro tables in Hive called FAct and DIm. Both are partitioned by a common column called part_col. I want to join the two tables on their id, but only for some of the partitions.

      If I use an inner join, everything works well:

       

      select *
      from FA f
      join DI d
      on(f.id = d.id and f.part_col = d.part_col)
      where f.part_col = 'xyz'
      

       

      In the SQL explain plan I can see that the predicate part_col = 'xyz' is also applied in the HiveTableScan of the DIm table.

       

      When I execute the same query using a left join, the full DIm table is scanned. There are workarounds for this issue, but I am reporting it as a bug because the pushdown works for an inner join, and I think the behaviour should be the same for an outer join.

      Here is a self-contained example (created in Zeppelin):

       

      val fact = Seq((1, 100), (2, 200), (3,100), (4,200)).toDF("id", "part_col")
      val dim = Seq((1, 100), (2, 200)).toDF("id", "part_col")
      fact.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/fact")
      dim.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/dim")
       
      spark.sqlContext.sql("create table if not exists fact(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/fact'")
      spark.sqlContext.sql("msck repair table fact")
      spark.sqlContext.sql("create table if not exists dim(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/dim'")
      spark.sqlContext.sql("msck repair table dim")

       
       
       
      Inner join example:

      select * from fact f
      join dim d
      on (f.id = d.id
      and f.part_col = d.part_col)
      where f.part_col = 100

      Excerpt from Spark-SQL physical explain plan: 

      HiveTableScan [id#411, part_col#412], CatalogRelation `default`.`fact`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#411], [part_col#412], [isnotnull(part_col#412), (part_col#412 = 100)] 
      HiveTableScan [id#413, part_col#414], CatalogRelation `default`.`dim`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#413], [part_col#414], [isnotnull(part_col#414), (part_col#414 = 100)]

       
      Outer join example:

      select * from fact f
      left join dim d
      on (f.id = d.id
      and f.part_col = d.part_col)
      where f.part_col = 100

       
      Excerpt from Spark-SQL physical explain plan:
       

      HiveTableScan [id#426, part_col#427], CatalogRelation `default`.`fact`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#426], [part_col#427], [isnotnull(part_col#427), (part_col#427 = 100)]   
      HiveTableScan [id#428, part_col#429], CatalogRelation `default`.`dim`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#428], [part_col#429] 

       
       

      As you can see, with the outer join the predicate is not pushed down to the HiveTableScan of the dim table.
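
      For reference, one workaround of the kind mentioned earlier can be sketched as follows: repeat the partition filter for the dim side inside the ON clause. Since the ON clause already requires f.part_col = d.part_col and the WHERE clause fixes f.part_col, the extra condition should not change the result; the hope is that the optimizer can then push it into the dim scan. This is only a sketch and was not verified on Spark 2.2.0. The object name, the helper leftJoinWithDimFilter and its partValue parameter are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object OuterJoinPushdownWorkaround {

  // Builds the left join from the report with one additional condition:
  // an explicit partition filter on the dim side inside the ON clause.
  // `partValue` is a hypothetical parameter, not part of the original report.
  def leftJoinWithDimFilter(partValue: Int): String =
    s"""select * from fact f
       |left join dim d
       |on (f.id = d.id
       |and f.part_col = d.part_col
       |and d.part_col = $partValue)
       |where f.part_col = $partValue""".stripMargin

  def main(args: Array[String]): Unit = {
    // Assumes the fact and dim Hive tables from the example above already exist.
    val spark = SparkSession.builder()
      .appName("SPARK-24859 workaround sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Print the physical plan so the HiveTableScan of dim can be inspected
    // for the part_col predicate.
    spark.sql(leftJoinWithDimFilter(100)).explain()

    spark.stop()
  }
}
```

      Whether the predicate actually shows up in the dim HiveTableScan has to be checked in the printed plan. Filtering dim in a subquery before the join is another variant of the same idea.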

       


          People

            Assignee: Unassigned
            Reporter: Johannes Mayer (joha0123)
            Votes: 0
            Watchers: 3
