Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 3.3.0
Environment: OS: Windows 10
Description
I'm trying to process data that contains timestamps in PySpark Structured Streaming using the foreach option. When I run the job, I get an OSError: [Errno 22] Invalid argument exception in \pyspark\sql\types.py at the
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
statement.
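To make the failing statement easier to reason about, here is a minimal sketch that runs without Spark. It assumes that ts is a count of microseconds since the Unix epoch and that the first CSV row (1970-01-01 00:59:59.999) was read in a UTC+1 session time zone, which would make ts negative; both are assumptions, not something confirmed in this report. The helper names split_micros and micros_to_utc_datetime are hypothetical and are not part of PySpark. On Windows, datetime.fromtimestamp is known to reject negative values with this kind of OSError, while epoch-plus-timedelta arithmetic does not depend on the platform's C library.

```python
import datetime

# Assumption: ts is microseconds since 1970-01-01 00:00:00 UTC.
_EPOCH_UTC = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def split_micros(ts):
    """Split ts the same way the failing statement does (hypothetical helper)."""
    return ts // 1000000, ts % 1000000


def micros_to_utc_datetime(ts):
    """Convert without fromtimestamp, so negative values work on any OS.

    Note: this returns an aware UTC datetime, whereas the failing statement
    produces a naive local-time datetime.
    """
    return _EPOCH_UTC + datetime.timedelta(microseconds=ts)


# Assumed value: 1970-01-01 00:59:59.999 at UTC+1 is 1 ms before the epoch.
ts = -1000
seconds, micros = split_micros(ts)
print(seconds, micros)  # -1 999000

try:
    # Same shape as the statement in types.py; may fail on Windows.
    local = datetime.datetime.fromtimestamp(seconds).replace(microsecond=micros)
    print('fromtimestamp succeeded:', local)
except (OSError, OverflowError, ValueError) as exc:
    print('fromtimestamp failed:', exc)

# The timedelta-based conversion handles the same value on every platform.
print(micros_to_utc_datetime(ts))
```

If this assumption holds, only the first row of the test file would trigger the error, since the other two timestamps are well after the epoch.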
I have boiled down my Spark job to the essentials:
from pyspark.sql import SparkSession

def handle_row(row):
    print(f'Processing: {row}')

spark = (SparkSession.builder
         .appName('test.stream.tstmp.byrow')
         .getOrCreate())

data = (spark.readStream
        .option('delimiter', ',')
        .option('header', True)
        .schema('a integer, b string, c timestamp')
        .csv('data/test'))

query = (data.writeStream
         .foreach(handle_row)
         .start())

query.awaitTermination()
In the data/test folder I have one csv file:
a,b,c
1,x,1970-01-01 00:59:59.999
2,y,1999-12-31 23:59:59.999
3,z,2022-10-18 15:53:12.345
If I change the csv schema to 'a integer, b string, c string' everything works fine and I get the expected output of
Processing: Row(a=1, b='x', c='1970-01-01 00:59:59.999')
Processing: Row(a=2, b='y', c='1999-12-31 23:59:59.999')
Processing: Row(a=3, b='z', c='2022-10-18 15:53:12.345')
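As a possible interim workaround built on the string schema, the timestamp could be parsed inside the row handler itself. The sketch below is only an illustration: parse_timestamp is a hypothetical helper, the format string is assumed from the sample CSV rows, and a plain dict stands in for the Spark Row so the snippet runs without Spark.

```python
from datetime import datetime

# Assumption: the string column always looks like '1970-01-01 00:59:59.999'.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'


def parse_timestamp(value):
    """Parse the CSV timestamp text into a naive datetime (hypothetical helper)."""
    return datetime.strptime(value, TIMESTAMP_FORMAT)


def handle_row(row):
    # row['c'] is a string here because the schema declares 'c string'.
    ts = parse_timestamp(row['c'])
    print(f"Processing: a={row['a']}, b={row['b']}, c={ts!r}")


# A dict stands in for a Spark Row, which also supports row['c'] access.
sample = {'a': 1, 'b': 'x', 'c': '1970-01-01 00:59:59.999'}
handle_row(sample)
```

Parsing with strptime never goes through the operating system's time functions, so it avoids the failing conversion, at the cost of losing the timestamp type inside Spark.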
Also, if I change the stream handling to micro-batches like so:
...
def handle_batch(df, epoch_id):
    print(f'Processing: {df} - Epoch: {epoch_id}')
...
query = (data.writeStream
         .foreachBatch(handle_batch)
         .start())
I get the expected output of
Processing: DataFrame[a: int, b: string, c: timestamp] - Epoch: 0
However, "by row" handling should also work when the row carries the column with its correct data type of timestamp.
This issue also affects using the foreach sink in Structured Streaming with the Kafka data source, since Kafka events contain a timestamp.