Details

Type: Bug
Status: Resolved
Priority: Major
Resolution: Duplicate
Affects Version/s: 2.4.0
Fix Version/s: None
Environment:
Spark version 2.4.0
Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_144
The same issue is observed when PySpark is run on both macOS 10.13.6 and CentOS 7, so this appears to be a cross-platform issue.
Description
Description of issue
Whenever a field in a PySpark Row requires serialization (such as a DateType or TimestampType), the DataFrame generated by the code below will assign column values in alphabetical order, rather than assigning each value to its specified column.
Code to reproduce:
import datetime

from pyspark.sql import Row
from pyspark.sql.session import SparkSession
from pyspark.sql.types import DateType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("date_column", DateType()),
    StructField("my_b_column", StringType()),
    StructField("my_a_column", StringType()),
])

spark.createDataFrame([Row(
    date_column=datetime.date.today(),
    my_b_column="my_b_value",
    my_a_column="my_a_value"
)], schema).show()
Expected result:
+-----------+-----------+-----------+
|date_column|my_b_column|my_a_column|
+-----------+-----------+-----------+
| 2018-11-28| my_b_value| my_a_value|
+-----------+-----------+-----------+
Actual result:
+-----------+-----------+-----------+
|date_column|my_b_column|my_a_column|
+-----------+-----------+-----------+
| 2018-11-28| my_a_value| my_b_value|
+-----------+-----------+-----------+
(Note that my_a_value and my_b_value are transposed.)
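The transposition follows from how Row orders its fields. In Spark 2.4, Row(**kwargs) sorts the field names alphabetically, so the tuple underlying the Row can differ from the keyword order at the call site. A minimal illustration:

    from pyspark.sql import Row

    # Row(**kwargs) in Spark 2.4 sorts field names alphabetically, so the
    # tuple underlying this Row is ("d", "a", "b"), not ("d", "b", "a").
    row = Row(date_column="d", my_b_column="b", my_a_column="a")
    print(row)
    # Row(date_column='d', my_a_column='a', my_b_column='b')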
Analysis of issue
A review of the relevant code on GitHub (StructType.toInternal in pyspark/sql/types.py) shows two conditional blocks:
if self._needSerializeAnyField:
    # Block 1, does not work correctly
    ...
else:
    # Block 2, works correctly
    ...
Row is implemented as both a tuple of alphabetically-sorted columns and a dictionary of named columns. In Block 2, there is a conditional that specifically handles serializing a Row object:
elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
    return tuple(obj[n] for n in self.names)
There is no such condition in Block 1, so we fall through to this branch instead:
elif isinstance(obj, (tuple, list)):
    return tuple(f.toInternal(v) if c else v
                 for f, v, c in zip(self.fields, obj, self._needConversion))
The behaviour in the zip call is wrong: iterating over obj (the Row) yields values in the Row's alphabetically-sorted order, not in the order of the schema fields. So we end up with:
Each tuple below is (schema field, Row value, needs conversion):

(date_column, date, True), (my_b_column, my_a_value, False), (my_a_column, my_b_value, False)
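The mispairing can be reproduced with plain Python. The names below are stand-ins for the schema fields, the Row's sorted values, and the _needConversion flags, not Spark internals:

    # Stand-ins for illustration only (not Spark API):
    schema_fields = ["date_column", "my_b_column", "my_a_column"]  # schema order
    row_values = ["2018-11-28", "my_a_value", "my_b_value"]        # Row's alphabetical order
    need_conversion = [True, False, False]

    # zip pairs by position, so my_b_column gets my_a_value and vice versa.
    for f, v, c in zip(schema_fields, row_values, need_conversion):
        print(f, v, c)
    # date_column 2018-11-28 True
    # my_b_column my_a_value False
    # my_a_column my_b_value False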
Workarounds
Correct behaviour is observed if you use a Python list or dict instead of PySpark's Row object:
# Using a list works
spark.createDataFrame([[
    datetime.date.today(),
    "my_b_value",
    "my_a_value"
]], schema)

# Using a dict also works
spark.createDataFrame([{
    "date_column": datetime.date.today(),
    "my_b_column": "my_b_value",
    "my_a_column": "my_a_value"
}], schema)
Correct behaviour is also observed if you have no fields that require serialization; in this example, changing date_column to StringType avoids the correctness issue.
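As a further sketch (untested against this exact build, and assuming the spark and schema objects from the reproduction above are in scope), constructing the Row positionally should also avoid the mispairing, since a positional Row preserves its declared field order and Block 1's tuple branch then zips matching orders:

    import datetime
    from pyspark.sql import Row

    # A positional Row keeps fields in the declared order, so its tuple
    # lines up with the schema fields and Block 1's zip pairs correctly.
    MyRow = Row("date_column", "my_b_column", "my_a_column")
    spark.createDataFrame(
        [MyRow(datetime.date.today(), "my_b_value", "my_a_value")],
        schema,
    ).show()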
Issue Links
- duplicates SPARK-24915: Calling SparkSession.createDataFrame with schema can throw exception (Resolved)