Details
Type: Question
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version: 3.5.3
Fix Version: None
Environment: macOS, Apple M3, 16 cores, 48 GB memory
Description
While ingesting data into an Iceberg table on S3 with a Spark batch job, the write fails with an OutOfMemoryError. Initial investigation points at the bucketing partition transform: the table is partitioned by bucket(1000, customer_id) together with days(date), and the stack trace below shows the heap being exhausted while the Iceberg fanout writer opens yet another Parquet data file and allocates its compressor buffer, which suggests each write task is keeping a very large number of files open at once. When bucketing is removed, the code runs successfully; a sketch of the working table definition follows for reference.
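A minimal sketch of the table definition that completes without error (bucketing removed, only the day transform kept); the _nobucket suffix is mine, used here only to keep the two tables apart:

spark.sql(
  """
    |CREATE TABLE IF NOT EXISTS rest.db.customers2_nobucket (
    |  customer_id INT,
    |  customer_name STRING,
    |  date DATE,
    |  transaction_details STRING
    |) USING iceberg
    |PARTITIONED BY (days(date))
  """.stripMargin)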
Here is the current (failing) Spark code being used:
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}

import java.sql.Date
import java.time.LocalDate
import scala.util.Random

object IcebergDataGenerator {

  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(this.getClass)

    val spark = SparkSession
      .builder()
      .appName("Iceberg Data Generator")
      .master("local[*]")
      .config("spark.driver.memory", "16g")
      .config("spark.executor.memory", "16g")
      .config("spark.driver.maxResultSize", "2g")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.rest.type", "rest")
      .config("spark.sql.catalog.rest.uri", "http://127.0.0.1:9001/iceberg/")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.shuffle.partitions", "1000")
      .config("spark.default.parallelism", "32")
      .getOrCreate()

    import spark.implicits._

    spark.sql("CREATE DATABASE IF NOT EXISTS rest.db;")

    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS rest.db.customers2 (
        |  customer_id INT,
        |  customer_name STRING,
        |  date DATE,
        |  transaction_details STRING
        |) USING iceberg
        |PARTITIONED BY (bucket(1000, customer_id), days(date))
      """.stripMargin)

    // Generate the data
    def generateCustomerData(numbers: Seq[Int]): Seq[(Int, String, Date, String)] = {
      val random = new Random()
      numbers.map { i =>
        val customerId = i
        val customerName = s"Customer_$i"
        val date = Date.valueOf(LocalDate.now().minusDays(random.nextInt(180))) // Random date within the last 6 months
        val transactionDetails = s"Transaction details for customer $i"
        (customerId, customerName, date, transactionDetails)
      }
    }

    // Generate 100,000 customers
    val customerData = generateCustomerData(1 to 100000)

    // Convert to DataFrame
    val customerDF = customerData.toDF("customer_id", "customer_name", "date", "transaction_details")

    // Write the data to the Apache Iceberg table
    logger.info(s"partition count: ${customerDF.rdd.getNumPartitions}")
    customerDF
      .write
      .format("iceberg")
      .mode("append")
      .save("rest.db.customers2")

    val df = spark.sql("SELECT * FROM rest.db.customers2;")
    logger.info("Count: " + df.count())

    // Stop Spark session
    spark.stop()
  }
}

Here is the error:

[error] Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 1 times, most recent failure: Lost task 4.0 in stage 4.0 (TID 506) (192.168.29.234 executor driver): java.lang.OutOfMemoryError: Java heap space
[error]   at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:79)
[error]   at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:158)
[error]   at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
[error]   at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
[error]   at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
[error]   at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360)
[error]   at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:760)
[error]   at org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
[error]   at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
[error]   at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
[error]   at org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:108)
[error]   at org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
[error]   at org.apache.iceberg.io.FanoutDataWriter.newWriter(FanoutDataWriter.java:53)
[error]   at org.apache.iceberg.io.FanoutWriter.writer(FanoutWriter.java:63)
[error]   at org.apache.iceberg.io.FanoutWriter.write(FanoutWriter.java:51)
[error]   at org.apache.iceberg.io.FanoutDataWriter.write(FanoutDataWriter.java:31)
[error]   at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:781)
[error]   at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:751)
[error]   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:498)
[error]   at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:453)
[error]   at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$$Lambda$3795/0x00000008017a7440.apply(Unknown Source)
[error]   at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
[error]   at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:491)
[error]   at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
[error]   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec$$Lambda$3504/0x00000008016f2040.apply(Unknown Source)
[error]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
[error]   at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
[error]   at org.apache.spark.scheduler.Task.run(Task.scala:141)
[error]   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
[error]   at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2232/0x0000000800fa6040.apply(Unknown Source)
[error] Driver stacktrace:
[error]   at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
[error]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
[error]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
[error]   at scala.collection.immutable.List.foreach(List.scala:334)
[error]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
[error]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
[error]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
[error]   at scala.Option.foreach(Option.scala:437)
[error]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
[error]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
[error]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
[error]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
[error]   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
[error]   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
[error]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:390)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:364)
[error]   at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:230)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:342)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:341)
[error]   at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:230)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
[error]   at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
[error]   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
[error]   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
[error]   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
[error]   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
[error]   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
[error]   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
[error]   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
[error]   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
[error]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
[error]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
[error]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
[error]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
[error]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[error]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[error]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
[error]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
[error]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
[error]   at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
[error]   at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
[error]   at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
[error]   at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
[error]   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
[error]   at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:315)
[error]   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
[error]   at com.techmonad.spark.IcebergDataGenerator$.main(IcebergDataGenerator.scala:77)
[error]   at com.techmonad.spark.IcebergDataGenerator.main(IcebergDataGenerator.scala)
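To get a rough sense of why the fanout writer needs so many open files, I estimated how many distinct (bucket, day) partition pairs each input partition of customerDF contains. This is only a sketch: pmod(customer_id, 1000) stands in for Iceberg's bucket(1000, customer_id) transform (the real transform hashes the value first), and the actual write tasks may see a different split after Spark's shuffle, but it gives an order-of-magnitude feel for how many Parquet writers a task could hold open at once.

import org.apache.spark.sql.functions._

// Approximate number of Iceberg data files each task must keep open:
// one per distinct (bucket, day) pair it encounters.
val openWritersPerTask = customerDF
  .withColumn("bucket_approx", pmod(col("customer_id"), lit(1000))) // stand-in for bucket(1000, customer_id)
  .groupBy(spark_partition_id().as("task"))
  .agg(countDistinct(col("bucket_approx"), col("date")).as("distinct_partition_pairs"))

openWritersPerTask.show(50, truncate = false)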
Questions:
1. Can bucketing be effectively used with Iceberg tables in Spark?
2. What could be causing the OOM issue, and are there potential workarounds? (One candidate workaround I am considering is sketched below.)
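For question 2, one workaround I am considering but have not verified at scale: drastically reduce the bucket count and ask Iceberg for a hash write distribution, so that each write task receives only a few (bucket, day) combinations instead of potentially all of them. The bucket count of 16, the table name suffix, and the explicit write.distribution-mode property below are my assumptions, not values taken from the failing job:

spark.sql(
  """
    |CREATE TABLE IF NOT EXISTS rest.db.customers2_small_buckets (
    |  customer_id INT,
    |  customer_name STRING,
    |  date DATE,
    |  transaction_details STRING
    |) USING iceberg
    |PARTITIONED BY (bucket(16, customer_id), days(date))
    |TBLPROPERTIES ('write.distribution-mode' = 'hash')
  """.stripMargin)

// With a hash distribution, Spark shuffles rows by partition value before the write,
// so each task opens Parquet writers for only a handful of (bucket, day) pairs
// instead of keeping hundreds of compressor buffers alive at once.
customerDF
  .write
  .format("iceberg")
  .mode("append")
  .save("rest.db.customers2_small_buckets")

Whether a bucket count this small still makes sense for the eventual query pattern is a separate concern, which is really what question 1 is asking.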
Let me know if you'd like any additional details added!