Spark / SPARK-26412

Allow Pandas UDF to take an iterator of pd.DataFrames


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: PySpark
    • Labels: None

    Description

      Pandas UDFs are an ideal bridge between PySpark and DL model inference workloads. However, the user must load the model file before making predictions, and it is common to see models of size ~100MB or bigger. Because a scalar Pandas UDF is invoked once per batch, the user has to reload the same model for every batch in the same Python worker process, which is inefficient.

      Instead, we can hand the user code an iterator of batches (e.g., of pd.DataFrames) and let it drive the loop:

      from pyspark.sql.functions import pandas_udf, PandasUDFType
      from pyspark.sql.types import DoubleType

      @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
      def predict(batch_iter):
        model = ...  # load the model once per worker process
        for batch in batch_iter:
          yield model.predict(batch)
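      Invocation looks like any scalar Pandas UDF; a minimal usage sketch, assuming a DataFrame df with a "features" column (names are illustrative):

      from pyspark.sql.functions import col

      df.select(predict(col("features")).alias("prediction"))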
      

      The type of each batch is:

      • a pd.Series if the UDF is called with a single non-struct-type column
      • a tuple of pd.Series if the UDF is called with more than one Spark DataFrame column
      • a pd.DataFrame if the UDF is called with a single StructType column

      Examples:

      @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
      def evaluate(batch_iter):
        model = ...  # load the model once per worker process
        for features, label in batch_iter:  # a tuple of pd.Series, one per column
          pred = model.predict(features)
          yield (pred - label).abs()

      df.select(evaluate(col("features"), col("label")).alias("err"))
      
      @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
      def evaluate(pdf_iter):
        model = ...  # load the model once per worker process
        for pdf in pdf_iter:  # a pd.DataFrame, one column per struct field
          pred = model.predict(pdf['features'])
          yield (pred - pdf['label']).abs()

      df.select(evaluate(struct(col("features"), col("label"))).alias("err"))
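      For completeness, in the single non-struct-column case each batch is a plain pd.Series. A minimal sketch (no model involved; the UDF and column name "x" are illustrative, not part of this proposal):

      @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
      def plus_one(batch_iter):
        for s in batch_iter:  # s is a pd.Series
          yield s + 1

      df.select(plus_one(col("x")).alias("x_plus_one"))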
      

      If the total number of records the UDF returns for a partition does not match the number of input records, the user should see an error. We do not require each individual yield to match the size of its input batch.
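      For example, a UDF may legally re-chunk its output as long as the per-partition total matches; a sketch (the chunk size of 100 is arbitrary):

      import pandas as pd

      @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
      def rechunk(batch_iter):
        buf = pd.Series([], dtype="float64")
        for s in batch_iter:
          buf = pd.concat([buf, s.astype("float64")])
          while len(buf) >= 100:  # emit fixed-size chunks
            yield buf.iloc[:100]
            buf = buf.iloc[100:]
        if len(buf) > 0:  # flush the remainder
          yield buf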

      Another benefit: with the iterator interface and Python's asyncio, users have the flexibility to implement data pipelining, e.g., prefetching the next batch while the model scores the current one.
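      A minimal sketch of such pipelining using a background thread to prefetch batches (asyncio could be used similarly; the queue depth of 2 is arbitrary):

      import queue
      import threading

      @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
      def predict_pipelined(batch_iter):
        model = ...  # load the model once per worker process
        q = queue.Queue(maxsize=2)  # prefetch up to 2 batches ahead

        def producer():
          for batch in batch_iter:
            q.put(batch)
          q.put(None)  # sentinel: input exhausted

        threading.Thread(target=producer, daemon=True).start()
        while True:
          batch = q.get()
          if batch is None:
            break
          yield model.predict(batch)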

      cc: Li Jin, Bryan Cutler, [~holdenk], [~hyukjin.kwon], Takuya Ueshin, Xiao Li


          People

            Assignee: Weichen Xu (weichenxu123)
            Reporter: Xiangrui Meng (mengxr)
            Votes: 0
            Watchers: 10
