Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 3.0.0
- Labels: None
Description
Hi, we have a long-lived Spark application running on a standalone cluster on GCP, using spot instances. To reduce the impact of preempted instances, we have enabled node decommission so that a preempted node migrates its shuffle data to other instances before it is deleted by GCP.
However, we found that the shuffle data migrated from the decommissioned node is never removed (the same behavior occurs on Spark 3.5).
Steps to reproduce:
1. Start spark-shell with 3 executors and enable decommission on both the driver and the workers
start-worker.sh[3331]: Spark Command: /usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 -Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master-01.com:7077
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true
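For a long-lived application, the same decommission flags could also be persisted instead of being passed on every launch. A minimal sketch, assuming $SPARK_HOME points at the /opt/spark installation above:
# Hypothetical alternative: persist the decommission flags from the
# spark-shell command above in the cluster-wide defaults file.
cat >> "$SPARK_HOME/conf/spark-defaults.conf" <<'EOF'
spark.decommission.enabled                        true
spark.storage.decommission.enabled                true
spark.storage.decommission.shuffleBlocks.enabled  true
spark.storage.decommission.rddBlocks.enabled      true
EOF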
2. Manually stop 1 worker during execution
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 10000 * i).map(j => (j, s"${j}_${longString}")).toDF("id", "mystr")
  df.repartition(6).count()
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

start iter 1 ...
finished iter 1, wait 15s for next round
...
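The report does not say exactly how the worker was stopped; one way to do it on a standalone cluster, as a sketch assuming the worker was launched with sbin/start-worker.sh as above and uses the default decommission signal, is:
# A sketch only; the exact stop mechanism used in the repro is an assumption.
# Run on the worker node that should go away.
WORKER_PID=$(pgrep -f org.apache.spark.deploy.worker.Worker)

# PWR is the default value of spark.worker.decommission.signal; sending it asks
# the worker to decommission itself, after which its executors migrate their
# shuffle/RDD blocks to the surviving executors.
kill -PWR "$WORKER_PID"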
3. Check the migrated shuffle data files on the remaining workers
Decommissioned node: shuffle files were migrated successfully
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None)
remaining shuffle data files on the other workers: the migrated shuffle files are never removed
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
Expected behavior:
The migrated shuffle data files should be removed after the job completes.
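Concretely, a check like the following (a sketch, assuming the same local dirs under /mnt/spark as in step 3) should come back empty on the surviving workers once the job has finished and the driver has garbage-collected the shuffle dependencies:
# Run on each remaining worker after the final System.gc() in the driver.
# With working cleanup, no migrated shuffle files should be left behind.
find /mnt/spark -type f \( -name 'shuffle_*.data' -o -name 'shuffle_*.index' \)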
Issue Links
- relates to: SPARK-20624 SPIP: Add better handling for node shutdown (In Progress)