Spark / SPARK-46957

Migrated shuffle data files from the decommissioned node should be removed when the job completes


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      Hi, we have a long-lived Spark application running on a standalone cluster on GCP, and we are using spot instances. To reduce the impact of preempted instances, we have enabled node decommissioning so that a preempted node migrates its shuffle data to other instances before GCP deletes it.

      However, we found that the shuffle data migrated from the decommissioned node is never removed (the same behavior occurs on Spark 3.5).

      Steps to reproduce:

      1. Start spark-shell with 3 executors and enable decommissioning on both the driver and the workers:

      start-worker.sh[3331]: Spark Command: /usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 -Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master-01.com:7077 
      /opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
        --total-executor-cores 12 \
        --conf spark.decommission.enabled=true \
        --conf spark.storage.decommission.enabled=true \
        --conf spark.storage.decommission.shuffleBlocks.enabled=true \
        --conf spark.storage.decommission.rddBlocks.enabled=true
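      The decommission settings from step 1 can also be kept in Spark's configuration files instead of being passed on every launch. Below is a minimal shell sketch, assuming SPARK_HOME is /opt/spark (as in the worker command above) and that the worker's -D flags come from SPARK_WORKER_OPTS in spark-env.sh. The function name write_decommission_conf is hypothetical, not part of Spark.

```shell
# Hypothetical helper (name and layout are assumptions, not part of Spark):
# appends the decommission settings from step 1 to the files in CONF_DIR.
write_decommission_conf() {
  local conf_dir="$1"
  mkdir -p "$conf_dir"

  # Worker JVM properties: the standalone launch scripts load spark-env.sh and
  # pass SPARK_WORKER_OPTS to the Worker JVM, giving the -D flags seen in step 1.
  cat >> "$conf_dir/spark-env.sh" <<'EOF'
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.appDataTtl=1800 -Dspark.decommission.enabled=true"
EOF

  # Application defaults read by spark-shell/spark-submit, replacing the --conf flags.
  cat >> "$conf_dir/spark-defaults.conf" <<'EOF'
spark.decommission.enabled                        true
spark.storage.decommission.enabled                true
spark.storage.decommission.shuffleBlocks.enabled  true
spark.storage.decommission.rddBlocks.enabled      true
EOF
}
```

      For example, run `write_decommission_conf /opt/spark/conf` on each host and restart the workers. The helper appends rather than overwrites so existing settings survive; running it twice therefore duplicates the entries.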


      2. Manually stop one worker while the following code is running:

      // spark.implicits._ (which provides toDF) is imported automatically in spark-shell
      (1 to 10).foreach { i =>
        println(s"start iter $i ...")
        // Long payload so that every row carries a noticeable amount of shuffle data
        val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec sodales tempor, odio augue euismod ipsum, nec tristique e"
        // The DataFrame grows with each iteration (10000 * i rows)
        val df = (1 to 10000 * i).map(j => (j, s"${j}_${longString}")).toDF("id", "mystr")

        // repartition(6) forces a shuffle, so every iteration writes new shuffle files
        df.repartition(6).count()
        // Request a GC so the ContextCleaner can remove shuffles that are no longer referenced
        System.gc()
        println(s"finished iter $i, wait 15s for next round")
        Thread.sleep(15 * 1000)
      }
      System.gc()
      
      start iter 1 ...
      finished iter 1, wait 15s for next round
      ... 


      3. Check the migrated shuffle data files on the remaining workers

      On the decommissioned node, the shuffle files were migrated successfully:

      less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
      24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
      24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
      24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
      24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
      24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
      24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) 
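      To see where each shuffle block from the decommissioned executor went, the "Migrated" log lines can be turned into "<target host> <expected data file>" pairs. Below is a minimal shell sketch; the function name migrated_shuffle_files is hypothetical, and the mapping from block migrate_shuffle_<shuffleId>_<mapId> to file shuffle_<shuffleId>_<mapId>_0.data is inferred from the logs and file listings in this report, not taken from Spark documentation.

```shell
# Hypothetical helper: read BlockManagerDecommissioner log lines on stdin and
# print "<target host> shuffle_<shuffleId>_<mapId>_0.data" for each migrated block.
# Assumption: the block-to-file naming observed in this report holds in general.
migrated_shuffle_files() {
  # -n with the p flag prints only lines where the substitution matched,
  # so unrelated log lines are dropped.
  sed -nE 's/.*Migrated migrate_shuffle_([0-9]+)_([0-9]+) to BlockManagerId\([^,]+, ([^,]+),.*/\3 shuffle_\1_\2_0.data/p'
}
```

      For example, `migrated_shuffle_files < /mnt/spark_work/app-20240202084807-0003/1/stdout` on the decommissioned node prints `10.67.5.139 shuffle_4_41_0.data` for the first log line above.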

      On the remaining workers, the migrated shuffle files are never removed:

      10.67.5.134 | CHANGED | rc=0 >>
      -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
      -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
      -rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
      10.67.5.139 | CHANGED | rc=0 >>
      -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
      -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
      -rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data 
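      A listing like the one above can be narrowed to just the migrated files by checking them by name on each remaining worker. Below is a minimal shell sketch; the function name still_present is hypothetical, and the default root /mnt/spark is the worker local directory in this setup (taken from the listings above), not a Spark default.

```shell
# Hypothetical helper, run on a remaining worker: read expected shuffle data
# file names (one per line) on stdin and print the full path of every copy
# that still exists under ROOT (default /mnt/spark, as in the listings above).
still_present() {
  local root="${1:-/mnt/spark}" name
  while IFS= read -r name; do
    [ -n "$name" ] || continue   # skip blank input lines
    find "$root" -type f -name "$name"
  done | sort
}
```

      For example, on 10.67.5.134: `printf '%s\n' shuffle_4_38_0.data shuffle_4_47_0.data shuffle_5_52_0.data | still_present`. Any path printed after the job has completed is a migrated file that was not cleaned up.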


      Expected behavior:

      The migrated shuffle data files should be removed after the job completes.


            People

              Assignee: Unassigned
              Reporter: Yu-Jhe Li (yujhe.li)
              Votes: 0
              Watchers: 2
