SPARK-5049

ParquetTableScan always prepends the values of partition columns in output rows irrespective of the order of the partition columns in the original SELECT query


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0, 1.2.0
    • Fix Version/s: 1.2.1, 1.3.0
    • Component/s: SQL
    • Labels: None

    Description

      This happens when ParquetTableScan is used, i.e. when spark.sql.hive.convertMetastoreParquet is turned on.

      For example:

      spark-sql> set spark.sql.hive.convertMetastoreParquet=true;

      spark-sql> create table table1(a int, b int) partitioned by (p1 string, p2 int) ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat';

      spark-sql> insert into table table1 partition(p1='January',p2=1) select key, 10 from src;

      spark-sql> select a, b, p1, p2 from table1 limit 10;

      January 1 484 10
      January 1 484 10
      January 1 484 10
      January 1 484 10
      January 1 484 10
      January 1 484 10
      January 1 484 10
      January 1 484 10
      January 1 484 10
      January 1 484 10

      The correct output should be

      484 10 January 1
      484 10 January 1
      484 10 January 1
      484 10 January 1
      484 10 January 1
      484 10 January 1
      484 10 January 1
      484 10 January 1
      484 10 January 1
      484 10 January 1
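      The mechanism behind the wrong rows can be sketched in plain Scala (no Spark; names are illustrative, not Spark internals): the scan emits partition values first, then data values, while the projection expects the columns in SELECT order. Re-reading the emitted row by column name recovers the correct order.

      ```scala
      // Illustrative sketch only: models the column-order mismatch.
      val partitionColumns = Seq("p1", "p2")
      val dataColumns = Seq("a", "b")

      // What the buggy scan effectively emits for one row:
      // partition values prepended before the Parquet data values.
      val scanRow: Seq[Any] = Seq("January", 1, 484, 10)
      val scanOrder = partitionColumns ++ dataColumns

      // What the projection `select a, b, p1, p2` expects.
      val projectedOrder = Seq("a", "b", "p1", "p2")

      // Mapping column name -> value and re-projecting fixes the order.
      val byName = scanOrder.zip(scanRow).toMap
      val correctRow = projectedOrder.map(byName)
      // correctRow == List(484, 10, "January", 1)
      ```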

      This also leads to a schema mismatch if the query is run using HiveContext and the result is a SchemaRDD.
      For example:

      scala> import org.apache.spark.sql.hive._
      scala> val hc = new HiveContext(sc)
      scala> hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")
      scala> val res = hc.sql("select a, b, p1, p2 from table1 limit 10")
      scala> res.collect
      res2: Array[org.apache.spark.sql.Row] = Array([January,1,238,10], [January,1,86,10], [January,1,311,10], [January,1,27,10], [January,1,165,10], [January,1,409,10], [January,1,255,10], [January,1,278,10], [January,1,98,10], [January,1,484,10])

      scala> res.schema
      res5: org.apache.spark.sql.StructType = StructType(ArrayBuffer(StructField(a,IntegerType,true), StructField(b,IntegerType,true), StructField(p1,StringType,true), StructField(p2,IntegerType,true)))
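      Since the problem only manifests when the conversion is enabled (see above), a possible workaround until a fix lands (not verified in this report) is to disable it so the Hive SerDe read path is used instead:

      ```sql
      set spark.sql.hive.convertMetastoreParquet=false;
      ```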

      Attachments

        Activity


          People

            Assignee: Unassigned
            Reporter: Rahul Aggarwal
            Votes: 0
            Watchers: 4

