Spark / SPARK-43797 Python User-defined Table Functions / SPARK-48566

[Bug] Partition indices are incorrect when UDTF analyze() uses both select and partitionColumns


    Description

      When a Python UDTF's "analyze" method returns an AnalyzeResult that sets both the "select" and "partitionBy" fields, the query fails with an internal error because the partition column indices are computed incorrectly.

      To reproduce:

      from pyspark.sql.functions import (
          AnalyzeArgument,
          AnalyzeResult,
          PartitioningColumn,
          SelectedColumn,
          udtf
      )
      
      from pyspark.sql.types import (
          DoubleType,
          StringType,
          StructType,
      )
      
      @udtf
      class TestTvf:
          @staticmethod
          def analyze(observed: AnalyzeArgument) -> AnalyzeResult:
              out_schema = StructType()
              out_schema.add("partition_col", StringType())
              out_schema.add("double_col", DoubleType())
      
              return AnalyzeResult(
                  schema=out_schema,
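                  # Setting both partitionBy and select below is the combination that triggers the internal error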
                  partitionBy=[PartitioningColumn("partition_col")],
                  select=[
                      SelectedColumn("partition_col"),
                      SelectedColumn("double_col"),
                  ],
              )
      
          def eval(self, *args, **kwargs):
              pass
      
          def terminate(self):
              for _ in range(10):
                  yield {
                      "partition_col": None,
                      "double_col": 1.0,
                  }
      
      
      spark.udtf.register("serialize_test", TestTvf) 
      
      # Fails with an internal error
      (
          spark
          .sql(
              """
              SELECT * FROM serialize_test(
                  TABLE(
                      SELECT
                          5 AS unused_col,
                          'hi' AS partition_col,
                          1.0 AS double_col
                      
                      UNION ALL
      
                      SELECT
                          4 AS unused_col,
                          'hi' AS partition_col,
                          1.0 AS double_col
                  )
              )
              """
          )
          .toPandas()
      )
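
      The failure comes from combining the two options. As a point of contrast, here is a minimal sketch (an assumption, not verified against a particular Spark build) of the same analyze() method with the "select" field omitted, so that only partitioning is requested and the incorrect partition indices are not expected to be hit:

      # Hypothetical variant of the analyze() method from the reproduction
      # above: only partitionBy is set, so no column pruning is requested.
      @staticmethod
      def analyze(observed: AnalyzeArgument) -> AnalyzeResult:
          out_schema = StructType()
          out_schema.add("partition_col", StringType())
          out_schema.add("double_col", DoubleType())

          return AnalyzeResult(
              schema=out_schema,
              partitionBy=[PartitioningColumn("partition_col")],
              # "select" omitted: every column of the TABLE(...) argument,
              # including unused_col, is passed through to eval()
          )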


People

  Assignee: dtenedor Daniel
  Reporter: dtenedor Daniel
