SPARK-47019

AQE dynamic cache partitioning causes SortMergeJoin to result in data loss




      It seems that we have encountered an issue with Spark AQE's dynamic cache partitioning that causes incorrect count values and silent data loss.

      A similar issue could not be found, so I am creating this ticket to raise awareness.



      • Set up a cluster as per the environment specification
      • Prepare test data (use the attached testdata.zip, or any dataset large enough to trigger a read by both executors; a generator sketch follows this list)
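      If the attached data is not used, data of a similar shape can be generated. A purely illustrative sketch, assuming the same SparkSession value `session` as in the minimal example below (the column names PID/PARENT_ID match that example; the sizes are arbitrary, and the file-size threshold that triggers the loss may differ):

      import session.implicits._
      // Parent rows keyed by PID; child rows reference a parent via PARENT_ID
      (1 to 100000).toDF("PID")
        .write.format("avro").save("/data/shared/test/parent")
      (1 to 500000).map(i => (i, i % 100000 + 1)).toDF("CID", "PARENT_ID")
        .write.format("avro").save("/data/shared/test/child")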

      Steps to reproduce:

      • Read parent
      • Self join parent
      • cache + materialize parent
      • Join parent with child

      Performing a self-join over a parentDF, then caching + materialising the DF, and then joining it with a childDF results in an incorrect count value and missing data.
      Performing a repartition seems to fix the issue, most probably due to the rearrangement of the underlying partitions and a statistics update.
      This behaviour is observed on a multi-worker cluster with a job running 2 executors (1 per worker), when a data file large enough to be read by both executors is used.
      It is not reproducible in local mode.
      So far, disabling spark.sql.optimizer.canChangeCachedPlanOutputPartitioning or performing a repartition alleviates the problem, but neither fixes the root cause.
      This issue is dangerous because the data loss occurs silently and, in the absence of proper checks, can lead to wrong behaviour/results down the line. We have therefore labelled it as a blocker.
      There seems to be a file-size threshold beyond which the data loss is observed (possibly implying that it occurs once both executors start reading the data file).
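      As a stopgap, the config workaround can be applied per session before the DataFrame is cached. A minimal sketch, assuming the same SparkSession value `session` as in the example below (the flag can equally be passed via --conf at submit time):

      // Workaround, not a root-cause fix: stop AQE from changing the
      // output partitioning of cached plans
      session.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")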
      Minimal example:

      // Read parent
      val parentData = session.read.format("avro").load("/data/shared/test/parent")
      // Self join parent and cache + materialize
      val parent = parentData.join(parentData, Seq("PID")).cache()
      parent.count() // materialize the cache
      // Read child
      val child = session.read.format("avro").load("/data/shared/test/child")
      // Basic join
      val resultBasic = child.join(
        parent,
        parent("PID") === child("PARENT_ID")
      )
      // Count: 16479 (Wrong)
      println(s"Count no repartition: ${resultBasic.count()}")
      // Repartition parent join (the repartition key here is illustrative)
      val resultRepartition = child.join(
        parent.repartition(parent("PID")),
        parent("PID") === child("PARENT_ID")
      )
      // Count: 50094 (Correct)
      println(s"Count with repartition: ${resultRepartition.count()}")

      Invalid count-only DAG: see the attached screenshot.

      Valid repartition DAG: see the attached screenshot.



      The spark-submit command for this job:

            spark-submit \
              --class ExampleApp \
              --packages org.apache.spark:spark-avro_2.12:3.5.0 \
              --deploy-mode cluster \
              --master spark://spark-master:6066 \
              --conf spark.sql.autoBroadcastJoinThreshold=-1 \
              --conf spark.cores.max=3 \
              --driver-cores 1 \
              --driver-memory 1g \
              --executor-cores 1 \
              --executor-memory 1g \
              <application-jar>

      The cluster should be set up as follows (worker1: master + executor, worker2: executor) so that the executors are split across two workers; a sketch of this layout follows.
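      A minimal sketch of that layout using the standalone scripts (the hostnames and the master host spark-master are assumptions taken from the spark-submit command above; 7077 is the default master RPC port, while 6066 above is the REST submission port):

      # On worker1: master + first executor's worker
      $SPARK_HOME/sbin/start-master.sh --host spark-master
      $SPARK_HOME/sbin/start-worker.sh spark://spark-master:7077
      # On worker2: second executor's worker
      $SPARK_HOME/sbin/start-worker.sh spark://spark-master:7077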

      I have prepared a simple GitHub repository containing a compilable version of the above example.


        Attachments:
        1. testdata.zip (839 kB, Ridvan Appa Bugis)
        2. Screenshot 2024-02-07 at 20.10.07.png (174 kB, Ridvan Appa Bugis)
        3. Screenshot 2024-02-07 at 20.09.44.png (141 kB, Ridvan Appa Bugis)
        4. eventLogs-app-20240207175940-0023.zip (93 kB, Ridvan Appa Bugis)



            Assignee: Unassigned
            Reporter: Ridvan Appa Bugis