
[SPARK-50203] Data ingestion into an Iceberg table (S3 bucket) via a Spark batch job fails with an OutOfMemoryError.


Details

    • Type: Question
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.3, 4.0.0
    • Component/s: Spark Core
    • Labels: None
    • Environment: Running on macOS + 48 GB memory + 16 cores + M3

    Description

      While attempting to ingest data into an Iceberg table on S3 using a Spark batch job, the process fails with an OOM error. Initial investigation suggests that the use of bucketing as a partitioning strategy may be the cause. When bucketing is removed, the code runs successfully.

       

      Here is the current Spark code being used:

       

      import org.apache.spark.sql.SparkSession
      import org.slf4j.{Logger, LoggerFactory}
      import java.sql.Date
      import java.time.LocalDate
      import scala.util.Random
      object IcebergDataGenerator {
        def main(args: Array[String]): Unit = {
          val logger: Logger = LoggerFactory.getLogger(this.getClass)
          val spark =
            SparkSession
              .builder()
              .appName("Iceberg Data Generator")
              .master("local[*]")
              .config("spark.driver.memory", "16g")
              .config("spark.executor.memory", "16g")
              .config("spark.driver.maxResultSize", "2g")
              .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
              .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
              .config("spark.sql.catalog.rest.type", "rest")
              .config("spark.sql.catalog.rest.uri", "http://127.0.0.1:9001/iceberg/")
              .config("spark.sql.adaptive.enabled", "true")
              .config("spark.sql.shuffle.partitions", "1000")
              .config("spark.default.parallelism", "32")
              .getOrCreate()
          import spark.implicits._
          spark.sql("CREATE DATABASE IF NOT EXISTS rest.db;")
          spark.sql(
            """
              |CREATE TABLE IF NOT EXISTS rest.db.customers2 (
              |  customer_id INT,
              |  customer_name STRING,
              |  date DATE,
              |  transaction_details STRING
              |) USING iceberg
              |PARTITIONED BY (bucket(1000, customer_id), days(date))
        """.stripMargin)
          // generate the data 
          def generateCustomerData(numbers: Seq[Int]): Seq[(Int, String, Date, String)] = {
            val random = new Random()
            numbers.map { i =>
              val customerId = i
              val customerName = s"Customer_$i"
              val date = Date.valueOf(LocalDate.now().minusDays(random.nextInt(180))) // Random date within the last 6 months
              val transactionDetails = s"Transaction details for customer $i"
              (customerId, customerName, date, transactionDetails)
            }
          }
      // Generate 100,000 customer rows
          val customerData = generateCustomerData(1 to 100000)
          // Convert to DataFrame
          val customerDF = customerData.toDF("customer_id", "customer_name", "date", "transaction_details")
          // Write the data to an Apache Iceberg table
          logger.info(s"partition count:  ${customerDF.rdd.getNumPartitions}")
          customerDF
            .write
            .format("iceberg")
            .mode("append")
            .save("rest.db.customers2")
          val df = spark.sql("SELECT * FROM rest.db.customers2;")
          logger.info("Count:   " + df.count())
          // Stop Spark session
          spark.stop()
        }
      }
      
      
      
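      For context, here is a rough back-of-envelope estimate of the partition cardinality the
      PARTITIONED BY (bucket(1000, customer_id), days(date)) spec can produce (assumed
      numbers, not measured):

      // Worst-case number of distinct (bucket, day) partitions for the spec above.
      val buckets = 1000          // bucket(1000, customer_id)
      val distinctDays = 181      // LocalDate.now() minus a random 0..180 days
      val worstCasePartitions = buckets * distinctDays
      println(s"Worst-case partition count: $worstCasePartitions") // 181,000
      // With only 100,000 rows, the data can still spread across tens of thousands of
      // distinct (bucket, day) partitions. The stack trace below goes through
      // FanoutDataWriter, which keeps an open Parquet writer (each allocating its own
      // compression/output buffers via CodecFactory) per partition seen by a task, so
      // heap usage appears to grow with the number of open partition writers rather
      // than with the row count.
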
      Here is the error:
       
      [error] Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 1 times, most recent failure: Lost task 4.0 in stage 4.0 (TID 506) (192.168.29.234 executor driver): java.lang.OutOfMemoryError: Java heap space
      [error]     at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:79)
      [error]     at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:158)
      [error]     at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
      [error]     at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
      [error]     at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
      [error]     at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360)
      [error]     at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:760)
      [error]     at org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
      [error]     at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
      [error]     at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
      [error]     at org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:108)
      [error]     at org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
      [error]     at org.apache.iceberg.io.FanoutDataWriter.newWriter(FanoutDataWriter.java:53)
      [error]     at org.apache.iceberg.io.FanoutWriter.writer(FanoutWriter.java:63)
      [error]     at org.apache.iceberg.io.FanoutWriter.write(FanoutWriter.java:51)
      [error]     at org.apache.iceberg.io.FanoutDataWriter.write(FanoutDataWriter.java:31)
      [error]     at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:781)
      [error]     at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:751)
      [error]     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:498)
      [error]     at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:453)
      [error]     at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$$Lambda$3795/0x00000008017a7440.apply(Unknown Source)
      [error]     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
      [error]     at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
      [error]     at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
      [error]     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec$$Lambda$3504/0x00000008016f2040.apply(Unknown Source)
      [error]     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
      [error]     at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
      [error]     at org.apache.spark.scheduler.Task.run(Task.scala:141)
      [error]     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
      [error]     at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2232/0x0000000800fa6040.apply(Unknown Source)
      [error] Driver stacktrace:
      [error]     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
      [error]     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
      [error]     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
      [error]     at scala.collection.immutable.List.foreach(List.scala:334)
      [error]     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
      [error]     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
      [error]     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
      [error]     at scala.Option.foreach(Option.scala:437)
      [error]     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
      [error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
      [error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
      [error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
      [error]     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
      [error]     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
      [error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:390)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:364)
      [error]     at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:230)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:342)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:341)
      [error]     at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:230)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
      [error]     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
      [error]     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
      [error]     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
      [error]     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
      [error]     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
      [error]     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
      [error]     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
      [error]     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
      [error]     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
      [error]     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
      [error]     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
      [error]     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
      [error]     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
      [error]     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
      [error]     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
      [error]     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
      [error]     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
      [error]     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
      [error]     at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
      [error]     at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
      [error]     at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
      [error]     at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
      [error]     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
      [error]     at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:315)
      [error]     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
      [error]     at com.techmonad.spark.IcebergDataGenerator$.main(IcebergDataGenerator.scala:77)
      [error]     at com.techmonad.spark.IcebergDataGenerator.main(IcebergDataGenerator.scala) 

       

       
       

      Questions:

       

      1. Can bucketing be effectively used with Iceberg tables in Spark?

      2. What could be causing the OOM issue, and are there potential workarounds?
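       

      For reference, here is a sketch of the variants I have in mind (the no-bucketing
      DDL is the one that, as noted in the description, completes successfully; the
      smaller bucket count, the write.distribution-mode property change, and the
      customers3 table name are untested guesses/placeholders on my side, not verified
      fixes):

      // Variant A: the no-bucketing table definition (per the description, this variant runs successfully).
      spark.sql(
        """
          |CREATE TABLE IF NOT EXISTS rest.db.customers3 (
          |  customer_id INT,
          |  customer_name STRING,
          |  date DATE,
          |  transaction_details STRING
          |) USING iceberg
          |PARTITIONED BY (days(date))
        """.stripMargin)

      // Variant B (untested): keep bucketing but with a much smaller bucket count,
      // e.g. PARTITIONED BY (bucket(16, customer_id), days(date)), and ask Iceberg to
      // cluster rows by partition before the write so fewer writers stay open at once.
      spark.sql(
        """
          |ALTER TABLE rest.db.customers2
          |SET TBLPROPERTIES ('write.distribution-mode' = 'hash')
        """.stripMargin)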

       

      Let me know if you'd like any additional details added!

       

          People

            Assignee: Unassigned
            Reporter: Satendra Kumar (satendrakumar06)
            Votes: 0
            Watchers: 1
