SPARK-35274: All columns of an old Hive table are read even when column pruning applies in Spark 3.0


Details

    • Type: Question
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: spark3.0

    Description

      I asked this question before, but perhaps I did not state it clearly, so I did not get an answer. This time I will show an example that illustrates the problem clearly.

      import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("OrcTest").getOrCreate()

      // Accumulate bytesRead from every finished task so the two scans can be compared.
      var inputBytes = 0L
      spark.sparkContext.addSparkListener(new SparkListener() {
        override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
          inputBytes += taskEnd.taskMetrics.inputMetrics.bytesRead
        }
      })

      // "Old schema" table: the _colN column names mimic the placeholder names Hive wrote
      // before HIVE-4243, so the ORC files carry no meaningful field names.
      spark.sql("create table orc_table_old_schema (_col0 int, _col1 string, _col2 double) STORED AS ORC")
      spark.sql("insert overwrite table orc_table_old_schema select 1, 'name1', 1000.05")
      inputBytes = 0L
      spark.sql("select _col2 from orc_table_old_schema").show()
      print("input bytes for old schema table: " + inputBytes) // prints 1655

      // "New schema" table: real column names are stored in the ORC files.
      spark.sql("create table orc_table_new_schema (id int, name string, value double) STORED AS ORC")
      spark.sql("insert overwrite table orc_table_new_schema select 1, 'name1', 1000.05")
      inputBytes = 0L
      spark.sql("select value from orc_table_new_schema").show()
      print("input bytes for new schema table: " + inputBytes) // prints 1641

      This example was run on Spark 3.0 with default flags. In it, I create the ORC table orc_table_old_schema, whose schema has no real field names (the layout Hive wrote before HIVE-4243), to trigger the issue. You can see that the input bytes for orc_table_old_schema are 14 bytes more than for orc_table_new_schema.
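
      As a sanity check (my own sketch, not part of the original run), the files written into orc_table_old_schema can be read directly to confirm that the ORC footer only carries _colN placeholder names; the warehouse path below is an assumption and should be taken from the Location row of DESCRIBE FORMATTED.

      // Sanity-check sketch: the table location below is an assumption; look up the real one
      // in the "Location" row of DESCRIBE FORMATTED.
      spark.sql("describe formatted orc_table_old_schema").show(100, false)
      // Reading the files directly shows the physical field names stored in the ORC footer;
      // for this table they are _col0/_col1/_col2, which is what triggers the old-Hive path.
      spark.read.orc("spark-warehouse/orc_table_old_schema").printSchema()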

      The reason is that Spark 3.0 uses the native ORC reader by default rather than the Hive reader, and the native reader reads all columns for the old Hive schema table, while it reads only the pruned columns (in this example, only the column 'value') for the new Hive schema table.
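
      For what it is worth, the physical plan can be inspected as below; my expectation (hedged, since the extra bytes only show up at read time) is that the FileScan's ReadSchema still lists just the selected column even for the old-schema table, which points at the ORC read path rather than the planner.

      // Plan-level check: explain() alone may not reveal the regression, because column
      // pruning is still applied by the optimizer; the extra bytes come from the reader.
      spark.sql("select _col2 from orc_table_old_schema").explain()
      spark.sql("select value from orc_table_new_schema").explain()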

      The native reader is enabled by the following flags, both of which are Spark 3.0's default values: spark.sql.hive.convertMetastoreOrc=true and spark.sql.orc.impl=native.
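
      For completeness, the settings can be made explicit in the reproduction (they are already the defaults in Spark 3.0):

      // Both values are already the Spark 3.0 defaults; set explicitly only for clarity.
      spark.sql("set spark.sql.hive.convertMetastoreOrc=true")
      spark.sql("set spark.sql.orc.impl=native")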

      Then I dug into the Spark code and found this: https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala#L149

      It looks like reading all columns for the old Hive schema (which has no field names) is by design in Spark 3.0.
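
      To make the linked code path easier to discuss, here is a simplified paraphrase of the behavior (my own sketch with made-up names, not the actual Spark source): when every field name in the ORC footer looks like _colN, the reader falls back to positional mapping and is handed the full table schema, so nothing is pruned.

      // Illustrative sketch only, loosely modeled on OrcUtils.requestedColumnIds in branch-3.0;
      // the function name and simplifications are mine, not Spark's.
      def effectiveReadSchema(
          physicalFieldNames: Seq[String], // field names stored in the ORC file footer
          dataSchema: Seq[String],         // table schema from the metastore
          requiredSchema: Seq[String]      // columns the query actually needs
        ): Seq[String] = {
        if (physicalFieldNames.forall(_.startsWith("_col"))) {
          // Old Hive layout: names are placeholders, so map by position and read everything.
          dataSchema
        } else {
          // Files with real field names: prune down to the requested columns.
          requiredSchema
        }
      }

      // effectiveReadSchema(Seq("_col0", "_col1", "_col2"), Seq("id", "name", "value"), Seq("value"))
      //   returns Seq("id", "name", "value") -- all three columns are read.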

      In my company's data, some tables use the old Hive schema while others use the new one. The performance of queries that read old Hive tables drops a lot when I enable the native reader in Spark 3.0. This is the main blocker for us in switching from the Hive reader to the native reader in Spark 3.0.
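
      As a possible stop-gap on our side (an assumption, not something the documentation prescribes for this case), sessions that mostly read old-layout tables could fall back to the Hive SerDe reader, at the cost of giving up the native reader's benefits for those reads.

      // Hedged workaround sketch: disabling the metastore ORC conversion routes old-layout
      // tables back through the Hive SerDe reader for this session.
      spark.sql("set spark.sql.hive.convertMetastoreOrc=false")
      spark.sql("select _col2 from orc_table_old_schema").show()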

      My questions are:

      #1 Do you have a plan to support column pruning for the old Hive schema in the native ORC reader?

      #2 If the answer to #1 is no, are there potential issues if the code were changed to support column pruning?

       

          People

            Assignee: Unassigned
            Reporter: expxiaoli (xiaoli)
