SPARK-40622

Result of a single task in collect() must fit in 2GB


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.4.0
    • Component/s: Spark Core, SQL
    • Labels: None

    Description

      When collecting results, the data from a single partition/task is serialized through a byte array or a ByteBuffer (which is backed by a byte array as well), so it is subject to the Java array max size limit (for a byte array, 2GB).
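
      The cap here is a JVM-level limit on array length, not a Spark setting. A minimal sketch (an assumed illustration, not code from this ticket) of hitting it directly:

      // A single JVM byte array cannot hold more than ~Integer.MAX_VALUE bytes (~2GB),
      // regardless of heap size; on HotSpot this typically fails with
      // "OutOfMemoryError: Requested array size exceeds VM limit".
      try {
        val tooBig = new Array[Byte](Int.MaxValue)
        println(tooBig.length)
      } catch {
        case e: OutOfMemoryError => println(s"cannot allocate an array this large: $e")
      }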

       

      Constructing a single partition larger than 2GB and collecting it easily reproduces the issue:

      // Create ~3GB of data in a single partition, which exceeds the byte array limit.
      // genData is a user-defined generator producing ~1MB of random (hence poorly
      // compressible) bytes per row.
      val df = spark.range(0, 3000, 1, 1).selectExpr("id", "genData(id, 1000000) as data")

      // withSQLConf is a test-suite helper; these environment-specific configs raise the
      // driver-side result size cap so the collect is not rejected before serialization.
      withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
        withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
          df.queryExecution.executedPlan.executeCollect()
        }
      }

      This fails with an OutOfMemoryError thrown from java.io.ByteArrayOutputStream: https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125
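
      For reference, the same failure can be reproduced in isolation, since ByteArrayOutputStream buffers everything in a single byte[]. A sketch (assuming a driver heap of several GB; not code from this ticket):

      import java.io.ByteArrayOutputStream

      // Streaming ~3GB through a ByteArrayOutputStream forces its internal byte[]
      // to grow past what a Java array can hold, so the grow()/hugeCapacity() path
      // eventually throws an OutOfMemoryError.
      val out = new ByteArrayOutputStream()
      val chunk = new Array[Byte](64 * 1024 * 1024) // 64MB per write
      try {
        (0 until 48).foreach(_ => out.write(chunk)) // ~3GB in total
      } catch {
        case e: OutOfMemoryError => println(s"single byte[] limit reached: $e")
      }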

       

      Consider using ChunkedByteBuffer to replace the byte array in order to bypass this limit.
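
      A minimal sketch of the chunking idea (illustrative only; Spark's actual utility is org.apache.spark.util.io.ChunkedByteBuffer): keep the serialized result in a sequence of bounded-size chunks, so the total size is tracked as a Long and is no longer capped by the maximum length of a single array.

      import scala.collection.mutable.ArrayBuffer

      // Appends bytes across fixed-size chunks instead of one contiguous byte[].
      class ChunkedOutput(chunkSize: Int) {
        private val chunks = ArrayBuffer.empty[Array[Byte]]
        private var posInChunk = chunkSize // current chunk is "full", so first write allocates
        private var total = 0L

        def size: Long = total // Long-typed, so it can exceed 2GB

        def write(bytes: Array[Byte]): Unit = {
          var offset = 0
          while (offset < bytes.length) {
            if (posInChunk == chunkSize) { // current chunk is full: start a new one
              chunks += new Array[Byte](chunkSize)
              posInChunk = 0
            }
            val n = math.min(bytes.length - offset, chunkSize - posInChunk)
            System.arraycopy(bytes, offset, chunks.last, posInChunk, n)
            posInChunk += n
            offset += n
            total += n
          }
        }
      }

      With each chunk kept well below 2GB, a partition result of arbitrary size can be held on the driver and streamed out without ever materializing one oversized array.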

          People

            Assignee: Ziqi Liu (liuzq12)
            Reporter: Ziqi Liu (liuzq12)
