Spark / SPARK-25474

Update the docs for spark.sql.statistics.fallBackToHdfs


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.1, 2.4.3
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels: None
    • Environment: Spark 2.3.1
      Hadoop 2.7.2

    Description

      Description: For a Parquet data source table, the estimated size in bytes of the query is reported in EB (exabytes). This hurts performance, because join queries on such tables always run as Sort Merge Join.

      Precondition: spark.sql.statistics.fallBackToHdfs = true

      Steps:

      0: jdbc:hive2://10.xx:23040/default> create table t1110 (a int, b string) using parquet PARTITIONED BY (b) ;
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      
      0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (2,'b');
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (1,'a');
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      0: jdbc:hive2://10.xx.xx:23040/default> select * from t1110;
      +----+----+--+
      | a | b |
      +----+----+--+
      | 1 | a |
      | 2 | b |
      +----+----+--+
      
      

      The cost of the query shows sizeInBytes in EB:

       explain cost select * from t1110;
      
      
      
      | == Optimized Logical Plan ==
      Relation[a#23,b#24] parquet, Statistics(sizeInBytes=8.0 EB, hints=none)
      
      == Physical Plan ==
      *(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> |
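
      A note on where 8.0 EB plausibly comes from: the value matches the largest signed 64-bit integer, which is the documented default of spark.sql.defaultSizeInBytes (Long.MaxValue). The sketch below only checks that arithmetic. The helper bytes_to_string is hypothetical and just approximates how a byte count is rendered with binary units; it is not Spark's own formatting code.

```python
# Assumption: the planner fell back to spark.sql.defaultSizeInBytes,
# whose documented default is Long.MaxValue.
LONG_MAX = 2**63 - 1

# Binary units, largest first (1 EB is treated as 2**60 bytes here).
UNITS = [
    (1 << 60, "EB"),
    (1 << 50, "PB"),
    (1 << 40, "TB"),
    (1 << 30, "GB"),
    (1 << 20, "MB"),
    (1 << 10, "KB"),
]


def bytes_to_string(size: int) -> str:
    """Hypothetical helper: render a byte count using the largest unit
    in which the value is at least 2, with one decimal place."""
    for factor, name in UNITS:
        if size >= 2 * factor:
            return f"{size / factor:.1f} {name}"
    return f"{size} B"


if __name__ == "__main__":
    print(bytes_to_string(LONG_MAX))
```

      If the arithmetic holds, the reported size is not a measurement of the table at all but a placeholder default, which is consistent with the statistics fallback not being applied to this table.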
      

      This leads to a Sort Merge Join for a join query:

      0: jdbc:hive2://10.xx.xx:23040/default> create table t110 (a int, b string) using parquet PARTITIONED BY (b) ;
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      
      0: jdbc:hive2://10.xx.xx:23040/default> insert into t110 values (1,'a');
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      
       explain select * from t1110 t1 join t110 t2 on t1.a=t2.a;
      
      | == Physical Plan ==
      *(5) SortMergeJoin [a#23], [a#55], Inner
      :- *(2) Sort [a#23 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(a#23, 200)
      :     +- *(1) Project [a#23, b#24]
      :        +- *(1) Filter isnotnull(a#23)
      :           +- *(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
      +- *(4) Sort [a#55 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#55, 200)
            +- *(3) Project [a#55, b#56]
               +- *(3) Filter isnotnull(a#55)
                  +- *(3) FileScan parquet open.t110[a#55,b#56] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t110], PartitionCount: 1, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int> |
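
      Why the oversized estimate forces this plan can be sketched as a simple threshold check. This is a deliberately simplified model, not Spark's planner: it assumes a side is broadcast only when its estimated size is at most spark.sql.autoBroadcastJoinThreshold (documented default 10 MB), and it ignores hints, join types, and other strategies such as shuffled hash join. The function names can_broadcast and choose_join are hypothetical.

```python
# Documented default of spark.sql.autoBroadcastJoinThreshold: 10 MB.
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024

# Size estimate seen in this issue (8.0 EB, i.e. Long.MaxValue bytes).
LONG_MAX = 2**63 - 1


def can_broadcast(size_in_bytes: int,
                  threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> bool:
    """Simplified rule: a relation is broadcastable when its estimated
    size is non-negative and does not exceed the threshold."""
    return 0 <= size_in_bytes <= threshold


def choose_join(left_size: int, right_size: int) -> str:
    """Hypothetical two-way choice between broadcast and sort-merge."""
    if can_broadcast(left_size) or can_broadcast(right_size):
        return "BroadcastHashJoin"
    return "SortMergeJoin"


if __name__ == "__main__":
    # Both tables report the placeholder size, so neither side qualifies.
    print(choose_join(LONG_MAX, LONG_MAX))
    # With a realistic estimate for a tiny table, broadcast becomes possible.
    print(choose_join(LONG_MAX, 1024))
```

      Under this model, any table whose statistics fall back to the placeholder size can never be the broadcast side, no matter how small its files actually are.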
      
      
      

          People

            Assignee: Yuming Wang (yumwang)
            Reporter: Ayush Anubhava (Ayush007)