[SPARK-28502] Error with struct conversion while using pandas_udf


    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None
    • Environment: OS: Ubuntu; Python: 3.6

    Description

      What I am trying to do: group data into time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I don't know if there is a better/cleaner way to do it.

      Below is the sample code I tried and the error message I get:

      from pyspark.sql import SparkSession, functions as F
      from pyspark.sql.functions import pandas_udf, PandasUDFType
      from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

      sparkSession = SparkSession.builder.getOrCreate()

      df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
                                         (13.00, "2018-03-11T12:27:18+00:00"),
                                         (25.00, "2018-03-12T11:27:18+00:00"),
                                         (20.00, "2018-03-13T15:27:18+00:00"),
                                         (17.00, "2018-03-14T12:27:18+00:00"),
                                         (99.00, "2018-03-15T11:27:18+00:00"),
                                         (156.00, "2018-03-22T11:27:18+00:00"),
                                         (17.00, "2018-03-31T11:27:18+00:00"),
                                         (25.00, "2018-03-15T11:27:18+00:00"),
                                         (25.00, "2018-03-16T11:27:18+00:00")],
                                        ["id", "ts"])
      df = df.withColumn('ts', df.ts.cast('timestamp'))

      # The id values are doubles, so the output schema uses DoubleType
      # (IntegerType would not match the data).
      schema = StructType([
          StructField("id", DoubleType()),
          StructField("ts", TimestampType())
      ])


      @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
      def some_udf(pdf):
          # some computation
          return pdf


      df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
      

      This throws the following exception:

      TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
      


      However, if I use a builtin agg method instead, it works fine. For example:

      df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
      

      Output

      +-----+------------------------------------------+-------+
      |id   |window                                    |avg(id)|
      +-----+------------------------------------------+-------+
      |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
      |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
      |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
      |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
      |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
      |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
      |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
      +-----+------------------------------------------+-------+
      
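      A possible workaround (a sketch I have not fully verified): flatten the window struct into plain timestamp columns before the groupby/apply, so that no struct column has to go through the Arrow conversion. The column names w_start/w_end and the pass-through UDF body are illustrative only.

      from pyspark.sql import functions as F
      from pyspark.sql.functions import pandas_udf, PandasUDFType
      from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

      # Materialize the window, pull its fields out as top-level timestamp
      # columns, then drop the struct so only Arrow-supported types remain.
      df_flat = (df
                 .withColumn("w", F.window("ts", "15 days"))
                 .withColumn("w_start", F.col("w.start"))
                 .withColumn("w_end", F.col("w.end"))
                 .drop("w"))

      flat_schema = StructType([
          StructField("id", DoubleType()),
          StructField("ts", TimestampType()),
          StructField("w_start", TimestampType()),
          StructField("w_end", TimestampType())
      ])

      @pandas_udf(flat_schema, PandasUDFType.GROUPED_MAP)
      def some_udf_flat(pdf):
          # some computation on each (id, w_start) group
          return pdf

      # Grouping on plain columns avoids the struct<start, end> grouping key.
      df_flat.groupby("id", "w_start").apply(some_udf_flat).show()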


    People

    • Assignee: Unassigned
    • Reporter: Nasir Ali (nasirali)