SPARK-40952: Exception when handling timestamp data in PySpark Structured Streaming


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.3.0
    • Fix Version/s: None
    • Component/s: None
    • Environment: OS: Windows 10

    Description

      I'm trying to process data that contains timestamps in PySpark Structured Streaming using the foreach option. When I run the job I get an OSError: [Errno 22] Invalid argument exception in pyspark\sql\types.py at the

      return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
      

      statement.
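
      A possible trigger, assuming the session time zone is ahead of UTC (e.g. CET): the first row's local time 1970-01-01 00:59:59.999 then corresponds to a negative epoch value, and on Windows datetime.fromtimestamp rejects negative arguments. A minimal sketch of the failing call under that assumption:

      import datetime
      
      # 1970-01-01 00:59:59.999 in a UTC+1 zone lies 1 ms before the epoch,
      # i.e. ts = -1000 microseconds in Spark's internal representation.
      ts = -1000
      # ts // 1000000 == -1; on Windows fromtimestamp(-1) raises
      # OSError: [Errno 22] Invalid argument, while on Linux/macOS it succeeds.
      datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)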

      I have boiled down my Spark job to the essentials:

      from pyspark.sql import SparkSession
      
      def handle_row(row):
        print(f'Processing: {row}')
      
      spark = (SparkSession.builder
        .appName('test.stream.tstmp.byrow')
        .getOrCreate())
      
      data = (spark.readStream
        .option('delimiter', ',')
        .option('header', True)
        .schema('a integer, b string, c timestamp')
        .csv('data/test'))
      
      query = (data.writeStream
        .foreach(handle_row)
        .start())
      
      query.awaitTermination()
      

      In the data/test folder I have one csv file:

      a,b,c
      1,x,1970-01-01 00:59:59.999
      2,y,1999-12-31 23:59:59.999
      3,z,2022-10-18 15:53:12.345
      

      If I change the csv schema to 'a integer, b string, c string' everything works fine and I get the expected output of

      Processing: Row(a=1, b='x', c='1970-01-01 00:59:59.999')
      Processing: Row(a=2, b='y', c='1999-12-31 23:59:59.999')
      Processing: Row(a=3, b='z', c='2022-10-18 15:53:12.345')
      
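      With the string schema, the timestamp values can still be recovered per row if needed. A hypothetical workaround sketch (the format string matches the sample data above):

      import datetime
      
      def handle_row(row):
        # Parse the 'c' column manually, assuming it was read as a string
        c = datetime.datetime.strptime(row.c, '%Y-%m-%d %H:%M:%S.%f')
        print(f'Processing: a={row.a}, b={row.b}, c={c}')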

      Also, if I change the stream handling to micro-batches like so:

      ...
      def handle_batch(df, epoch_id):
        print(f'Processing: {df} - Epoch: {epoch_id}')
      ...
      query = (data.writeStream
        .foreachBatch(handle_batch)
        .start())
      

      I get the expected output of

      Processing: DataFrame[a: int, b: string, c: timestamp] - Epoch: 0
      
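      Presumably foreachBatch avoids the error because the rows stay in a DataFrame and no per-row Python datetime conversion happens until the data is actually collected. Along the same lines, a hypothetical workaround is to cast the timestamp to a string before rows cross into Python:

      def handle_batch(df, epoch_id):
        # Casting 'c' to string avoids datetime.fromtimestamp during row conversion
        for row in df.selectExpr('a', 'b', 'CAST(c AS STRING) AS c').collect():
          print(f'Processing: {row} - Epoch: {epoch_id}')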

      But "by row" handling should work with the row having the correct column data type of timestamp.

      This issue also affects using the foreach sink in Structured Streaming with the Kafka data source, since Kafka events contain a timestamp.
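
      A hypothetical sketch of the Kafka case (broker and topic are placeholders): the built-in Kafka source exposes a timestamp column of TimestampType, so foreach hits the same row conversion.

      events = (spark.readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', 'localhost:9092')  # placeholder broker
        .option('subscribe', 'events')                        # placeholder topic
        .load())
      
      query = (events.writeStream
        .foreach(handle_row)  # fails the same way on Windows
        .start())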


People

    • Assignee: Unassigned
    • Reporter: Kai-Michael Roesner (KaiRoesner)
