Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • None
    • None
    • Component/s: PySpark

    Description

      Currently DataFrame.mapPartitions is analogous to DataFrame.rdd.mapPartitions in both Spark and PySpark: the function f that is applied to each partition must operate on an iterator of rows. In Python this is very inefficient. It would be more logical and efficient if f operated on a pandas DataFrame instead and also returned a DataFrame, since this avoids slow row-by-row iteration in Python.

      Currently:

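      import pandas as pd

      def apply_function(rows):
          # Build a pandas DataFrame from the partition's iterator of Row objects
          df = pd.DataFrame(list(rows))
          df = df % 100   # Do something on df
          # Convert back to a list of lists (one per row) for Spark
          return df.values.tolist()

      table = sqlContext.read.parquet("")
      table = table.mapPartitions(apply_function)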
      

      The proposed apply function would accept a pandas DataFrame and return a DataFrame:

      def apply_function(df):
          df = df % 100   # Do something on df
          return df
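      The proposal can be approximated on top of the existing row-based API with a small adapter. This is a minimal sketch, assuming the caller knows the column names and that f returns a DataFrame whose rows Spark can accept; `pandas_partition_fn` is a hypothetical helper, not part of PySpark. The adapter still makes one Python pass to collect the rows, but the transformation itself runs vectorized inside pandas.

```python
from typing import Callable, Iterable, Iterator, Sequence

import pandas as pd


def pandas_partition_fn(
    f: Callable[[pd.DataFrame], pd.DataFrame],
    columns: Sequence[str],
) -> Callable[[Iterable[tuple]], Iterator[tuple]]:
    """Adapt a DataFrame -> DataFrame function to the row-iterator
    contract used by rdd.mapPartitions (hypothetical helper)."""

    def run(rows: Iterable[tuple]) -> Iterator[tuple]:
        data = list(rows)  # one Python pass to collect the partition's rows
        if not data:
            return iter(())  # empty partition: nothing to transform
        df = pd.DataFrame(data, columns=list(columns))
        out = f(df)  # vectorized work happens inside pandas
        # Emit plain tuples so Spark can serialize the result rows.
        return out.itertuples(index=False, name=None)

    return run


def apply_function(df: pd.DataFrame) -> pd.DataFrame:
    return df % 100  # Do something on df


# Usage without Spark: a plain list of tuples stands in for one partition.
fn = pandas_partition_fn(apply_function, ["a", "b"])
print(list(fn([(101, 250), (7, 1999)])))
```

      With Spark available, the same wrapper could be passed as `table.rdd.mapPartitions(pandas_partition_fn(apply_function, table.columns))`, followed by `.toDF(table.columns)` if f keeps the original columns.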
      


        Activity

          cml Chris Lambertus made changes -
          Workflow: Default workflow, editable Closed status → no-reopen-closed
          holden Holden Karau made changes -
          Resolution: → Won't Fix
          Status: Open → Closed
          holden Holden Karau added a comment -

          In some ways, yes, avoiding unnecessary iteration can be good, but allowing Spark to spill is also important. That being said, map and mapPartitions have been temporarily removed from DataFrame while the Dataset API is sorted out in Python, so I don't think this is likely to get in (although it may be worth being involved in the new DataFrame API discussions if you are interested).
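          One way to balance the two concerns in the comment above, sketched under assumptions: instead of materializing an entire partition as a single pandas DataFrame (which must fit in the Python worker's memory), convert rows in fixed-size batches so only `batch_size` rows are held in pandas at once. `chunked_pandas_partition_fn` and `batch_size` are hypothetical names. This only gives the same result as whole-partition processing when f treats rows independently (as `df % 100` does); sorting or aggregating within f would see each batch separately. Later PySpark releases took a similar batch-oriented route with `DataFrame.mapInPandas`, whose function receives an iterator of pandas DataFrames.

```python
import itertools
from typing import Callable, Iterable, Iterator, Sequence

import pandas as pd


def chunked_pandas_partition_fn(
    f: Callable[[pd.DataFrame], pd.DataFrame],
    columns: Sequence[str],
    batch_size: int = 10_000,
) -> Callable[[Iterable[tuple]], Iterator[tuple]]:
    """Transform a partition in batches so that at most `batch_size`
    rows are held in a pandas DataFrame at once (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")

    def run(rows: Iterable[tuple]) -> Iterator[tuple]:
        it = iter(rows)
        while True:
            # Pull at most batch_size rows from the partition iterator.
            batch = list(itertools.islice(it, batch_size))
            if not batch:
                return  # partition exhausted
            out = f(pd.DataFrame(batch, columns=list(columns)))
            # Stream this batch's results before reading the next batch.
            yield from out.itertuples(index=False, name=None)

    return run


def mod_100(df: pd.DataFrame) -> pd.DataFrame:
    return df % 100  # row-wise, so batching does not change the result


fn = chunked_pandas_partition_fn(mod_100, ["a", "b"], batch_size=2)
print(list(fn([(101, 250), (7, 1999), (300, 5)])))
```

          Because `run` is a generator, results flow back to Spark batch by batch rather than after the whole partition has been converted; `batch_size` trades per-batch pandas overhead against peak memory.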

          joshlk Josh made changes -
          Labels: dataframe pandas → dataframe mapPartitions pandas
          joshlk Josh created issue -

          People

            Assignee: Unassigned
            Reporter: Josh (joshlk)