Apache Arrow / ARROW-10957

Expanding the pyarrow buffer size beyond 2 GB for pandas_udf functions


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: C++, Java, Python

    Description

      There is a 2 GB limit on the amount of data that can be passed to any pandas_udf function, and the aim of this issue is to expand that limit. This buffer size is far too small when using pyspark to fit machine learning models.
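
      As a rough, hypothetical check (the helper name below is illustrative, not part of this report), the Arrow footprint of a single pandas group can be measured with pyarrow before handing it to a grouped-map pandas_udf; a group whose footprint approaches 2 GB is exactly the kind of payload this issue is about:

      import pandas as pd
      import pyarrow as pa

      def arrow_size_bytes(pdf: pd.DataFrame) -> int:
          # Convert the pandas group to an Arrow table and report the total
          # size of its buffers. A grouped-map pandas_udf ships each group as
          # Arrow data, so a value near 2 GB signals the limit described above.
          return pa.Table.from_pandas(pdf).nbytes

      sample = pd.DataFrame({"df1_c1": [1234567] * 1000, "df1_c3": ["abcdefghij"] * 1000})
      print(arrow_size_bytes(sample))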

      Steps to reproduce: run the following spark-submit command, which executes the Python function shown below it.

      %sh
      cd /home/zeppelin/code && \
      export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env3/bin/python && \
      export PYSPARK_PYTHON=./env3/bin/python && \
      export ARROW_PRE_0_15_IPC_FORMAT=1 && \
      spark-submit \
      --master yarn \
      --deploy-mode client \
      --num-executors 5 \
      --executor-cores 5 \
      --driver-memory 8G \
      --executor-memory 8G \
      --conf spark.executor.memoryOverhead=4G \
      --conf spark.driver.memoryOverhead=4G \
      --archives /home/zeppelin/env3.tar.gz#env3 \
      --jars "/opt/deltalake/delta-core_2.11-0.5.0.jar" \
      --py-files jobs.zip,"/opt/deltalake/delta-core_2.11-0.5.0.jar" \
      main.py --job temp
      

       

      Python:
      import pyspark
      from pyspark.sql import functions as F, types as T
      import pandas as pd
      
      def analyze(spark):
      
          pdf1 = pd.DataFrame(
              [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
              columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
          )
          df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
      
          pdf2 = pd.DataFrame(
              [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
              columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
          )
          df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
          # Every row shares the same key (1234567), so the inner join blows up
          # to 429 * 48993 rows that all fall into a single group below.
          df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
      
          def myudf(df):
              # Identity grouped-map UDF: it returns the group unchanged, so
              # any failure comes from the Arrow transfer itself.
              import os
              os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
              return df
      
          df4 = df3 \
              .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
              .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
              .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
              .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
              .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
              .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
              .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
              .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
              .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
              .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
          df4.printSchema()  # printSchema() prints directly and returns None
      
          udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

          # All rows share one key, so the entire dataset is serialized to
          # Arrow as a single group here, which is where the 2 GB limit hits.
          df5 = df4.groupBy('df1_c1').apply(udf)
          print('df5.count()', df5.count())
      

      If you need more details, please let me know.
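
      For background (an assumption on my part, not something established in this report): Arrow's default string and binary arrays index their values with 32-bit offsets, which caps a single array's data buffer at roughly 2 GiB, while the large_string/large_binary types use 64-bit offsets. A minimal pyarrow sketch of the difference, independent of the Spark job above:

      import pyarrow as pa

      # Default string arrays carry 32-bit offsets into their value buffer,
      # so a single array can address at most ~2 GiB of character data.
      small = pa.array(["abcdefghij"], type=pa.string())

      # large_string (and large_binary) use 64-bit offsets and lift that
      # per-array cap, at the cost of wider offset buffers.
      big = pa.array(["abcdefghij"], type=pa.large_string())

      print(small.type, big.type)  # -> string large_string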

      Attachments

        1. python env.png (5 kB, Dmitry Kravchuk)
        2. spark3 error.png (85 kB, Dmitry Kravchuk)
        3. spark3 env.png (81 kB, Dmitry Kravchuk)


          People

            Assignee: Unassigned
            Reporter: Dmitry Kravchuk (dishka_krauch)
            Votes: 0
            Watchers: 4


          Time Tracking

            Original Estimate: 672h
            Remaining Estimate: 672h
            Time Spent: Not Specified