SPARK-26200

Column values are incorrectly transposed when a field in a PySpark Row requires serialization

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Environment: Spark version 2.4.0

      Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_144

      The same issue is observed when PySpark is run on both macOS 10.13.6 and CentOS 7, so this appears to be a cross-platform issue.

    Description

      Description of issue

      Whenever a field in a PySpark Row requires serialization (for example, a DateType or TimestampType field), the DataFrame generated by the code below assigns the Row's values to columns in alphabetical order of the field names, rather than assigning each value to its specified column.

      Code to reproduce:

      import datetime
      from pyspark.sql import Row
      from pyspark.sql.session import SparkSession
      from pyspark.sql.types import DateType, StringType, StructField, StructType
      
      
      spark = SparkSession.builder.getOrCreate()
      schema = StructType([
          StructField("date_column", DateType()),
          StructField("my_b_column", StringType()),
          StructField("my_a_column", StringType()),
      ])
      
      spark.createDataFrame([Row(
          date_column=datetime.date.today(),
          my_b_column="my_b_value",
          my_a_column="my_a_value"
      )], schema).show()
      

      Expected result:

      +-----------+-----------+-----------+
      |date_column|my_b_column|my_a_column|
      +-----------+-----------+-----------+
      | 2018-11-28| my_b_value| my_a_value|
      +-----------+-----------+-----------+

      Actual result:

      +-----------+-----------+-----------+
      |date_column|my_b_column|my_a_column|
      +-----------+-----------+-----------+
      | 2018-11-28| my_a_value| my_b_value|
      +-----------+-----------+-----------+

      (Note that my_a_value and my_b_value are transposed.)

      Analysis of issue

      Reviewing the code on GitHub (the toInternal method of StructType in pyspark/sql/types.py), there are two relevant conditional blocks:

       

      if self._needSerializeAnyField:
          # Block 1, does not work correctly
      else:
          # Block 2, works correctly
      

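      Row is a tuple whose values are stored in alphabetical order of the field names when it is built from keyword arguments, and it also supports lookup by field name, like a dictionary. In Block 2, there is a conditional that handles a Row object specifically by looking up each value by name: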

       

      elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
          return tuple(obj[n] for n in self.names)
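
      To illustrate why the name-based lookup matters, here is a minimal sketch in plain Python. SortedRow is a hypothetical stand-in written for this example, not PySpark's Row; it only assumes the two properties described above (values stored alphabetically by field name, plus lookup by name).

```python
class SortedRow(tuple):
    """Hypothetical stand-in for a Row built from keyword arguments (not PySpark code)."""

    def __new__(cls, **kwargs):
        names = sorted(kwargs)  # field names are stored in alphabetical order
        row = super().__new__(cls, (kwargs[n] for n in names))
        row.__fields__ = names
        return row

    def __getitem__(self, key):
        # Allow lookup by field name in addition to normal tuple indexing.
        if isinstance(key, str):
            return super().__getitem__(self.__fields__.index(key))
        return super().__getitem__(key)


schema_names = ["date_column", "my_b_column", "my_a_column"]
row = SortedRow(
    date_column="2018-11-28",
    my_b_column="my_b_value",
    my_a_column="my_a_value",
)

positional = tuple(row)                         # alphabetical field order
by_name = tuple(row[n] for n in schema_names)   # schema order, as in Block 2
print(positional)
print(by_name)
```

      Reading the stand-in positionally gives the a/b values in alphabetical order, while the lookup by schema name restores the declared order.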
      

      There is no such condition in Block 1, so we fall into this instead:

       

      elif isinstance(obj, (tuple, list)):
          return tuple(f.toInternal(v) if c else v
              for f, v, c in zip(self.fields, obj, self._needConversion))
      

      The zip call is wrong for a Row, since iterating over obj (the Row) yields values in alphabetical field order rather than in the order of the schema fields. So zip produces the following (field, value, needConversion) triples:

      (date, date, True),
      (b, a, False),
      (a, b, False)
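
      The mis-pairing can be reproduced without Spark. The sketch below is an illustration only: it uses plain lists in place of the schema fields and the Row, and the variable names are made up for this example.

```python
import datetime

# Schema order as declared in the reproduction, with a flag per field
# saying whether that field needs conversion (only the date does).
schema_names = ["date_column", "my_b_column", "my_a_column"]
need_conversion = [True, False, False]

record = {
    "date_column": datetime.date(2018, 11, 28),
    "my_b_column": "my_b_value",
    "my_a_column": "my_a_value",
}

# Assumption for this sketch: the Row yields its values in alphabetical
# order of the field names.
row_values = [record[name] for name in sorted(record)]

# Positional pairing, analogous to zip(self.fields, obj, self._needConversion).
paired = list(zip(schema_names, row_values, need_conversion))

# Fields that received a value belonging to a different field.
mispaired = [(name, value) for name, value, _ in paired if record[name] != value]
print(mispaired)
```

      Only the two string columns end up swapped here, because date_column happens to sort first both alphabetically and in the schema.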
      

      Workarounds

      Correct behaviour is observed if you use a Python list or dict instead of PySpark's Row object:

       

      # Using a list works
      spark.createDataFrame([[
          datetime.date.today(),
          "my_b_value",
          "my_a_value"
      ]], schema)
      
      # Using a dict also works
      spark.createDataFrame([{
          "date_column": datetime.date.today(),
          "my_b_column": "my_b_value",
          "my_a_column": "my_a_value"
      }], schema)

      Correct behaviour is also observed if you have no fields that require serialization; in this example, changing date_column to StringType avoids the correctness issue.
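
      If the data already exists as Row objects, one possible way to apply the list workaround is to reorder each record by the schema's field names first. The helper below is a hypothetical sketch (to_schema_order is not part of PySpark) and works on any name-keyed mapping.

```python
def to_schema_order(record, names):
    """Return the values of a name-keyed mapping as a list ordered by `names`."""
    missing = [name for name in names if name not in record]
    if missing:
        raise KeyError(f"record is missing fields: {missing}")
    return [record[name] for name in names]


schema_names = ["date_column", "my_b_column", "my_a_column"]
record = {
    "my_a_column": "my_a_value",
    "my_b_column": "my_b_value",
    "date_column": "2018-11-28",
}

ordered = to_schema_order(record, schema_names)
print(ordered)
```

      With PySpark, the mapping could come from row.asDict() and the names from the schema's field names, so the resulting list can be passed to createDataFrame as in the list workaround above; that combination is an assumption of this sketch and has not been verified against Spark 2.4.0.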

       

       


            People

              Assignee: Unassigned
              Reporter: David Lyness (davidlyness)