Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
3.4.0
Description
Python Timedelta to PySpark DayTimeIntervalType bug
There is a bug that exists which means certain Python datetime.timedelta objects get converted to a PySpark DayTimeIntervalType column with a different value to that which is stored in the Python timedelta.
A simple illustrative example can be produced with the below code:
from datetime import timedelta from pyspark.sql.types import DayTimeIntervalType, StructField, StructType spark = ...spark session setup here... td = timedelta(days=4498031, seconds=16054, microseconds=999981) df = spark.createDataFrame([(td,)], StructType([StructField(name="timedelta_col", dataType=DayTimeIntervalType(), nullable=False)])) df.show(truncate=False) > +------------------------------------------------+ > |timedelta_col | > +------------------------------------------------+ > |INTERVAL '4498031 04:27:35.999981' DAY TO SECOND| > +------------------------------------------------+ print(str(td)) > '4498031 days, 4:27:34.999981'
In the above example, look at the seconds. The original python timedelta object has 34 seconds, the pyspark DayTimeIntervalType column has 35 seconds.
Fix
This issue arises because the current conversion from python timedelta uses the timedelta function `.total_seconds()` to get the number of seconds, and then adds the microsecond component back in afterwards. Unfortunately the `.total_seconds()` function with some timedeltas (ones with microsecond entries close to 1_000_000 I believe) ends up rounding up to the nearest second (probably due to floating point precision), with the microseconds then added on top of that. The effect is that 1 second gets added incorrectly.
The issue can be fixed by doing the processing in a slightly different way. Instead of doing:
(math.floor(dt.total_seconds()) * 1000000) + dt.microseconds
Instead we construct the timedelta from its components:
(((dt.days * 86400) + dt.seconds) * 1_000_000) + dt.microseconds
Tests
An illustrative edge case example for timedeltas is the above (which can also be written as `datetime.timedelta(microseconds=388629894454999981)`)
A related edge case which is already handled but not tested exists for the situation where there are positive and negative components to the created timedelta object. An entry for this edge case is also included as it is related.
PR
Link to the PR addressing this issue: https://github.com/apache/spark/pull/42541
Keywords to help people searching for this issue:
datetime.timedelta
timedelta
pyspark.sql.types.DayTimeIntervalType
DayTimeIntervalType